Skip to content

Model Architecture and Machine Learning Framework

Overview

The Cross-Asset Alpha Engine employs a sophisticated machine learning architecture that combines regime detection with adaptive alpha generation. The system uses multiple model types, each optimized for different aspects of market behavior and regime conditions.

Regime Detection Models

Hidden Markov Model (HMM) Implementation

Mathematical Foundation

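The HMM assumes that at each time step the market occupies one of K unobservable states (regimes). Regime switches follow a first-order Markov chain with transition matrix A, where A_ij = P(s_t = j | s_{t-1} = i), and the observed features in regime k are modelled as Gaussian with regime-specific mean and covariance: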

import inspect
import warnings

import numpy as np
import pandas as pd
from hmmlearn import hmm
from sklearn.preprocessing import StandardScaler

class RegimeHMM:
    """Gaussian Hidden Markov Model wrapper for regime detection.

    Features are standardized with a ``StandardScaler`` before they reach the
    HMM; the same fitted scaler is reused at prediction time.

    NOTE(review): the HMM numbers its hidden states arbitrarily, so the
    default names in ``regime_labels`` ("Low Volatility", ...) are not
    guaranteed to describe state 0/1/2 -- confirm them against the output of
    ``get_regime_characteristics`` after fitting.
    """

    # Number of random restarts tried by fit().
    _N_RESTARTS = 5

    def __init__(self, n_components=3, covariance_type="full", n_iter=100):
        """Create the (unfitted) HMM and scaler.

        Args:
            n_components: Number of hidden states (regimes).
            covariance_type: Passed through to ``hmm.GaussianHMM``.
            n_iter: Maximum EM iterations, passed through to ``GaussianHMM``.
        """
        self.n_components = n_components
        self.model = hmm.GaussianHMM(
            n_components=n_components,
            covariance_type=covariance_type,
            n_iter=n_iter,
            random_state=42
        )
        self.scaler = StandardScaler()
        self.regime_labels = {
            0: "Low Volatility",
            1: "High Volatility",
            2: "Transition"
        }
        # Only three names are predefined; give any further state a generic
        # name so label lookups cannot fail for n_components > 3.
        for state in range(len(self.regime_labels), n_components):
            self.regime_labels[state] = f"Regime {state}"

    def fit(self, features):
        """Fit the scaler and the HMM, keeping the best of several restarts.

        Each restart uses a different ``random_state``; the candidate with the
        highest log-likelihood (``score``) on the training data is kept.

        Args:
            features: 2-D array-like of regime features.

        Returns:
            self
        """
        # Standardize features for numerical stability
        features_scaled = self.scaler.fit_transform(features)

        best_score = -np.inf
        best_model = None
        last_error = None

        for seed in range(self._N_RESTARTS):
            candidate = hmm.GaussianHMM(
                n_components=self.n_components,
                covariance_type=self.model.covariance_type,
                n_iter=self.model.n_iter,
                random_state=seed
            )

            try:
                candidate.fit(features_scaled)
                score = candidate.score(features_scaled)
            except Exception as exc:
                # One bad initialization must not abort the whole search, but
                # unlike a bare ``except`` this lets KeyboardInterrupt through.
                last_error = exc
                continue

            if score > best_score:
                best_score = score
                best_model = candidate

        if best_model is not None:
            self.model = best_model
        else:
            # Previous behaviour (keep the existing model) is preserved, but
            # the caller is now told that nothing was actually fitted.
            warnings.warn(
                "RegimeHMM.fit: no HMM restart produced a usable model; "
                f"keeping the previous model (last error: {last_error!r})"
            )
        return self

    def predict_regimes(self, features):
        """Return the HMM's predicted state for each row of ``features``."""
        features_scaled = self.scaler.transform(features)
        return self.model.predict(features_scaled)

    def predict_proba(self, features):
        """Return per-row state probabilities from the HMM."""
        features_scaled = self.scaler.transform(features)
        return self.model.predict_proba(features_scaled)

    def get_regime_characteristics(self, features, regimes):
        """Summarize the rows assigned to each regime.

        Args:
            features: DataFrame of regime features (unscaled).
            regimes: Sequence of state indices, one per row of ``features``.

        Returns:
            Dict keyed by regime label. Each value holds the observation
            count, its percentage of all rows (0.0 when ``features`` is
            empty) and the mean of ``volatility_20d``, ``vix_level`` and
            ``equity_bond_corr`` (``None`` for columns that are absent).
        """
        summary_columns = {
            'mean_volatility': 'volatility_20d',
            'mean_vix': 'vix_level',
            'mean_correlation': 'equity_bond_corr',
        }
        regime_array = np.asarray(regimes)
        total_rows = len(features)
        characteristics = {}

        for regime in range(self.n_components):
            regime_data = features[regime_array == regime]
            count = len(regime_data)

            summary = {
                'observations': count,
                # Guard the empty-input case instead of dividing by zero.
                'percentage': count / total_rows * 100 if total_rows else 0.0,
            }
            for key, column in summary_columns.items():
                summary[key] = (
                    regime_data[column].mean()
                    if column in regime_data.columns else None
                )

            label = self.regime_labels.get(regime, f"Regime {regime}")
            characteristics[label] = summary

        return characteristics

Regime Feature Selection

Key features for regime detection:

def prepare_regime_features(data, date_col=None):
    """Prepare features specifically for regime detection.

    ``data`` is a long-format frame with one row per (time, symbol) and the
    columns ``symbol``, ``returns_1d``, ``volume`` and, for VIX rows,
    ``close``. Per-symbol features are computed within each symbol in row
    order; market-wide features (equity/bond correlation, VIX) are computed
    once per timestamp and broadcast to every row with that timestamp.

    Args:
        data: Long-format market data. Rows are assumed to be in
            chronological order within each symbol -- TODO confirm upstream.
        date_col: Optional name of the column holding the timestamp used to
            align different symbols. When ``None`` the frame's index is used
            as the timestamp.

    Returns:
        DataFrame indexed like ``data`` with rows containing any NaN removed.
        When neither the index nor ``date_col`` provides timestamps shared
        across symbols, the cross-asset columns are all NaN and the result is
        empty.
    """
    # Work on a unique positional index so that duplicate timestamps in the
    # caller's index cannot break column assignment; restore it at the end.
    frame = data.reset_index(drop=True)
    if date_col is None:
        time_key = pd.Series(data.index.to_numpy(), index=frame.index)
    else:
        time_key = frame[date_col]

    symbols = frame['symbol']
    returns = frame['returns_1d']
    returns_by_symbol = returns.groupby(symbols)

    regime_features = pd.DataFrame(index=frame.index)

    # Volatility measures (transform keeps the original row order)
    std_20d = returns_by_symbol.transform(lambda s: s.rolling(20).std())
    std_5d = returns_by_symbol.transform(lambda s: s.rolling(5).std())
    regime_features['volatility_20d'] = std_20d * np.sqrt(252)
    regime_features['vol_ratio_5_20'] = std_5d / std_20d

    # Cross-asset correlations: collapse each leg to one value per timestamp
    # so the two series share an index before correlating them.
    equity_mask = symbols.isin(['SPY', 'QQQ'])
    bond_mask = symbols == 'TLT'
    if equity_mask.any() and bond_mask.any():
        equity_by_time = returns[equity_mask].groupby(time_key[equity_mask]).mean()
        bond_by_time = returns[bond_mask].groupby(time_key[bond_mask]).mean()
        rolling_corr = equity_by_time.rolling(20).corr(bond_by_time)
        regime_features['equity_bond_corr'] = time_key.map(rolling_corr)

    # VIX level (if available), broadcast to every row of the same timestamp
    vix_mask = symbols == 'VIX'
    if vix_mask.any():
        vix_times = time_key[vix_mask]
        vix_close = frame.loc[vix_mask, 'close'].groupby(vix_times).last()
        vix_change = returns[vix_mask].groupby(vix_times).last()
        regime_features['vix_level'] = time_key.map(vix_close)
        regime_features['vix_change'] = time_key.map(vix_change)

    # Volume patterns (per-symbol rolling z-score)
    volume = frame['volume']
    volume_by_symbol = volume.groupby(symbols)
    volume_mean = volume_by_symbol.transform(lambda s: s.rolling(20).mean())
    volume_std = volume_by_symbol.transform(lambda s: s.rolling(20).std())
    regime_features['volume_zscore'] = (volume - volume_mean) / volume_std

    regime_features.index = data.index
    return regime_features.dropna()

Statistical Regime Detection

Threshold-Based Models

class ThresholdRegimeDetector:
    """Statistical threshold-based regime detection.

    Each indicator is bucketed by its ascending thresholds into integer
    regimes ``0 .. len(thresholds)``; when both volatility and VIX are
    available the two bucket indices are averaged.
    """

    def __init__(self, volatility_thresholds=(0.15, 0.25), vix_thresholds=(20, 30)):
        """Store the bucket boundaries.

        Args:
            volatility_thresholds: Ascending cut points for ``volatility_20d``.
            vix_thresholds: Ascending cut points for ``vix_level``.
        """
        # Copy into fresh lists so instances never share (or mutate) defaults.
        self.vol_thresholds = list(volatility_thresholds)
        self.vix_thresholds = list(vix_thresholds)

    @staticmethod
    def _bucketize(values, thresholds, name):
        """Map each value to the index of the threshold bucket it falls in."""
        if values.isna().any() or (values < 0).any():
            raise ValueError(
                f"{name} must be non-negative and free of NaN for regime detection"
            )
        edges = [0] + list(thresholds) + [np.inf]
        return pd.cut(
            values,
            bins=edges,
            labels=list(range(len(edges) - 1)),
            # Without this a value of exactly 0 falls outside the first bin.
            include_lowest=True
        ).astype(int)

    def detect_regimes(self, features):
        """Detect regimes using statistical thresholds.

        Args:
            features: DataFrame with ``volatility_20d`` and optionally
                ``vix_level``.

        Returns:
            Integer Series of regime indices, indexed like ``features``.

        Raises:
            ValueError: If an indicator contains NaN or negative values.
        """
        # Volatility-based regime detection
        vol_regimes = self._bucketize(
            features['volatility_20d'], self.vol_thresholds, 'volatility_20d'
        )

        # VIX-based regime detection (if available)
        if 'vix_level' not in features.columns:
            return vol_regimes

        vix_regimes = self._bucketize(
            features['vix_level'], self.vix_thresholds, 'vix_level'
        )

        # Combined regime (average of indicators). Ties are rounded up so a
        # disagreement always resolves to the higher regime; np.round would
        # round halves to even and resolve (0, 1) down but (1, 2) up.
        average = (vol_regimes + vix_regimes) / 2
        return np.floor(average + 0.5).astype(int)

Alpha Generation Models

Random Forest Architecture

Core Implementation

from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import TimeSeriesSplit
import shap

class AlphaModel:
    """Regime-aware Random Forest alpha generation model.

    Trains one baseline forest on all rows (key ``'overall'``) and, for every
    regime with enough rows, an additional forest stored under
    ``'regime_<value>'``.
    """

    # A regime needs strictly more rows than this to get its own model.
    MIN_REGIME_SAMPLES = 100

    def __init__(self, model_config=None):
        """Store the forest hyperparameters.

        Args:
            model_config: Keyword arguments for ``RandomForestRegressor``;
                a default configuration is used when omitted or empty.
        """
        self.config = model_config or {
            'n_estimators': 100,
            'max_depth': 15,
            'min_samples_split': 10,
            'min_samples_leaf': 5,
            'max_features': 'sqrt',
            'random_state': 42,
            'n_jobs': -1
        }

        self.models = {}
        self.feature_importance = {}
        self.shap_explainers = {}

    @staticmethod
    def _importance_table(columns, model):
        """Return the model's feature importances sorted in descending order."""
        return pd.DataFrame({
            'feature': columns,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)

    def fit(self, features, targets, regimes=None, regime_probs=None):
        """Train the overall model and any regime-specific models.

        Args:
            features: DataFrame of model inputs.
            targets: Target values, one per row of ``features``.
            regimes: Optional regime value per row, matched to ``features``
                by position.
            regime_probs: Accepted for signature symmetry with ``predict``;
                not used during training.

        Returns:
            self
        """
        # Overall model (baseline)
        overall = RandomForestRegressor(**self.config)
        overall.fit(features, targets)
        self.models['overall'] = overall
        self.feature_importance['overall'] = self._importance_table(
            features.columns, overall
        )

        # Regime-specific models
        if regimes is not None:
            # A positional array works for lists and for Series whose index
            # differs from the feature index.
            regime_array = np.asarray(regimes)
            for regime in np.unique(regime_array):
                regime_mask = regime_array == regime

                # Require minimum samples for regime-specific model
                if regime_mask.sum() > self.MIN_REGIME_SAMPLES:
                    model_key = f'regime_{regime}'
                    regime_model = RandomForestRegressor(**self.config)
                    regime_model.fit(features[regime_mask], targets[regime_mask])
                    self.models[model_key] = regime_model
                    self.feature_importance[model_key] = self._importance_table(
                        features.columns, regime_model
                    )

        # The sample is handed to the helper as before; the helper currently
        # builds explainers from the models alone.
        self._initialize_shap_explainers(features.sample(min(1000, len(features))))

        return self

    def predict(self, features, regimes=None, regime_probs=None):
        """Generate alpha predictions with regime awareness.

        Precedence: a 2-D ``regime_probs`` with more than one column gives a
        probability-weighted blend of the regime models; otherwise
        ``regimes`` selects one model per row; otherwise the overall model is
        used. A regime without its own model falls back to the overall model.

        Args:
            features: DataFrame of model inputs.
            regimes: Optional regime value per row.
            regime_probs: Optional (rows x regimes) probabilities; arrays and
                DataFrames are both accepted.

        Returns:
            1-D numpy array of predictions, one per row.
        """
        if regime_probs is not None:
            probs = np.asarray(regime_probs)
            if probs.ndim == 2 and probs.shape[1] > 1:
                predictions = np.zeros(len(features))
                overall_preds = None  # computed at most once, on demand

                for regime in range(probs.shape[1]):
                    regime_model = self.models.get(f'regime_{regime}')
                    if regime_model is not None:
                        regime_preds = regime_model.predict(features)
                    else:
                        if overall_preds is None:
                            overall_preds = self.models['overall'].predict(features)
                        regime_preds = overall_preds
                    predictions += probs[:, regime] * regime_preds

                return predictions

        if regimes is not None:
            # Group row positions by the model that serves them so each model
            # predicts once per call instead of once per row.
            rows_by_model = {}
            for position, regime in enumerate(regimes):
                model_key = f'regime_{regime}'
                if model_key not in self.models:
                    model_key = 'overall'
                rows_by_model.setdefault(model_key, []).append(position)

            predictions = np.zeros(len(features))
            for model_key, rows in rows_by_model.items():
                predictions[rows] = self.models[model_key].predict(features.iloc[rows])

            return predictions

        # Use overall model
        return self.models['overall'].predict(features)

    def _initialize_shap_explainers(self, sample_features):
        """Build a SHAP ``TreeExplainer`` per trained model (best effort).

        ``sample_features`` is currently unused; it is kept so existing
        callers keep working.
        """
        for model_name, model in self.models.items():
            try:
                self.shap_explainers[model_name] = shap.TreeExplainer(model)
            except Exception as exc:
                # Interpretability is optional: report the failure, keep going.
                warnings.warn(
                    f"SHAP explainer unavailable for model {model_name}: {exc}"
                )

    def explain_predictions(self, features, model_name='overall'):
        """Return SHAP values for ``features``, or None without an explainer."""
        explainer = self.shap_explainers.get(model_name)
        if explainer is None:
            return None
        return explainer.shap_values(features)

    def get_feature_importance(self, model_name='overall', top_n=20):
        """Return the ``top_n`` most important features, or None if unknown."""
        importance = self.feature_importance.get(model_name)
        if importance is None:
            return None
        return importance.head(top_n)

Model Validation and Selection

class ModelValidator:
    """Walk-forward validation helper for time-ordered features and targets."""

    def __init__(self, n_splits=5, test_size=63):  # ~3 months test
        self.test_size = test_size
        self.n_splits = n_splits

    def walk_forward_validation(self, features, targets, model_class, model_config):
        """Train and score a fresh model on each ``TimeSeriesSplit`` fold.

        Args:
            features: DataFrame of inputs (split positionally).
            targets: Series of targets aligned with ``features``.
            model_class: Estimator class exposing ``fit``/``score``/``predict``.
            model_config: Keyword arguments used to build each fold's model.

        Returns:
            Dict with per-fold ``train_scores``, ``test_scores``,
            ``predictions`` (actual, predicted, dates) and, when the model
            exposes ``feature_importances_``, ``feature_importance`` frames.
        """
        splitter = TimeSeriesSplit(n_splits=self.n_splits, test_size=self.test_size)

        train_scores = []
        test_scores = []
        fold_predictions = []
        fold_importances = []

        for fold, (train_idx, test_idx) in enumerate(splitter.split(features)):
            X_train = features.iloc[train_idx]
            y_train = targets.iloc[train_idx]
            X_test = features.iloc[test_idx]
            y_test = targets.iloc[test_idx]

            # Fresh model per fold so nothing leaks between folds.
            model = model_class(**model_config)
            model.fit(X_train, y_train)

            train_scores.append(model.score(X_train, y_train))
            test_scores.append(model.score(X_test, y_test))

            fold_predictions.append({
                'fold': fold,
                'actual': y_test.values,
                'predicted': model.predict(X_test),
                'dates': X_test.index
            })

            # Only tree-style models expose importances.
            if hasattr(model, 'feature_importances_'):
                fold_importances.append(pd.DataFrame({
                    'feature': features.columns,
                    'importance': model.feature_importances_,
                    'fold': fold
                }))

        return {
            'train_scores': train_scores,
            'test_scores': test_scores,
            'predictions': fold_predictions,
            'feature_importance': fold_importances
        }

    def calculate_validation_metrics(self, validation_results):
        """Aggregate fold results into summary metrics.

        Pools the out-of-sample predictions of all folds and reports score
        mean/std, MSE, MAE, R^2 and the Pearson correlation between actual
        and predicted values (``information_coefficient``).
        """
        from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

        folds = validation_results['predictions']
        all_actual = np.array([value for fold in folds for value in fold['actual']])
        all_predicted = np.array([value for fold in folds for value in fold['predicted']])

        train_scores = validation_results['train_scores']
        test_scores = validation_results['test_scores']

        return {
            'mean_train_score': np.mean(train_scores),
            'std_train_score': np.std(train_scores),
            'mean_test_score': np.mean(test_scores),
            'std_test_score': np.std(test_scores),
            'mse': mean_squared_error(all_actual, all_predicted),
            'mae': mean_absolute_error(all_actual, all_predicted),
            'r2': r2_score(all_actual, all_predicted),
            'information_coefficient': np.corrcoef(all_actual, all_predicted)[0, 1]
        }

Alternative Model Architectures

Logistic Regression (Interpretable Model)

from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

class InterpretableAlphaModel:
    """Logistic-regression alpha model whose coefficients can be inspected."""

    def __init__(self, penalty='l1', C=0.1, solver='liblinear'):
        self.feature_coefficients = None
        self.scaler = StandardScaler()
        self.model = LogisticRegression(
            penalty=penalty,
            C=C,
            solver=solver,
            random_state=42
        )

    def fit(self, features, targets):
        """Fit the classifier on an above-median / not-above-median label.

        The continuous targets are binarized at their median (1 when strictly
        above it), features are standardized, and the fitted coefficients are
        stored sorted by absolute size.

        Returns:
            self
        """
        threshold = targets.median()
        labels = (targets > threshold).astype(int)

        scaled = self.scaler.fit_transform(features)
        self.model.fit(scaled, labels)

        # Single binary problem, so the first coefficient row is the only one.
        weights = self.model.coef_[0]
        table = pd.DataFrame({
            'feature': features.columns,
            'coefficient': weights,
            'abs_coefficient': np.abs(weights)
        })
        self.feature_coefficients = table.sort_values('abs_coefficient', ascending=False)

        return self

    def predict_proba(self, features):
        """Return the probability of class 1 (target above the training median)."""
        scaled = self.scaler.transform(features)
        class_probs = self.model.predict_proba(scaled)
        return class_probs[:, 1]

    def get_feature_interpretation(self):
        """Return the coefficient table built by ``fit`` (None before fitting)."""
        return self.feature_coefficients

Support Vector Machine (Non-Linear Patterns)

from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV

class NonLinearAlphaModel:
    """Support Vector Machine for non-linear pattern recognition."""

    def __init__(self, kernel='rbf', param_grid=None, cv=None):
        """Configure the SVR grid search.

        Args:
            kernel: SVR kernel name.
            param_grid: Grid for ``GridSearchCV``; a default grid over ``C``,
                ``gamma`` and ``epsilon`` is used when omitted.
            cv: Optional cross-validation strategy passed to
                ``GridSearchCV``. Defaults to a 3-fold ``TimeSeriesSplit``.
        """
        self.kernel = kernel
        self.param_grid = param_grid or {
            'C': [0.1, 1, 10],
            'gamma': ['scale', 'auto', 0.001, 0.01],
            'epsilon': [0.01, 0.1, 0.2]
        }
        self.cv = cv
        self.model = None
        # Defined up front so the attribute exists before fit() is called.
        self.best_params = None
        self.scaler = StandardScaler()

    def fit(self, features, targets):
        """Fit SVM with hyperparameter optimization.

        Hyperparameters are selected with forward-chaining splits so every
        validation fold lies after its training rows; a plain ``cv=3`` K-fold
        would tune on folds that precede part of their training data.
        Assumes rows are in chronological order -- TODO confirm with callers.

        NOTE(review): the scaler is still fitted on all rows before the
        search, so fold statistics are not fully isolated.

        Returns:
            self
        """
        # Scale features
        features_scaled = self.scaler.fit_transform(features)

        cv_strategy = self.cv if self.cv is not None else TimeSeriesSplit(n_splits=3)

        # Grid search for optimal parameters
        grid_search = GridSearchCV(
            SVR(kernel=self.kernel),
            self.param_grid,
            cv=cv_strategy,
            scoring='neg_mean_squared_error',
            n_jobs=-1
        )
        grid_search.fit(features_scaled, targets)

        # Store best model
        self.model = grid_search.best_estimator_
        self.best_params = grid_search.best_params_

        return self

    def predict(self, features):
        """Generate predictions using the fitted SVM and scaler."""
        features_scaled = self.scaler.transform(features)
        return self.model.predict(features_scaled)

Model Ensemble and Meta-Learning

Ensemble Architecture

class AlphaEnsemble:
    """Ensemble of multiple alpha models with dynamic weighting.

    NOTE(review): models that only expose ``predict_proba`` contribute
    probabilities while the others contribute whatever ``predict`` returns;
    confirm the blended scales are comparable.
    """

    def __init__(self, models_config):
        """Build the member models.

        Args:
            models_config: Mapping of model name to a dict with ``'type'``
                (``'random_forest'``, ``'logistic'`` or ``'svm'``) and
                ``'params'``. Entries with an unknown type are skipped with a
                warning.
        """
        self.models = {}
        self.model_weights = {}
        self.performance_history = {}

        # Initialize individual models
        for name, config in models_config.items():
            model_type = config['type']
            if model_type == 'random_forest':
                self.models[name] = AlphaModel(config['params'])
            elif model_type == 'logistic':
                self.models[name] = InterpretableAlphaModel(**config['params'])
            elif model_type == 'svm':
                self.models[name] = NonLinearAlphaModel(**config['params'])
            else:
                # A typo in the config used to drop the model silently.
                warnings.warn(
                    f"Unknown model type {model_type!r} for {name!r}; model skipped"
                )

    @staticmethod
    def _wants_regimes(name, method):
        """Return True when a member should receive regime arguments.

        The existing convention (the member's name contains "regime") is
        kept, but the method must also be able to accept a ``regimes``
        argument, so a misnamed member no longer fails with a TypeError.
        """
        if 'regime' not in name.lower():
            return False
        try:
            parameters = inspect.signature(method).parameters
        except (TypeError, ValueError):
            # Not introspectable: fall back to the name-based convention.
            return True
        variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        return 'regimes' in parameters or any(
            parameter.kind in variadic for parameter in parameters.values()
        )

    def fit(self, features, targets, regimes=None):
        """Train all models in the ensemble.

        A member that fails to train is reported and given weight 0; the
        others start with equal weights.

        Returns:
            self
        """
        for name, model in self.models.items():
            try:
                if hasattr(model, 'fit'):
                    if regimes is not None and self._wants_regimes(name, model.fit):
                        model.fit(features, targets, regimes)
                    else:
                        model.fit(features, targets)

                # Initialize equal weights
                self.model_weights[name] = 1.0 / len(self.models)

            except Exception as e:
                print(f"Failed to train model {name}: {e}")
                self.model_weights[name] = 0.0

        return self

    def predict(self, features, regimes=None, regime_probs=None):
        """Generate ensemble predictions with dynamic weighting.

        Members whose prediction raises are reported and left out, and the
        weights of the remaining members are renormalized, so a failure no
        longer pulls the blend toward zero.

        Returns:
            1-D numpy array, one value per row; all zeros when no member
            with positive weight produced a prediction.
        """
        has_regime_info = regimes is not None or regime_probs is not None
        predictions = {}

        # Get predictions from each model
        for name, model in self.models.items():
            try:
                if hasattr(model, 'predict'):
                    if has_regime_info and self._wants_regimes(name, model.predict):
                        pred = model.predict(features, regimes, regime_probs)
                    else:
                        pred = model.predict(features)
                elif hasattr(model, 'predict_proba'):
                    pred = model.predict_proba(features)
                else:
                    continue
            except Exception as exc:
                warnings.warn(f"Model {name} failed to predict and was skipped: {exc}")
                continue
            predictions[name] = pred

        # Weighted ensemble prediction over the members that answered
        ensemble_pred = np.zeros(len(features))
        weights = {name: self.model_weights.get(name, 0.0) for name in predictions}
        total_weight = sum(weights.values())
        if total_weight == 0:
            return ensemble_pred

        for name, pred in predictions.items():
            ensemble_pred += (weights[name] / total_weight) * pred

        return ensemble_pred

    def update_weights(self, recent_performance):
        """Update model weights based on recent performance.

        Args:
            recent_performance: Mapping of model name to a performance score;
                weights are the softmax of the scores. An empty mapping
                leaves the weights unchanged.
        """
        if not recent_performance:
            return

        names = list(recent_performance)
        scores = np.array([recent_performance[name] for name in names])
        # Subtract the maximum before exponentiating to avoid overflow.
        exp_scores = np.exp(scores - np.max(scores))
        softmax_weights = exp_scores / np.sum(exp_scores)

        for name, weight in zip(names, softmax_weights):
            self.model_weights[name] = weight

Model Performance Monitoring

Real-Time Performance Tracking

class ModelPerformanceMonitor:
    """Monitor model performance over a rolling window of observations."""

    # Minimum number of stored observations before metrics are computed.
    MIN_OBSERVATIONS = 20

    def __init__(self, lookback_window=63, min_ic_threshold=0.02,
                 min_sharpe_threshold=0.5):  # ~3 months
        """Configure the window and alert thresholds.

        Args:
            lookback_window: Number of most recent observations kept per model.
            min_ic_threshold: An IC below this value raises a ``low_ic`` alert.
            min_sharpe_threshold: A Sharpe ratio below this value raises a
                ``low_sharpe`` alert.
        """
        self.lookback_window = lookback_window
        self.min_ic_threshold = min_ic_threshold
        self.min_sharpe_threshold = min_sharpe_threshold
        self.performance_history = {}
        self.alerts = []

    @staticmethod
    def _correlation(predictions, returns):
        """Pearson correlation, or NaN when either input has no variation."""
        if np.std(predictions) == 0 or np.std(returns) == 0:
            return float('nan')
        return np.corrcoef(predictions, returns)[0, 1]

    @staticmethod
    def _annualized_sharpe(portfolio_returns):
        """Mean/std scaled by sqrt(252); zero variance is handled explicitly."""
        mean = np.mean(portfolio_returns)
        std = np.std(portfolio_returns)
        if std == 0:
            # Same values the plain division would give, without the
            # divide-by-zero warning.
            return float('nan') if mean == 0 else float(np.copysign(np.inf, mean))
        return mean / std * np.sqrt(252)

    def update_performance(self, model_name, predictions, actual_returns, dates):
        """Append new observations and refresh the rolling metrics.

        Args:
            model_name: Key under which the history is stored.
            predictions: Iterable of model predictions.
            actual_returns: Iterable of realized returns, same length.
            dates: Iterable of timestamps for the observations.
        """
        history = self.performance_history.setdefault(model_name, {
            'predictions': [],
            'returns': [],
            'dates': [],
            'ic_history': [],
            'sharpe_history': []
        })

        # Store recent data
        history['predictions'].extend(predictions)
        history['returns'].extend(actual_returns)
        history['dates'].extend(dates)

        # Keep only recent data
        if len(history['predictions']) > self.lookback_window:
            for key in ('predictions', 'returns', 'dates'):
                history[key] = history[key][-self.lookback_window:]

        # Calculate rolling metrics
        if len(history['predictions']) >= self.MIN_OBSERVATIONS:
            window_predictions = np.asarray(history['predictions'], dtype=float)
            window_returns = np.asarray(history['returns'], dtype=float)

            ic = self._correlation(window_predictions, window_returns)

            # Convert predictions to portfolio returns (simplified)
            sharpe = self._annualized_sharpe(window_predictions * window_returns)

            history['ic_history'].append(ic)
            history['sharpe_history'].append(sharpe)

            # Check for performance degradation
            self._check_performance_alerts(model_name, ic, sharpe)

    def _check_performance_alerts(self, model_name, current_ic, current_sharpe):
        """Record an alert for each metric that is below threshold or NaN.

        An undefined (NaN) metric is treated as degraded; a plain ``<``
        comparison would let it pass silently.
        """
        checks = (
            ('low_ic', current_ic, self.min_ic_threshold),
            ('low_sharpe', current_sharpe, self.min_sharpe_threshold),
        )
        for alert_type, value, threshold in checks:
            if np.isnan(value) or value < threshold:
                self.alerts.append({
                    'timestamp': pd.Timestamp.now(),
                    'model': model_name,
                    'type': alert_type,
                    'value': value,
                    'threshold': threshold
                })

    def get_current_performance(self, model_name):
        """Return the latest metrics for a model, or None if none exist yet.

        The ``avg_*_30d`` entries average the last 30 recorded metric values
        (one is recorded per ``update_performance`` call) and are None until
        30 values exist.
        """
        history = self.performance_history.get(model_name)
        if history is None or not history['ic_history']:
            return None

        ic_history = history['ic_history']
        sharpe_history = history['sharpe_history']

        return {
            'current_ic': ic_history[-1],
            'current_sharpe': sharpe_history[-1],
            'avg_ic_30d': np.mean(ic_history[-30:]) if len(ic_history) >= 30 else None,
            'avg_sharpe_30d': np.mean(sharpe_history[-30:]) if len(sharpe_history) >= 30 else None
        }

This comprehensive model architecture ensures the Cross-Asset Alpha Engine can adapt to changing market conditions while maintaining robust performance monitoring and interpretability.