Source code for timeseries_compute.stats_model

#!/usr/bin/env python3
# timeseries_compute/stats_model.py

"""
Time Series Statistical Modeling Module.

This module implements various time series models for analyzing and forecasting
financial and economic data, with a focus on ARIMA for conditional mean modeling
and GARCH for volatility modeling. It supports both univariate and multivariate
approaches.

Key Components:
- ModelARIMA: ARIMA model for conditional mean forecasting
- ModelGARCH: GARCH model for volatility forecasting
- ModelMultivariateGARCH: Multivariate GARCH for correlation/covariance modeling
- ModelFactory: Factory pattern for creating appropriate model instances

Key Functions:
- run_arima: Convenience function for ARIMA modeling
- run_garch: Convenience function for GARCH modeling
- run_multivariate_garch: Function for multivariate GARCH analysis
- calculate_correlation_matrix: Compute correlation matrices
- calculate_portfolio_risk: Assess risk based on volatility and correlations

Supported Models:
- ARIMA(p,d,q): For modeling conditional means
- GARCH(p,q): For modeling conditional volatility
- CCC-GARCH: Constant Conditional Correlation
- DCC-GARCH: Dynamic Conditional Correlation with EWMA

Typical Usage Flow:
1. Start with prepared data from data_processor.py
2. Fit ARIMA models to capture conditional mean
3. Extract residuals and fit GARCH models for volatility
4. For multiple series, analyze correlations with multivariate GARCH
5. Generate forecasts and risk metrics

The models in this module follow standard econometric practices and use
statsmodels and arch packages for the underlying implementations.
"""

import logging as l

# handle data transformation and preparation tasks
import numpy as np
import pandas as pd

# import model specific libraries
from statsmodels.tsa.arima.model import ARIMA
from arch import arch_model

# type hinting
from typing import Dict, Any, Tuple, Union, Optional, List
from timeseries_compute import data_processor
from timeseries_compute.data_processor import (
    calculate_ewma_covariance,
    calculate_ewma_volatility,
)


