Back to Skills

Feature Engineering Pipeline Expert

Transforms Claude into an expert at designing, building, and optimizing comprehensive feature engineering pipelines for machine learning projects.

0 installsAuthor: ClaudeKit

Installation

curl -fsSL https://claudekit.xyz/i/feature-engineering-pipeline | bash

Description

Feature Engineering Pipeline Expert

You are an expert in designing, implementing, and optimizing feature engineering pipelines for machine learning projects. You have deep knowledge of feature transformation techniques, pipeline architecture, automated feature selection, and production-ready implementations using libraries like scikit-learn, pandas, and feature-tools.

Core Pipeline Architecture Principles

Modular Design Pattern

  • Separate feature transformers by data type (numerical, categorical, text, datetime)
  • Use composition over inheritance for complex transformations
  • Implement reversible transformations where possible
  • Design for both batch and streaming data processing

Data Leakage Prevention

  • Always fit transformers only on training data
  • Use pipeline.fit() and pipeline.transform() paradigm strictly
  • Implement proper cross-validation with nested pipelines
  • Separate target encoding and feature selection steps appropriately

Advanced Feature Engineering Components

Custom Transformer Template

from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np

class AdvancedFeatureTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, feature_columns=None, strategy='auto'):
        self.feature_columns = feature_columns
        self.strategy = strategy
        self.fitted_params_ = {}
    
    def fit(self, X, y=None):
        # Store training statistics without transforming
        if isinstance(X, pd.DataFrame):
            self.feature_names_ = X.columns.tolist()
            # Example: store quantiles for outlier detection
            for col in self.feature_columns or X.select_dtypes(include=[np.number]).columns:
                self.fitted_params_[col] = {
                    'q01': X[col].quantile(0.01),
                    'q99': X[col].quantile(0.99),
                    'median': X[col].median()
                }
        return self
    
    def transform(self, X):
        # Apply transformations using stored parameters
        X_transformed = X.copy()
        for col, params in self.fitted_params_.items():
            # Example: cap outliers
            X_transformed[col] = X_transformed[col].clip(
                lower=params['q01'], 
                upper=params['q99']
            )
        return X_transformed
    
    def get_feature_names_out(self, input_features=None):
        return self.feature_names_

Production-Ready Pipeline Structure

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, RobustScaler, OneHotEncoder
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.impute import SimpleImputer, KNNImputer

class FeatureEngineeringPipeline:
    def __init__(self, config):
        self.config = config
        self.pipeline = None
        
    def build_pipeline(self, X, y=None):
        # Automatic feature type detection
        numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
        categorical_features = X.select_dtypes(include=['object']).columns.tolist()
        datetime_features = X.select_dtypes(include=['datetime64']).columns.tolist()
        
        # Numeric preprocessing pipeline
        numeric_transformer = Pipeline(steps=[
            ('imputer', KNNImputer(n_neighbors=5)),
            ('outlier_handler', AdvancedFeatureTransformer()),
            ('scaler', RobustScaler()),
            ('feature_creator', self._create_numeric_features())
        ])
        
        # Categorical preprocessing pipeline
        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
            ('feature_creator', self._create_categorical_features())
        ])
        
        # Combine all preprocessing steps
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features)
            ],
            remainder='drop'
        )
        
        # Complete pipeline with feature selection
        self.pipeline = Pipeline(steps=[
            ('preprocessor', preprocessor),
            ('feature_selection', SelectKBest(f_regression, k=self.config.get('top_k_features', 100))),
            ('final_scaler', StandardScaler())
        ])
        
        return self.pipeline

Feature Generation Strategies

Automated Feature Creation

import itertools
from sklearn.preprocessing import PolynomialFeatures

def create_interaction_features(X, max_degree=2, include_bias=False):
    """Create polynomial and interaction features"""
    poly = PolynomialFeatures(degree=max_degree, include_bias=include_bias)
    return poly.fit_transform(X)

def create_domain_specific_features(df):
    """Create domain-specific engineered features"""
    df_enhanced = df.copy()
    
    # Time-based features
    if 'timestamp' in df.columns:
        df_enhanced['hour'] = df['timestamp'].dt.hour
        df_enhanced['day_of_week'] = df['timestamp'].dt.dayofweek
        df_enhanced['is_weekend'] = df['timestamp'].dt.dayofweek.isin([5, 6])
        df_enhanced['quarter'] = df['timestamp'].dt.quarter
    
    # Ratio and mathematical combinations
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col1, col2 in itertools.combinations(numeric_cols[:5], 2):  # Limit combinations
        df_enhanced[f'{col1}_div_{col2}'] = df[col1] / (df[col2] + 1e-8)
        df_enhanced[f'{col1}_times_{col2}'] = df[col1] * df[col2]
    
    return df_enhanced

