Skip to content

System Architecture

IMPORTANT: All empirical analysis in this project is conducted at daily frequency using daily OHLCV bars from Polygon.io. No intraday, tick, or order-book data is used in the current experiment.

Overview

The Cross-Asset Alpha Engine employs a modular, scalable architecture designed for both research and production deployment. The system separates concerns across data ingestion, feature engineering, regime detection, alpha generation, and portfolio construction. Execution is modeled at daily close-to-close with simple costs, not an intraday microstructure model.

High-Level Architecture

graph TB
    A[Market Data Sources] --> B[Data Infrastructure Layer]
    B --> C[Feature Engineering Engine]
    C --> D[Regime Detection System]
    D --> E[Alpha Generation Framework]
    E --> F[Portfolio Construction Engine]
    F --> G[Execution Simulation Module]
    G --> H[Performance Analytics Suite]

    subgraph "Data Layer"
        B1[Polygon API Client]
        B2[Parquet Caching]
        B3[Data Validation]
        B4[Asset Universe Management]
    end

    subgraph "Feature Layer"
        C1[Technical Analysis]
        C2[Daily Microstructure-Inspired Analysis]
        C3[Cross-Asset Signals]
    end

    subgraph "Model Layer"
        E1[Random Forest]
        E2[Regime-Specific Models]
        E3[Feature Selection]
    end

Component Architecture

1. Data Infrastructure Layer

Polygon API Client

class PolygonClient:
    """Professional-grade API client with retry logic and rate limiting."""

    def __init__(self, api_key: str, base_delay: float = 0.5):
        self.api_key = api_key
        self.base_delay = base_delay
        self.session = requests.Session()

    def get_daily_bars(self, symbol: str, start_date: date, end_date: date):
        """Fetch daily OHLCV data with automatic retry."""
        url = f"{self.base_url}/v2/aggs/ticker/{symbol}/range/1/day/{start_date}/{end_date}"

        for attempt in range(self.max_retries):
            try:
                response = self.session.get(url, headers=self.headers)
                if response.status_code == 200:
                    return response.json()
                elif response.status_code == 429:  # Rate limited
                    time.sleep(self.base_delay * (2 ** attempt))
                    continue
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise e
                time.sleep(self.base_delay * (2 ** attempt))

Data Caching System