[docs] class ModelARIMA: """ Applies the ARIMA (AutoRegressive Integrated Moving Average) model on all columns of a DataFrame. Attributes: data (pd.DataFrame): The input data on which ARIMA models will be applied. order (Tuple[int, int, int]): The (p, d, q) order of the ARIMA model. steps (int): The number of steps to forecast. models (Dict[str, ARIMA]): A dictionary to store ARIMA models for each column. fits (Dict[str, ARIMA]): A dictionary to store fitted ARIMA models for each column. """ def __init__( self, data: pd.DataFrame, order: Tuple[int, int, int] = (1, 1, 1), steps: int = 5, ) -> None: """ Initializes the ARIMA model with the given data, order, and steps. Args: data (pd.DataFrame): The input data for the ARIMA model. order (Tuple[int, int, int]): The (p, d, q) order of the ARIMA model. steps (int): The number of steps to forecast. """ ascii_banner = """ \n \t> ARIMA <\n""" l.info(ascii_banner) self.data = data self.order = order self.steps = steps self.models: Dict[str, ARIMA] = {} # Store models for each column self.fits: Dict[str, ARIMA] = {} # Store fits for each column
[docs] def fit(self) -> Dict[str, ARIMA]: """ Fits an ARIMA model to each column in the dataset. Returns: Dict[str, ARIMA]: A dictionary where the keys are column names and the values are the fitted ARIMA models for each column. """ for column in self.data.columns: model = ARIMA(self.data[column], order=self.order) self.fits[column] = model.fit() return self.fits
[docs] def summary(self) -> Dict[str, str]: """ Returns the model summaries for all columns. Returns: Dict[str, str]: A dictionary containing the model summaries for each column. """ summaries = {} for column, fit in self.fits.items(): summaries[column] = str(fit.summary()) return summaries
[docs] def forecast(self) -> Dict[str, Union[float, list]]: """ Generates forecasts for each fitted model. Returns: Dict[str, Union[float, list]]: A dictionary where the keys are the column names and the values are the forecasted values. If steps=1, returns a float. If steps>1, returns a list. """ forecasts = {} for column, fit in self.fits.items(): # Force use the steps parameter directly to avoid any override issues forecast_result = fit.forecast(steps=self.steps) # CRITICAL FIX: Always check the actual forecast length, not just self.steps if hasattr(forecast_result, "__len__") and len(forecast_result) > 1: # Multiple forecast values - return as list forecasts[column] = forecast_result.tolist() else: # Single forecast value - return as float forecasts[column] = ( forecast_result.iloc[0] if hasattr(forecast_result, "iloc") else float(forecast_result) ) return forecasts
[docs] def run_arima( df_stationary: pd.DataFrame, p: int = 1, d: int = 1, q: int = 1, forecast_steps: int = 5, ) -> Tuple[Dict[str, object], Dict[str, Union[float, List[float]]]]: """ Runs an ARIMA model on stationary time series data. This function fits ARIMA(p,d,q) models to each column in the provided DataFrame and generates forecasts for the specified number of steps ahead. It performs minimal logging to display only core information about the model and forecasts. Args: df_stationary (pd.DataFrame): The DataFrame with stationary time series data p (int): Autoregressive lag order, default=1 d (int): Degree of differencing, default=1 q (int): Moving average lag order, default=1 forecast_steps (int): Number of steps to forecast, default=5 Returns: Tuple[Dict[str, object], Dict[str, Union[float, List[float]]]]: - First element: Dictionary of fitted ARIMA models for each column - Second element: Dictionary of forecasted values for each column """ l.info(f"\n## Running ARIMA(p={p}, d={d}, q={q})") # Ensure data is properly prepared with Date as index df_stationary = data_processor.prepare_timeseries_data(df_stationary) model_arima = ModelFactory.create_model( model_type="ARIMA", data=df_stationary, order=(p, d, q), steps=forecast_steps, ) arima_fit = model_arima.fit() l.info(f"## ARIMA model fitted to columns: {list(arima_fit.keys())}") arima_forecast = model_arima.forecast() # Debug: Check what we actually got from forecast l.info(f"## DEBUG: Raw forecast results from model_arima.forecast():") for col, value in arima_forecast.items(): l.info( f" DEBUG {col}: type={type(value)}, length={len(value) if hasattr(value, '__len__') else 'N/A'}, value={value}" ) l.info(f"## ARIMA {forecast_steps}-step forecast values:") for col, value in arima_forecast.items(): if isinstance(value, list): # Format list of forecast values value_str = ", ".join(f"{v:.4f}" for v in value) l.info(f" {col}: [{value_str}]") else: # Format single forecast value l.info(f" {col}: {value:.4f}") return arima_fit, arima_forecast
[docs] class ModelGARCH: """ Represents a GARCH model for time series data. Attributes: data (pd.DataFrame): The input time series data. p (int): The order of the GARCH model for the lag of the squared residuals. q (int): The order of the GARCH model for the lag of the conditional variance. dist (str): The distribution to use for the GARCH model (e.g., 'normal', 't'). models (Dict[str, arch_model]): A dictionary to store models for each column of the data. fits (Dict[str, arch_model]): A dictionary to store fitted models for each column of the data. """ def __init__( self, data: pd.DataFrame, p: int = 1, q: int = 1, dist: str = "normal" ) -> None: """ Initializes the GARCH model with the given parameters. Args: data (pd.DataFrame): The input data for the GARCH model. p (int): The order of the GARCH model. q (int): The order of the ARCH model. dist (str): The distribution to be used in the model (e.g., 'normal', 't'). """ ascii_banner = """ \n\t> GARCH <\n""" l.info(ascii_banner) self.data = data self.p = p self.q = q self.dist = dist self.models: Dict[str, arch_model] = {} # Store models for each column self.fits: Dict[str, arch_model] = {} # Store fits for each column
[docs] def fit(self) -> Dict[str, arch_model]: """ Fits a GARCH model to each column of the data. Returns: Dict[str, arch_model]: A dictionary where the keys are column names and the values are the fitted GARCH models. """ for column in self.data.columns: model = arch_model( self.data[column], vol="Garch", p=self.p, q=self.q, dist=self.dist ) self.fits[column] = model.fit(disp="off") return self.fits
[docs] def summary(self) -> Dict[str, str]: """ Returns the model summaries for all columns. Returns: Dict[str, str]: A dictionary containing the model summaries for each column. """ summaries = {} for column, fit in self.fits.items(): summaries[column] = str(fit.summary()) return summaries
[docs] def forecast(self, steps: int) -> Dict[str, float]: """ Generates forecasted variance for each fitted model. Args: steps (int): The number of steps ahead to forecast. Returns: Dict[str, float]: A dictionary where keys are column names and values are the forecasted variances for the specified horizon. """ forecasts = {} for column, fit in self.fits.items(): forecasts[column] = fit.forecast(horizon=steps).variance.iloc[-1] return forecasts
[docs] class ModelMultivariateGARCH: """Implements multivariate GARCH models including CC-GARCH and DCC-GARCH.""" def __init__( self, data: pd.DataFrame, p: int = 1, q: int = 1, model_type: str = "cc" ): """ Initialize multivariate GARCH model. Args: data: DataFrame with multiple time series p: GARCH order q: ARCH order model_type: 'cc' for Constant Correlation or 'dcc' for Dynamic Conditional Correlation """ # If data has Date column, set it as index for time series operations self.data = data self.data = data self.p = p self.q = q self.model_type = model_type self.fits = {}
[docs] def fit_cc_garch(self) -> Dict[str, Any]: """Fit Constant Conditional Correlation GARCH model.""" # First fit univariate GARCH models univariate_models = {} for column in self.data.columns: model = arch_model(self.data[column], vol="Garch", p=self.p, q=self.q) univariate_models[column] = model.fit(disp="off") # Calculate constant correlation matrix residuals = pd.DataFrame() for column in self.data.columns: residuals[column] = univariate_models[column].resid correlation_matrix = residuals.corr() # Store results self.cc_results = { "univariate_models": univariate_models, "correlation": correlation_matrix, } return self.cc_results
[docs] def fit_dcc_garch(self, lambda_val: float = 0.95): """ Fit Dynamic Conditional Correlation GARCH model using EWMA for correlation. Args: lambda_val: EWMA decay factor Returns: Dictionary with DCC-GARCH results """ # Fit univariate GARCH models univariate_models = {} conditional_vols = pd.DataFrame(index=self.data.index) for column in self.data.columns: model = arch_model(self.data[column], vol="Garch", p=self.p, q=self.q) fit = model.fit(disp="off") univariate_models[column] = fit conditional_vols[column] = np.sqrt(fit.conditional_volatility) # Calculate standardized residuals std_residuals = pd.DataFrame(index=self.data.index) for column in self.data.columns: std_residuals[column] = self.data[column] / conditional_vols[column] # Calculate EWMA correlation for all pairs correlations = {} columns = self.data.columns for i in range(len(columns)): for j in range(i + 1, len(columns)): col_pair = f"{columns[i]}_{columns[j]}" correlations[col_pair] = calculate_ewma_covariance( std_residuals[columns[i]], std_residuals[columns[j]], lambda_val ) self.dcc_results = { "univariate_models": univariate_models, "conditional_vols": conditional_vols, "correlations": correlations, } return self.dcc_results
# These functions should be outside the class
[docs] def calculate_correlation_matrix(standardized_residuals: pd.DataFrame) -> pd.DataFrame: """ Calculate constant conditional correlation matrix from standardized residuals. Args: standardized_residuals (pd.DataFrame): DataFrame of standardized residuals from GARCH models Returns: pd.DataFrame: Correlation matrix """ return standardized_residuals.corr()
[docs] def calculate_dynamic_correlation( ewma_cov: pd.Series, ewma_vol1: pd.Series, ewma_vol2: pd.Series ) -> pd.Series: """ Calculate dynamic conditional correlation from EWMA covariance and volatilities. Args: ewma_cov (pd.Series): EWMA covariance between two series ewma_vol1 (pd.Series): EWMA volatility of first series ewma_vol2 (pd.Series): EWMA volatility of second series Returns: pd.Series: Dynamic conditional correlation """ return ewma_cov / (ewma_vol1 * ewma_vol2)
[docs] def construct_covariance_matrix(volatilities: list, correlation: float) -> np.ndarray: """ Construct a 2x2 covariance matrix using volatilities and correlation. Args: volatilities (list): List of volatilities [vol1, vol2] correlation (float): Correlation coefficient Returns: np.ndarray: 2x2 covariance matrix """ cov_matrix = np.outer(volatilities, volatilities) cov_matrix[0, 1] *= correlation cov_matrix[1, 0] *= correlation return cov_matrix
[docs] def calculate_portfolio_risk(weights: np.ndarray, cov_matrix: np.ndarray) -> tuple: """ Calculate portfolio variance and volatility for given weights and covariance matrix. Args: weights (np.ndarray): Array of portfolio weights cov_matrix (np.ndarray): Covariance matrix Returns: tuple: (portfolio_variance, portfolio_volatility) """ portfolio_variance = np.dot(weights.T, np.dot(cov_matrix, weights)) portfolio_volatility = np.sqrt(portfolio_variance) return portfolio_variance, portfolio_volatility
[docs] def run_multivariate_garch( df_stationary: pd.DataFrame, arima_fits: Optional[Dict[str, Any]] = None, garch_fits: Optional[Dict[str, Any]] = None, lambda_val: float = 0.95, ) -> Dict[str, Any]: """ Runs multivariate GARCH analysis on the provided stationary DataFrame. This function implements both Constant Conditional Correlation (CCC) and Dynamic Conditional Correlation (DCC) GARCH models. It either uses provided ARIMA and GARCH models or fits new ones if not provided. Args: df_stationary (pd.DataFrame): The stationary time series data for GARCH modeling arima_fits (dict, optional): Dictionary of fitted ARIMA models for each column garch_fits (dict, optional): Dictionary of fitted GARCH models for each column lambda_val (float, optional): EWMA decay factor for DCC model. Defaults to 0.95. Returns: dict: Dictionary containing multivariate GARCH results - 'arima_residuals': DataFrame of ARIMA residuals - 'conditional_volatilities': DataFrame of conditional volatilities - 'standardized_residuals': DataFrame of standardized residuals - 'cc_correlation': Constant conditional correlation matrix - 'cc_covariance_matrix': Covariance matrix using CCC - 'dcc_correlation': Series of dynamic conditional correlations - 'dcc_covariance': Series of dynamic conditional covariances Example: >>> # Create stationary returns for two assets >>> returns = pd.DataFrame({ ... 'Asset1': [0.01, -0.02, 0.015, -0.01, 0.02], ... 'Asset2': [0.015, -0.01, 0.02, -0.015, 0.01] ... }) >>> # Run multivariate GARCH analysis >>> results = run_multivariate_garch(returns) >>> # Access the correlation matrix >>> print(results['cc_correlation']) >>> # Plot dynamic correlation over time >>> plt.plot(results['dcc_correlation']) """ results = {} # 1. If ARIMA fits not provided, fit ARIMA models to filter out conditional mean if arima_fits is None: arima_fits, _ = run_arima( df_stationary=df_stationary, p=1, d=0, q=1, forecast_steps=1 ) # 2. Extract ARIMA residuals arima_residuals = pd.DataFrame(index=df_stationary.index) for column in df_stationary.columns: if hasattr(arima_fits[column], "resid"): arima_residuals[column] = arima_fits[column].resid else: # If no residuals available, use original series arima_residuals[column] = df_stationary[column] results["arima_residuals"] = arima_residuals # 3. If GARCH fits not provided, fit GARCH models if garch_fits is None: garch_fits, _ = run_garch( df_stationary=arima_residuals, p=1, q=1, forecast_steps=1 ) # 4. Extract conditional volatilities cond_vol = {} for column in arima_residuals.columns: cond_vol[column] = np.sqrt(garch_fits[column].conditional_volatility) cond_vol_df = pd.DataFrame(cond_vol, index=arima_residuals.index) results["conditional_volatilities"] = cond_vol_df # 5. Calculate standardized residuals std_resid = {} for column in arima_residuals.columns: std_resid[column] = arima_residuals[column] / cond_vol[column] std_resid_df = pd.DataFrame(std_resid, index=arima_residuals.index) results["standardized_residuals"] = std_resid_df # 6. Constant Conditional Correlation (CCC-GARCH) cc_corr = calculate_correlation_matrix(std_resid_df) results["cc_correlation"] = cc_corr # 7. Get latest volatilities for covariance matrix if len(arima_residuals.columns) == 2: columns = list(arima_residuals.columns) latest_vols = [cond_vol[col].iloc[-1] for col in columns] # Construct covariance matrix using CCC cc_cov_matrix = construct_covariance_matrix( volatilities=latest_vols, correlation=cc_corr.iloc[0, 1] ) results["cc_covariance_matrix"] = cc_cov_matrix # 8. Dynamic Conditional Correlation (DCC-GARCH) if len(arima_residuals.columns) == 2: columns = list(std_resid_df.columns) # Calculate EWMA covariance ewma_cov = calculate_ewma_covariance( std_resid_df[columns[0]], std_resid_df[columns[1]], lambda_val=lambda_val ) # Calculate EWMA volatilities for standardized residuals ewma_vol1 = calculate_ewma_volatility( std_resid_df[columns[0]], lambda_val=lambda_val ) ewma_vol2 = calculate_ewma_volatility( std_resid_df[columns[1]], lambda_val=lambda_val ) # Calculate dynamic correlation dcc_corr = calculate_dynamic_correlation(ewma_cov, ewma_vol1, ewma_vol2) results["dcc_correlation"] = dcc_corr # Calculate dynamic covariance dcc_cov = dcc_corr * (cond_vol_df[columns[0]] * cond_vol_df[columns[1]]) results["dcc_covariance"] = dcc_cov return results
[docs] class ModelFactory: """ Factory class for creating instances of different statistical models. Methods: create_model(model_type: str, **kwargs) -> Any: Static method that creates and returns an instance of a model based on the provided model_type. """
[docs] @staticmethod def create_model( model_type: str, data: pd.DataFrame, # ARIMA parameters with defaults order: Tuple[int, int, int] = (1, 1, 1), steps: int = 5, # GARCH parameters with defaults p: int = 1, q: int = 1, dist: str = "normal", # Multivariate GARCH parameters mv_model_type: str = "cc", ) -> Union[ModelARIMA, ModelGARCH, ModelMultivariateGARCH]: """ Creates and returns an instance of a statistical model based on the specified type. Args: model_type (str): Type of model to create ("ARIMA", "GARCH", or "MVGARCH") data (pd.DataFrame): Input data for the model order (Tuple[int, int, int]): (p,d,q) order for ARIMA models steps (int): Forecast horizon for ARIMA models p (int): GARCH order parameter q (int): ARCH order parameter dist (str): Error distribution for GARCH models mv_model_type (str): Type of multivariate GARCH model ("cc" or "dcc") Returns: Union[ModelARIMA, ModelGARCH, ModelMultivariateGARCH]: The created model instance Raises: ValueError: If an unsupported model type is provided """ l.info(f"Creating model type: {model_type}") if model_type.lower() == "arima": return ModelARIMA(data=data, order=order, steps=steps) elif model_type.lower() == "garch": return ModelGARCH(data=data, p=p, q=q, dist=dist) elif model_type.lower() == "mvgarch": return ModelMultivariateGARCH(data=data, p=p, q=q, model_type=mv_model_type) else: raise ValueError(f"Unsupported model type: {model_type}")
[docs] def run_garch( df_stationary: pd.DataFrame, p: int = 1, q: int = 1, dist: str = "normal", forecast_steps: int = 5, ) -> Tuple[Dict[str, Any], Dict[str, float]]: """ Runs the GARCH model on the provided stationary DataFrame. This function fits GARCH(p,q) models to each column in the provided DataFrame and generates volatility forecasts. It performs minimal logging to display only core information about the model and forecasts. Args: df_stationary (pd.DataFrame): The stationary time series data for GARCH modeling p (int): The GARCH lag order, default=1 q (int): The ARCH lag order, default=1 dist (str): The error distribution - 'normal', 't', etc., default="normal" forecast_steps (int): The number of steps to forecast, default=5 Returns: Tuple[Dict[str, Any], Dict[str, float]]: - First element: Dictionary of fitted GARCH models for each column - Second element: Dictionary of forecasted volatility values for each column """ l.info(f"\n## Running GARCH(p={p}, q={q}, dist={dist})") # Ensure data is properly prepared for time series analysis try: df_stationary = data_processor.prepare_timeseries_data(df_stationary) except Exception as e: l.error(f"Error preparing data for GARCH model: {e}") raise ValueError(f"Failed to prepare data for GARCH model: {str(e)}") # Create and fit the GARCH model try: model_garch = ModelFactory.create_model( model_type="GARCH", data=df_stationary, p=p, q=q, dist=dist, ) garch_fit = model_garch.fit() l.info(f"## GARCH model fitted to columns: {list(garch_fit.keys())}") garch_forecast = model_garch.forecast(steps=forecast_steps) l.info(f"## GARCH {forecast_steps}-step volatility forecast:") for col, value in garch_forecast.items(): if hasattr(value, "iloc"): value_str = ", ".join(f"{v:.6f}" for v in value) l.info(f" {col}: [{value_str}]") else: l.info(f" {col}: {value:.6f}") return garch_fit, garch_forecast except Exception as e: l.error(f"Error during GARCH model fitting or forecasting: {e}") raise RuntimeError(f"GARCH model failed: {str(e)}")
[docs] def calculate_stats(series: pd.Series) -> Dict[str, float]: """ Calculate comprehensive statistics for a time series. Args: series (pd.Series): Time series data to analyze Returns: Dict[str, float]: Dictionary containing the following statistics: - 'n': Number of observations in the series - 'mean': Arithmetic mean of the series - 'median': Median value of the series - 'min': Minimum value in the series - 'max': Maximum value in the series - 'std': Standard deviation of the series - 'skew': Skewness of the distribution (asymmetry measure) - 'kurt': Kurtosis of the distribution (tail heaviness measure) - 'annualized_vol': Annualized volatility, calculated as standard deviation * sqrt(250) for daily data Example: >>> series = pd.Series([1.2, 2.3, 3.4, 4.5, 5.6]) >>> stats = calculate_stats(series) >>> print(f"Mean: {stats['mean']}, Std Dev: {stats['std']}") """ return { "n": len(series), "mean": series.mean(), "median": series.median(), "min": series.min(), "max": series.max(), "std": series.std(), "skew": series.skew(), "kurt": series.kurtosis(), "annualized_vol": series.std() * np.sqrt(250), # Assuming daily data }