Source code for tseda.statistics.autocorrelation

"""
Autocorrelation analysis for time series.

Implements ACF, PACF, and the Ljung-Box portmanteau test entirely in
numpy / scipy — no statsmodels dependency.

Classes
-------
AutocorrelationResult
    Frozen dataclass containing ACF, PACF, confidence bounds, and
    Ljung-Box statistics.
AutocorrelationAnalyzer
    Stateless analyzer.

Theory
------
**ACF** at lag *k*:

.. math::

    \\hat{\\rho}(k) = \\frac{\\sum_{t=k+1}^{n}(x_t - \\bar{x})(x_{t-k} - \\bar{x})}
                           {\\sum_{t=1}^{n}(x_t - \\bar{x})^2}

**PACF** via Durbin-Levinson recursion on the ACF values:

.. math::

    \\phi_{k,k} = \\frac{\\hat{\\rho}(k) - \\sum_{j=1}^{k-1} \\phi_{k-1,j} \\hat{\\rho}(k-j)}
                        {1 - \\sum_{j=1}^{k-1} \\phi_{k-1,j} \\hat{\\rho}(j)}

**95 % confidence interval** for ACF (Bartlett's formula assuming white noise):
``±1.96 / √n``.

**Ljung-Box** test statistic:

.. math::

    Q = n(n+2) \\sum_{k=1}^{m} \\frac{\\hat{\\rho}(k)^2}{n - k}

    Q \\sim \\chi^2(m) \\text{ under H₀ (white noise)}

Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.statistics.autocorrelation import AutocorrelationAnalyzer

>>> rng = np.random.default_rng(0)
>>> idx = pd.date_range("2020", periods=200, freq="D")
>>> ts  = TimeSeries(rng.standard_normal(200), index=idx)
>>> r   = AutocorrelationAnalyzer().analyze(ts, lags=20)
>>> len(r.acf)     # lag 0 … 20
21
>>> r.is_white_noise   # white noise → True
True
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

import numpy as np
from scipy import stats as sp_stats

from tseda.core.timeseries import TimeSeries
from tseda.core.validator import validate_lags

__all__ = ["AutocorrelationResult", "AutocorrelationAnalyzer"]

# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True) class AutocorrelationResult: """Immutable autocorrelation analysis result. Attributes ---------- acf : numpy.ndarray Autocorrelation function values at lags 0, 1, …, *n_lags*. ``acf[0]`` is always 1.0 (lag-0 autocorrelation). pacf : numpy.ndarray Partial autocorrelation function values at lags 0, 1, …, *n_lags*. ``pacf[0]`` is always 1.0 by convention. lags : numpy.ndarray Integer array ``[0, 1, …, n_lags]``. conf_lower : numpy.ndarray Lower 95 % confidence bound at each lag (Bartlett's approximation). conf_upper : numpy.ndarray Upper 95 % confidence bound at each lag. lb_statistic : numpy.ndarray Ljung-Box Q-statistic at each lag from 1 to *n_lags*. lb_pvalue : numpy.ndarray P-value of the Ljung-Box test at each lag. n_lags : int Number of lags requested (excluding lag 0). n_obs : int Number of non-NaN observations used. is_white_noise : bool ``True`` when the Ljung-Box p-value at lag ``min(n_lags, 20)`` exceeds ``alpha``. alpha : float Significance level used for ``is_white_noise`` and confidence bounds. """ acf: np.ndarray pacf: np.ndarray lags: np.ndarray conf_lower: np.ndarray conf_upper: np.ndarray lb_statistic: np.ndarray lb_pvalue: np.ndarray n_lags: int n_obs: int is_white_noise: bool alpha: float
[docs] def __repr__(self) -> str: # pragma: no cover sig_acf = int(np.sum(np.abs(self.acf[1:]) > np.abs(self.conf_upper[1:]))) sig_pacf = int(np.sum(np.abs(self.pacf[1:]) > np.abs(self.conf_upper[1:]))) return ( f"AutocorrelationResult(\n" f" n_obs : {self.n_obs}\n" f" n_lags : {self.n_lags}\n" f" significant ACF : {sig_acf} lag(s) outside 95% CI\n" f" significant PACF: {sig_pacf} lag(s) outside 95% CI\n" f" is_white_noise : {self.is_white_noise} (α={self.alpha})\n" f")" )
# --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _acf(x: np.ndarray, n_lags: int) -> np.ndarray: """Compute biased sample ACF at lags 0 … n_lags.""" n = len(x) xc = x - np.mean(x) denom = np.dot(xc, xc) if denom == 0: return np.zeros(n_lags + 1) result = np.empty(n_lags + 1) result[0] = 1.0 for k in range(1, n_lags + 1): result[k] = float(np.dot(xc[k:], xc[:-k])) / denom return result def _pacf_durbin_levinson(acf_vals: np.ndarray) -> np.ndarray: """Compute PACF via Durbin-Levinson recursion from ACF values. Parameters ---------- acf_vals : array of shape (n_lags + 1,) ACF values including lag 0. Returns ------- numpy.ndarray of shape (n_lags + 1,) PACF values (lag 0 = 1.0 by convention). """ n_lags = len(acf_vals) - 1 pacf = np.zeros(n_lags + 1) pacf[0] = 1.0 if n_lags == 0: return pacf # phi[k, j] : phi at order k, coefficient j (1-indexed) phi = np.zeros((n_lags + 1, n_lags + 1)) # Order 1 phi[1, 1] = acf_vals[1] pacf[1] = acf_vals[1] for k in range(2, n_lags + 1): # Numerator num = acf_vals[k] - sum(phi[k - 1, j] * acf_vals[k - j] for j in range(1, k)) # Denominator den = 1.0 - sum(phi[k - 1, j] * acf_vals[j] for j in range(1, k)) phi[k, k] = num / den if den != 0 else 0.0 pacf[k] = phi[k, k] # Update lower-order coefficients for j in range(1, k): phi[k, j] = phi[k - 1, j] - phi[k, k] * phi[k - 1, k - j] return pacf def _ljung_box(x: np.ndarray, acf_vals: np.ndarray, n_lags: int) -> tuple[np.ndarray, np.ndarray]: """Compute Ljung-Box Q-statistic for lags 1 … n_lags. Returns ------- (lb_stat, lb_pval) each of shape (n_lags,) """ n = len(x) rho = acf_vals[1:] # shape (n_lags,) lb_stat = np.empty(n_lags) lb_pval = np.empty(n_lags) for m in range(1, n_lags + 1): q_m = float( n * (n + 2) * np.sum(rho[:m] ** 2 / (n - np.arange(1, m + 1))) ) p_m = float(sp_stats.chi2.sf(q_m, df=m)) lb_stat[m - 1] = q_m lb_pval[m - 1] = p_m return lb_stat, lb_pval # --------------------------------------------------------------------------- # Analyzer # ---------------------------------------------------------------------------
[docs] class AutocorrelationAnalyzer: """Compute ACF, PACF, and Ljung-Box statistics for a :class:`~tseda.core.TimeSeries`. This class is **stateless**. Methods ------- analyze(ts, lags, alpha) Return an :class:`AutocorrelationResult`. significant_lags(result) Return the lag numbers where ACF or PACF exceeds the CI. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.statistics.autocorrelation import AutocorrelationAnalyzer AR(1) process: >>> rng = np.random.default_rng(7) >>> n = 300 >>> idx = pd.date_range("2020", periods=n, freq="D") >>> eps = rng.standard_normal(n) >>> x = np.zeros(n) >>> for i in range(1, n): x[i] = 0.7 * x[i-1] + eps[i] >>> ts = TimeSeries(x, index=idx) >>> r = AutocorrelationAnalyzer().analyze(ts, lags=10) >>> r.acf[1] > 0.5 # strong lag-1 autocorrelation True >>> r.is_white_noise # definitely not white noise False """
[docs] def analyze( self, ts: TimeSeries, lags: int = 40, *, alpha: float = 0.05, ) -> AutocorrelationResult: """Compute ACF, PACF, and Ljung-Box statistics. Parameters ---------- ts : TimeSeries Input series. NaN values are dropped before analysis. lags : int, optional Number of lags to compute (lag 0 is always included). Capped at ``n // 2``. Default 40. alpha : float, optional Significance level for confidence bounds and :attr:`~AutocorrelationResult.is_white_noise`. Default 0.05. Returns ------- AutocorrelationResult Raises ------ TypeError If *ts* is not a :class:`~tseda.core.TimeSeries`. ValueError If *lags* is out of range or fewer than 4 non-NaN observations. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.statistics.autocorrelation import AutocorrelationAnalyzer >>> idx = pd.date_range("2020", periods=50, freq="D") >>> ts = TimeSeries(np.ones(50), index=idx) >>> r = AutocorrelationAnalyzer().analyze(ts, lags=5) >>> r.acf[0] 1.0 """ if not isinstance(ts, TimeSeries): raise TypeError( f"'ts' must be a TimeSeries, got {type(ts).__name__!r}." ) x = ts.values[~np.isnan(ts.values)] if len(x) < 4: raise ValueError( "AutocorrelationAnalyzer requires at least 4 non-NaN observations." ) n_lags = validate_lags(lags, len(x), name="lags") n = len(x) acf_vals = _acf(x, n_lags) pacf_vals = _pacf_durbin_levinson(acf_vals) lb_stat, lb_pval = _ljung_box(x, acf_vals, n_lags) # Bartlett 95% CI ±z * se, se ≈ 1/√n for white noise z_crit = float(sp_stats.norm.ppf(1 - alpha / 2)) ci_half = z_crit / np.sqrt(n) conf_lower = np.full(n_lags + 1, -ci_half) conf_upper = np.full(n_lags + 1, ci_half) conf_lower[0] = conf_upper[0] = 1.0 # lag 0 is exactly 1 # White noise check at lag min(n_lags, 20) wn_lag = min(n_lags, 20) - 1 # 0-indexed is_wn = bool(lb_pval[wn_lag] > alpha) return AutocorrelationResult( acf=acf_vals, pacf=pacf_vals, lags=np.arange(n_lags + 1), conf_lower=conf_lower, conf_upper=conf_upper, lb_statistic=lb_stat, lb_pvalue=lb_pval, n_lags=n_lags, n_obs=n, is_white_noise=is_wn, alpha=alpha, )
[docs] def significant_lags( self, result: AutocorrelationResult, *, which: str = "acf", ) -> np.ndarray: """Return lag numbers (> 0) where the function exceeds the CI. Parameters ---------- result : AutocorrelationResult Output of :meth:`analyze`. which : str, optional ``"acf"`` (default) or ``"pacf"``. Returns ------- numpy.ndarray Integer array of significant lag numbers. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.statistics.autocorrelation import AutocorrelationAnalyzer >>> rng = np.random.default_rng(7) >>> n = 300 >>> idx = pd.date_range("2020", periods=n, freq="D") >>> eps = rng.standard_normal(n) >>> x = np.zeros(n) >>> for i in range(1, n): x[i] = 0.7 * x[i-1] + eps[i] >>> ts = TimeSeries(x, index=idx) >>> r = AutocorrelationAnalyzer().analyze(ts, lags=10) >>> len(AutocorrelationAnalyzer().significant_lags(r)) > 0 True """ if not isinstance(result, AutocorrelationResult): raise TypeError( f"'result' must be an AutocorrelationResult, " f"got {type(result).__name__!r}." ) if which not in ("acf", "pacf"): raise ValueError(f"'which' must be 'acf' or 'pacf'; got {which!r}.") vals = result.acf if which == "acf" else result.pacf ub = result.conf_upper # Exclude lag 0 (always 1.0) mask = np.abs(vals[1:]) > np.abs(ub[1:]) return result.lags[1:][mask]