Source code for src.models.kalman_diffusion

#!/usr/bin/env python3
"""
Kalman Filter Diffusion Model with Improved Regularization
Brownian motion with drift + pollster biases + fundamentals prior
"""

import numpy as np
from src.models.base_model import ElectionForecastModel


[docs] class KalmanDiffusionModel(ElectionForecastModel): """Improved diffusion model with Kalman filter/RTS smoother"""
[docs] def __init__(self, seed=None): super().__init__("kalman_diffusion", seed=seed)
[docs] def kalman_filter_smoother(self, dates, observations, obs_variance, mu, sigma2): """ Kalman filter + RTS smoother for Brownian motion with drift Args: dates: Array of time points (in days) observations: Array of poll margins obs_variance: Array of observation variances mu: Drift parameter sigma2: Diffusion variance Returns: tuple of (x_smooth, P_smooth): smoothed state estimates and variances """ T = len(dates) x_filt = np.zeros(T) P_filt = np.zeros(T) x_pred = np.zeros(T) P_pred = np.zeros(T) # Initial state x_filt[0] = observations[0] P_filt[0] = obs_variance[0] # Forward filter for t in range(1, T): dt = dates[t] - dates[t - 1] x_pred[t] = x_filt[t - 1] + mu * dt P_pred[t] = P_filt[t - 1] + sigma2 * dt K = P_pred[t] / (P_pred[t] + obs_variance[t]) x_filt[t] = x_pred[t] + K * (observations[t] - x_pred[t]) P_filt[t] = (1 - K) * P_pred[t] # Backward RTS smoother x_smooth = np.copy(x_filt) P_smooth = np.copy(P_filt) for t in range(T - 2, -1, -1): dt = dates[t + 1] - dates[t] J = P_filt[t] / (P_filt[t] + sigma2 * dt) x_smooth[t] = x_filt[t] + J * (x_smooth[t + 1] - x_filt[t] - mu * dt) P_smooth[t] = P_filt[t] + J**2 * (P_smooth[t + 1] - P_filt[t] - sigma2 * dt) return x_smooth, P_smooth
[docs] def fit_state_diffusion(self, state_polls, prior_mean=0.0, max_iter=10): """ Fit diffusion model with EM algorithm Args: state_polls: DataFrame of polls for a single state prior_mean: Prior mean for fundamentals max_iter: Maximum number of EM iterations Returns: tuple of (mu, sigma2, pollster_bias, x_smooth, P_smooth, dates) """ # Use recent 1/3 of polls recent_polls = state_polls.tail(max(len(state_polls) // 3, 10)) dates = ( recent_polls["middate"] - recent_polls["middate"].min() ).dt.days.values.astype(float) margins = recent_polls["margin"].values sample_sizes = recent_polls["samplesize"].values pollsters = recent_polls["pollster"].values # Observation variance tau_extra2 = 0.015**2 obs_variance = 1.0 / sample_sizes + tau_extra2 # Estimate pollster biases with regularization pollster_bias = {} shrinkage = 0.5 for pol in np.unique(pollsters): mask = pollsters == pol if np.sum(mask) >= 2: raw_bias = np.mean(margins[mask]) - np.mean(margins) pollster_bias[pol] = shrinkage * raw_bias else: pollster_bias[pol] = 0.0 # Adjust for pollster bias adjusted_margins = margins - np.array([pollster_bias[p] for p in pollsters]) # EM algorithm mu = 0.0 sigma2 = 0.0005 for iteration in range(max_iter): x_smooth, P_smooth = self.kalman_filter_smoother( dates, adjusted_margins, obs_variance, mu, sigma2 ) # M-step: update parameters mu_vals = [] sigma2_vals = [] for t in range(1, len(dates)): dt = dates[t] - dates[t - 1] if dt > 0: mu_vals.append((x_smooth[t] - x_smooth[t - 1]) / dt) sigma2_vals.append(max((P_smooth[t] + P_smooth[t - 1]) / dt, 1e-6)) mu = np.mean(mu_vals) if mu_vals else 0.0 sigma2 = max(np.mean(sigma2_vals), 0.0005) if sigma2_vals else 0.0005 # Incorporate fundamentals prior prior_weight = 0.1 x_smooth = (1 - prior_weight) * x_smooth + prior_weight * prior_mean return mu, sigma2, pollster_bias, x_smooth, P_smooth, dates
[docs] def simulate_forward(self, x_start, P_start, mu, sigma2, days, N=2000, rng=None): """ Simulate forward with Euler-Maruyama method Args: x_start: Initial state estimate P_start: Initial state variance mu: Drift parameter sigma2: Diffusion variance days: Number of days to simulate forward N: Number of simulation samples rng: NumPy random generator (default: None uses default_rng) Returns: Array of final margin values (length N) """ if rng is None: rng = np.random.default_rng() X = np.zeros((N, days + 1)) # Ensure P_start is non-negative to avoid sqrt of negative number P_start = max(P_start, 1e-10) X[:, 0] = rng.normal(x_start, np.sqrt(P_start), N) dt = 1.0 for t in range(days): drift = mu * dt diffusion = np.sqrt(max(sigma2 * dt, 0)) dW = rng.normal(0, 1, N) X[:, t + 1] = X[:, t] + drift + diffusion * dW return X[:, -1]
[docs] def fit_and_forecast( self, state_polls, forecast_date, election_date, actual_margin, rng=None ): """Fit Kalman diffusion and forecast election outcome""" mu, sigma2, pollster_bias, x_smooth, P_smooth, dates = self.fit_state_diffusion( state_polls, prior_mean=0.0 ) # Current state estimate x_current = x_smooth[-1] P_current = P_smooth[-1] # Forecast forward days_to_election = (election_date - forecast_date).days forecast_uncertainty = 0.001 * days_to_election P_current = P_current + forecast_uncertainty**2 final_margins = self.simulate_forward( x_current, P_current, mu, sigma2, days_to_election, N=2000, rng=rng ) # Win probability win_prob = np.mean(final_margins > 0) win_prob = np.clip(win_prob, 0.01, 0.99) return { "win_probability": win_prob, "predicted_margin": np.mean(final_margins), "margin_std": np.std(final_margins), }
if __name__ == "__main__": from src.utils.logging_config import setup_logging setup_logging(__name__) model = KalmanDiffusionModel() pred_df = model.run_forecast() metrics_df = model.save_results() model.logger.info(f"Total predictions: {len(pred_df)}") model.logger.info(f"\n{metrics_df.to_string(index=False)}")