class DataCache:
    """Efficient parquet-based caching system."""

    def __init__(self, cache_dir: str = "cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)

    def save_data(self, data: pd.DataFrame, symbol: str, start_date: date, end_date: date):
        """Save data with metadata for efficient retrieval."""
        cache_key = self._generate_cache_key(symbol, start_date, end_date)
        cache_path = self.cache_dir / f"{cache_key}.parquet"

        # Save with compression
        data.to_parquet(cache_path, compression='snappy', index=False)

        # Save metadata
        metadata = {
            "symbol": symbol,
            "start_date": start_date.isoformat(),
            "end_date": end_date.isoformat(),
            "cached_at": datetime.now().isoformat(),
            "records": len(data)
        }

        with open(cache_path.with_suffix('.metadata.json'), 'w') as f:
            json.dump(metadata, f)

2. Feature Engineering Engine

Technical Analysis Module

class TechnicalFeatureEngine:
    """Comprehensive technical analysis feature generation."""

    def __init__(self, config: TechnicalConfig):
        self.config = config

    def generate_momentum_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Generate multi-timeframe momentum indicators."""
        features = data.copy()

        # Multi-horizon returns
        for period in [1, 5, 20, 60]:
            features[f'returns_{period}d'] = data['close'].pct_change(period)

        # Momentum acceleration
        features['momentum_accel'] = (
            features['returns_5d'] - features['returns_20d']
        )

        # Relative strength
        features['relative_strength'] = (
            features['returns_20d'] / features['returns_60d']
        )

        return features

    def generate_volatility_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Generate volatility-based features."""
        features = data.copy()
        returns = data['close'].pct_change()

        # Multi-horizon volatility
        for window in [5, 20, 60]:
            features[f'volatility_{window}d'] = (
                returns.rolling(window).std() * np.sqrt(252)
            )

        # Volatility ratios
        features['vol_ratio_5_20'] = (
            features['volatility_5d'] / features['volatility_20d']
        )

        # Volatility persistence
        features['vol_persistence'] = (
            features['volatility_5d'].rolling(5).mean()
        )

        return features

Daily Microstructure-Inspired Analysis Module

class MicrostructureFeatureEngine:
    """Daily microstructure-inspired feature generation from daily OHLCV bars.

    Note: All features are computed from daily bars, not true intraday or tick data.
    """

    def generate_vwap_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """VWAP-based features computed from daily OHLCV bars."""
        features = data.copy()

        # VWAP deviation
        features['vwap_deviation'] = (
            (data['close'] - data['vwap']) / data['vwap']
        )

        # VWAP momentum
        features['vwap_momentum'] = data['vwap'].pct_change(5)

        # Price improvement vs VWAP
        features['price_improvement'] = np.where(
            data['close'] > data['vwap'], 1, -1
        ) * features['vwap_deviation'].abs()

        return features

    def generate_volume_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """Volume-based features."""
        features = data.copy()

        # Volume z-score
        vol_mean = data['volume'].rolling(20).mean()
        vol_std = data['volume'].rolling(20).std()
        features['volume_zscore'] = (data['volume'] - vol_mean) / vol_std

        # Volume-price correlation
        features['vol_price_corr'] = (
            data['volume'].rolling(20).corr(data['close'].pct_change())
        )

        return features

3. Regime Detection System

Hidden Markov Model Implementation

from hmmlearn import hmm

class RegimeHMM:
    """Hidden Markov Model for regime detection."""

    def __init__(self, n_components: int = 3, covariance_type: str = "full"):
        self.n_components = n_components
        self.model = hmm.GaussianHMM(
            n_components=n_components,
            covariance_type=covariance_type,
            n_iter=100
        )

    def fit(self, features: np.ndarray) -> 'RegimeHMM':
        """Fit HMM to regime features."""
        # Standardize features
        self.scaler = StandardScaler()
        features_scaled = self.scaler.fit_transform(features)

        # Fit HMM
        self.model.fit(features_scaled)

        return self

    def predict_regimes(self, features: np.ndarray) -> np.ndarray:
        """Predict most likely regime sequence."""
        features_scaled = self.scaler.transform(features)
        return self.model.predict(features_scaled)

    def predict_proba(self, features: np.ndarray) -> np.ndarray:
        """Predict regime probabilities."""
        features_scaled = self.scaler.transform(features)
        return self.model.predict_proba(features_scaled)

4. Alpha Generation Framework

Random Forest Implementation

class AlphaModel:
    """Regime-aware alpha generation model."""

    def __init__(self, model_config: dict):
        self.config = model_config
        self.models = {}
        self.feature_importance = {}

    def fit(self, features: pd.DataFrame, targets: pd.Series, regimes: np.ndarray):
        """Train regime-specific models."""

        # Overall model
        self.models['overall'] = RandomForestRegressor(**self.config)
        self.models['overall'].fit(features, targets)

        # Regime-specific models
        for regime in np.unique(regimes):
            regime_mask = regimes == regime
            if regime_mask.sum() > 50:  # Minimum samples
                regime_features = features[regime_mask]
                regime_targets = targets[regime_mask]

                model = RandomForestRegressor(**self.config)
                model.fit(regime_features, regime_targets)
                self.models[f'regime_{regime}'] = model

                # Store feature importance
                self.feature_importance[f'regime_{regime}'] = pd.DataFrame({
                    'feature': features.columns,
                    'importance': model.feature_importances_
                }).sort_values('importance', ascending=False)

    def predict(self, features: pd.DataFrame, regimes: np.ndarray, 
                regime_probs: np.ndarray = None) -> np.ndarray:
        """Generate alpha predictions with regime awareness."""
        predictions = np.zeros(len(features))

        if regime_probs is not None:
            # Weighted ensemble based on regime probabilities
            for regime in range(regime_probs.shape[1]):
                model_key = f'regime_{regime}'
                if model_key in self.models:
                    regime_preds = self.models[model_key].predict(features)
                    predictions += regime_probs[:, regime] * regime_preds
        else:
            # Use most likely regime
            for i, regime in enumerate(regimes):
                model_key = f'regime_{regime}'
                if model_key in self.models:
                    predictions[i] = self.models[model_key].predict(features.iloc[[i]])[0]
                else:
                    predictions[i] = self.models['overall'].predict(features.iloc[[i]])[0]

        return predictions

5. Portfolio Construction Engine

Position Sizing System

class PortfolioConstructor:
    """Advanced portfolio construction with risk controls."""

    def __init__(self, config: PortfolioConfig):
        self.config = config

    def construct_portfolio(self, alpha_scores: pd.Series, 
                          volatilities: pd.Series) -> pd.Series:
        """Construct risk-controlled portfolio."""

        # Alpha-based sizing
        alpha_ranks = alpha_scores.rank(ascending=False)
        alpha_zscore = (alpha_scores - alpha_scores.mean()) / alpha_scores.std()

        # Volatility adjustment
        vol_adjusted_alpha = alpha_zscore / volatilities

        # Initial position sizing
        positions = vol_adjusted_alpha * self.config.base_position_size

        # Apply risk controls
        positions = self._apply_risk_controls(positions, volatilities)

        return positions

    def _apply_risk_controls(self, positions: pd.Series, 
                           volatilities: pd.Series) -> pd.Series:
        """Apply portfolio-level risk controls."""

        # Individual position limits
        positions = positions.clip(-self.config.max_position, self.config.max_position)

        # Gross exposure limit
        gross_exposure = positions.abs().sum()
        if gross_exposure > self.config.max_gross_exposure:
            positions *= self.config.max_gross_exposure / gross_exposure

        # Market neutrality (net exposure ≈ 0)
        net_exposure = positions.sum()
        positions -= net_exposure / len(positions)

        # Risk parity adjustment
        if self.config.risk_parity:
            inv_vol = 1 / volatilities
            risk_weights = inv_vol / inv_vol.sum()
            positions = positions.abs().sum() * risk_weights * np.sign(positions)

        return positions

6. Performance Analytics Suite

Backtesting Engine

class BacktestEngine:
    """Comprehensive backtesting with daily execution and transaction costs.

    Note: Execution is modeled at daily close-to-close with simple costs,
    not an intraday microstructure model. All analysis uses daily OHLCV bars only.
    """

    def __init__(self, config: BacktestConfig):
        self.config = config

    def run_backtest(self, signals: pd.DataFrame, prices: pd.DataFrame) -> dict:
        """Execute full backtest with daily transaction costs (not intraday execution)."""

        results = {
            'positions': [],
            'returns': [],
            'transactions': [],
            'metrics': {}
        }

        current_positions = pd.Series(0.0, index=signals.columns)

        for date, signal_row in signals.iterrows():
            # Calculate target positions
            target_positions = signal_row

            # Calculate trades
            trades = target_positions - current_positions

            # Apply transaction costs
            transaction_costs = self._calculate_transaction_costs(trades, prices.loc[date])

            # Update positions
            current_positions = target_positions

            # Calculate returns
            if date in prices.index:
                price_returns = prices.loc[date].pct_change()
                portfolio_return = (current_positions * price_returns).sum()
                portfolio_return -= transaction_costs

                results['returns'].append(portfolio_return)
                results['positions'].append(current_positions.copy())

        # Calculate performance metrics
        returns_series = pd.Series(results['returns'], index=signals.index[1:])
        results['metrics'] = self._calculate_metrics(returns_series)

        return results

    def _calculate_transaction_costs(self, trades: pd.Series, prices: pd.Series) -> float:
        """Calculate simplified transaction costs for daily execution.

        Note: This is a simplified model for daily rebalancing. True intraday
        microstructure modeling (order books, tick data) is not used.
        """
        # Commission costs
        commission = trades.abs().sum() * self.config.commission_rate

        # Bid-ask spread costs (simplified for daily execution)
        spread_cost = trades.abs().sum() * self.config.spread_cost

        # Market impact (simplified square root law for daily trades)
        market_impact = (trades.abs() ** 0.5).sum() * self.config.impact_coefficient

        return commission + spread_cost + market_impact

Deployment Architecture

Development Environment

# Local development setup
class DevelopmentConfig:
    DATA_SOURCE = "polygon_api"
    CACHE_ENABLED = True
    LOG_LEVEL = "DEBUG"
    BACKTEST_MODE = True

Production Environment

# Production deployment configuration
class ProductionConfig:
    DATA_SOURCE = "real_time_feed"
    CACHE_ENABLED = True
    LOG_LEVEL = "INFO"
    RISK_CHECKS_ENABLED = True
    POSITION_LIMITS_ENFORCED = True

Scalability Considerations

Data Management

  • Incremental Updates: Only fetch new data since last update
  • Parallel Processing: Multi-core feature generation and model training
  • Memory Optimization: Efficient data structures for large datasets
  • Caching Strategy: Intelligent caching of expensive computations

Model Management

  • Model Versioning: Track model performance over time
  • A/B Testing: Compare different model configurations
  • Automated Retraining: Regular model updates with new data
  • Performance Monitoring: Real-time model performance tracking

This architecture ensures the Cross-Asset Alpha Engine can scale from research prototype to institutional production system while maintaining robust risk controls and performance monitoring capabilities.