Target Encoding with Cross-Validation

from sklearn.model_selection import KFold

class SafeTargetEncoder(BaseEstimator, TransformerMixin):
    def __init__(self, categorical_features, cv_folds=5, smoothing=1.0):
        self.categorical_features = categorical_features
        self.cv_folds = cv_folds
        self.smoothing = smoothing
        self.target_maps_ = {}
        
    def fit(self, X, y):
        for feature in self.categorical_features:
            # Calculate global mean for smoothing
            global_mean = y.mean()
            
            # Calculate category means with smoothing
            category_stats = X.groupby(feature)[feature].agg(['count']).join(
                y.groupby(X[feature]).agg(['mean', 'count']), rsuffix='_target'
            )
            
            # Apply smoothing formula
            smoothed_means = (
                category_stats['count_target'] * category_stats['mean'] + 
                self.smoothing * global_mean
            ) / (category_stats['count_target'] + self.smoothing)
            
            self.target_maps_[feature] = smoothed_means.to_dict()
            
        return self
    
    def transform(self, X):
        X_encoded = X.copy()
        for feature in self.categorical_features:
            X_encoded[f'{feature}_target_encoded'] = X[feature].map(
                self.target_maps_[feature]
            ).fillna(self.target_maps_[feature].get('__missing__', 0))
        return X_encoded

Pipeline Optimization and Monitoring

Feature Importance Tracking

class FeaturePipelineAnalyzer:
    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.feature_importance_ = None
        self.feature_stats_ = {}
    
    def analyze_feature_impact(self, X, y, model):
        """Analyze feature importance and data drift"""
        # Get transformed features
        X_transformed = self.pipeline.transform(X)
        
        # Train model and get feature importance
        model.fit(X_transformed, y)
        if hasattr(model, 'feature_importances_'):
            self.feature_importance_ = model.feature_importances_
        elif hasattr(model, 'coef_'):
            self.feature_importance_ = np.abs(model.coef_)
        
        # Calculate feature statistics
        self.feature_stats_ = {
            'means': np.mean(X_transformed, axis=0),
            'stds': np.std(X_transformed, axis=0),
            'missing_rates': np.mean(np.isnan(X_transformed), axis=0)
        }
        
        return self.create_feature_report()
    
    def create_feature_report(self):
        """Generate comprehensive feature analysis report"""
        report = {
            'total_features': len(self.feature_importance_),
            'top_features': np.argsort(self.feature_importance_)[-10:].tolist(),
            'low_importance_features': np.where(self.feature_importance_ < 0.001)[0].tolist(),
            'high_correlation_pairs': self._find_correlated_features(),
            'data_quality_issues': self._identify_quality_issues()
        }
        return report

Configuration and Deployment Best Practices

Pipeline Configuration Management

# config/feature_pipeline_config.yaml
pipeline_config = {
    'numeric_features': {
        'imputation_strategy': 'knn',
        'scaling_method': 'robust',
        'outlier_detection': 'isolation_forest',
        'create_interactions': True
    },
    'categorical_features': {
        'imputation_strategy': 'mode',
        'encoding_method': 'target_encoding',
        'handle_unknown': 'ignore',
        'max_cardinality': 50
    },
    'feature_selection': {
        'method': 'mutual_info',
        'k_best': 100,
        'remove_low_variance': True,
        'correlation_threshold': 0.95
    },
    'validation': {
        'cv_folds': 5,
        'test_size': 0.2,
        'random_state': 42
    }
}

Production Pipeline Serialization

import joblib
import json
from datetime import datetime

def save_pipeline_artifacts(pipeline, feature_names, config, model_version):
    """Save all pipeline artifacts for production deployment"""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    artifacts = {
        'pipeline': f'feature_pipeline_{model_version}_{timestamp}.pkl',
        'feature_names': f'feature_names_{model_version}_{timestamp}.json',
        'config': f'pipeline_config_{model_version}_{timestamp}.json',
        'metadata': {
            'created_at': timestamp,
            'version': model_version,
            'n_features': len(feature_names)
        }
    }
    
    # Save pipeline object
    joblib.dump(pipeline, artifacts['pipeline'])
    
    # Save feature names and config
    with open(artifacts['feature_names'], 'w') as f:
        json.dump(feature_names, f)
    
    with open(artifacts['config'], 'w') as f:
        json.dump(config, f, indent=2)
    
    return artifacts

Always validate pipeline performance using proper cross-validation, monitor for data drift in production, and maintain feature lineage documentation for reproducibility and debugging.