Source code for tseda.decomposition.classical

"""
Classical time series decomposition.

Decomposes a :class:`~tseda.core.TimeSeries` into **trend**, **seasonal**,
and **residual** components using the centered moving-average method
(classical / X-11 style).

Two models are supported:

* **Additive** — ``y = T + S + R``  (default)
* **Multiplicative** — ``y = T × S × R``  (use when variance scales with level)

The trend is estimated by a centered moving average whose window width equals
the seasonal *period*. For even periods (e.g., 12) a 2×MA is applied to
obtain a truly centered estimate.

All arithmetic is pure numpy / pandas — no extra dependencies.

Classes
-------
DecompositionResult
    Frozen dataclass holding all four components and quality metrics.
    *Shared by* :mod:`tseda.decomposition.stl`.
ClassicalDecomposer
    Stateless decomposer.

Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.decomposition.classical import ClassicalDecomposer

Monthly sales with trend + seasonality:

>>> rng  = np.random.default_rng(0)
>>> n    = 60
>>> t    = np.arange(n, dtype=float)
>>> seas = np.tile([-2, -1, 0, 1, 2, 3, 3, 2, 1, 0, -1, -2], 5)
>>> y    = 100 + 0.5 * t + seas + rng.standard_normal(n) * 0.5
>>> idx  = pd.date_range("2020-01", periods=n, freq="MS")
>>> ts   = TimeSeries(y, index=idx, name="sales", unit="units")
>>> dec  = ClassicalDecomposer().decompose(ts, period=12)
>>> dec.method
'classical'
>>> dec.model
'additive'
>>> round(dec.strength_seasonal, 2) > 0.5
True
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

import numpy as np
import pandas as pd

from tseda.core.timeseries import TimeSeries
from tseda.core.validator import validate_positive_int

__all__ = ["DecompositionResult", "ClassicalDecomposer"]

# ---------------------------------------------------------------------------
# Period lookup: default seasonal period for common frequencies
# ---------------------------------------------------------------------------

_FREQ_DEFAULT_PERIOD: dict[str, int] = {
    "s":   60,   # secondly → minute cycle
    "min": 60,   # minutely → hour cycle
    "T":   60,
    "h":   24,   # hourly → daily cycle
    "H":   24,
    "D":   7,    # daily → weekly cycle
    "B":   5,    # business-daily → weekly cycle
    "W":   52,   # weekly → annual cycle
    "MS":  12,   # monthly → annual cycle
    "M":   12,
    "ME":  12,
    "QS":  4,    # quarterly → annual cycle
    "Q":   4,
    "QE":  4,
}


def _default_period(freq: Optional[str]) -> Optional[int]:
    """Return the default seasonal period for *freq*, or ``None``."""
    if freq is None:
        return None
    key = freq.lstrip("0123456789")   # strip leading multiplier
    return _FREQ_DEFAULT_PERIOD.get(key)


# ---------------------------------------------------------------------------
# Shared result dataclass
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True) class DecompositionResult: """Immutable time series decomposition result. Attributes ---------- original : TimeSeries The input series. trend : TimeSeries Smooth trend component. May contain NaN at the edges (classical decomposition only; STL fills the edges with LOESS extrapolation). seasonal : TimeSeries Periodic seasonal component with the same length as *original*. residual : TimeSeries Remainder after removing trend and seasonal. NaN wherever *trend* is NaN. period : int Number of observations per seasonal cycle (e.g., 12 for monthly data with an annual pattern). model : str ``"additive"`` or ``"multiplicative"``. method : str ``"classical"`` or ``"stl"``. strength_trend : float Wang et al. (2006) trend strength: ``max(0, 1 − Var(R) / Var(T + R))``. In [0, 1]. strength_seasonal : float Wang et al. (2006) seasonality strength: ``max(0, 1 − Var(R) / Var(S + R))``. In [0, 1]. n_obs_used : int Number of non-NaN residual observations used for strength metrics. """ original: TimeSeries trend: TimeSeries seasonal: TimeSeries residual: TimeSeries period: int model: str method: str strength_trend: float strength_seasonal: float n_obs_used: int # ------------------------------------------------------------------
[docs] def to_dataframe(self) -> pd.DataFrame: """Return all four components as a :class:`pandas.DataFrame`. Returns ------- pandas.DataFrame Columns: ``observed``, ``trend``, ``seasonal``, ``residual``. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.decomposition.classical import ClassicalDecomposer >>> idx = pd.date_range("2020", periods=36, freq="MS") >>> ts = TimeSeries(np.ones(36) + np.tile(np.arange(12), 3), index=idx) >>> df = ClassicalDecomposer().decompose(ts, period=12).to_dataframe() >>> list(df.columns) ['observed', 'trend', 'seasonal', 'residual'] """ return pd.DataFrame( { "observed": self.original.to_series().rename("observed"), "trend": self.trend.to_series().rename("trend"), "seasonal": self.seasonal.to_series().rename("seasonal"), "residual": self.residual.to_series().rename("residual"), } )
[docs] def summary(self) -> str: """Return a plain-text summary of the decomposition. Returns ------- str """ return ( f"DecompositionResult\n" f"{'─' * 40}\n" f" method : {self.method}\n" f" model : {self.model}\n" f" period : {self.period}\n" f" n_obs_used : {self.n_obs_used}\n" f" strength_trend : {self.strength_trend:.4f}\n" f" strength_seasonal: {self.strength_seasonal:.4f}\n" )
[docs] def __repr__(self) -> str: # pragma: no cover return self.summary()
# --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _centered_trend(s: pd.Series, period: int) -> pd.Series: """Compute the centered moving-average trend. For odd *period*: single MA of width *period*. For even *period*: 2×MA (MA of *period* followed by MA of 2, then shifted to centre) — the standard X-11 / Census Method I approach. """ if period % 2 == 1: return s.rolling(window=period, center=True, min_periods=period).mean() # Even period: trailing MA of period → trailing MA of 2 → shift to centre ma1 = s.rolling(window=period, min_periods=period).mean() ma2 = ma1.rolling(window=2, min_periods=2).mean() # ma2[i] is centred at position i − period/2; shift left by period//2 − 1 # so that trend[i] is centred at position i. # Derivation: # ma1[i] centres at i − (period−1)/2 (trailing window) # ma2[i] centres at i − period/2 # We want trend[i] centred at i → need ma2[i + period/2] # → trend = ma2.shift(−period//2) return ma2.shift(-(period // 2)) def _seasonal_and_residual_additive( y: np.ndarray, trend: np.ndarray, period: int ) -> tuple[np.ndarray, np.ndarray]: """Return (seasonal, residual) for the additive model.""" n = len(y) detrended = y - trend # NaN where trend is NaN — that's fine # Average at each phase (position within period), ignoring NaN phase_avg = np.array( [np.nanmean(detrended[phase::period]) for phase in range(period)] ) # Normalise so that the seasonal factors sum to zero over one period phase_avg -= np.nanmean(phase_avg) # Tile to full length seasonal = np.array([phase_avg[i % period] for i in range(n)]) residual = y - trend - seasonal return seasonal, residual def _seasonal_and_residual_multiplicative( y: np.ndarray, trend: np.ndarray, period: int ) -> tuple[np.ndarray, np.ndarray]: """Return (seasonal, residual) for the multiplicative model.""" n = len(y) if np.any(trend[~np.isnan(trend)] <= 0): raise ValueError( "Multiplicative decomposition requires a positive trend; " "the estimated trend contains non-positive values. " "Use model='additive' or apply a log-transform first." ) ratio = y / trend # NaN where trend is NaN phase_avg = np.array( [np.nanmean(ratio[phase::period]) for phase in range(period)] ) # Normalise so the average seasonal factor = 1 avg_factor = np.nanmean(phase_avg) if avg_factor == 0: raise ValueError("All seasonal factors averaged to zero — cannot normalise.") phase_avg /= avg_factor seasonal = np.array([phase_avg[i % period] for i in range(n)]) residual = y / (trend * seasonal) return seasonal, residual def _strength(residual: np.ndarray, combined: np.ndarray) -> float: """Wang et al. (2006) component strength: max(0, 1 − Var(R)/Var(C+R)).""" mask = ~(np.isnan(residual) | np.isnan(combined)) r = residual[mask] c = combined[mask] if len(r) < 2 or np.var(c) == 0: return 0.0 return float(max(0.0, 1.0 - np.var(r) / np.var(c))) def _wrap( arr: np.ndarray, orig: TimeSeries, name_suffix: str, ) -> TimeSeries: """Wrap a numpy array into a TimeSeries, inheriting metadata from *orig*.""" return TimeSeries( arr, index=orig.index, name=f"{orig.name}_{name_suffix}", freq=orig.freq, unit=orig.unit if name_suffix == "trend" else None, ) # --------------------------------------------------------------------------- # Decomposer # ---------------------------------------------------------------------------
[docs] class ClassicalDecomposer: """Decompose a :class:`~tseda.core.TimeSeries` using the centered moving-average (classical) method. The decomposer is **stateless** — one instance may be reused. Methods ------- decompose(ts, period, model) Return a :class:`DecompositionResult`. Notes ----- The classical method has well-known limitations: * The trend component has NaN at both ends (half the period width). * It assumes a fixed seasonal pattern throughout the series. * It can be sensitive to outliers. For more robust results, use :class:`~tseda.decomposition.stl.STLDecomposer`. Examples -------- Additive decomposition: >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.decomposition.classical import ClassicalDecomposer >>> rng = np.random.default_rng(1) >>> n = 48 >>> seas = np.tile(np.sin(2 * np.pi * np.arange(12) / 12) * 5, 4) >>> y = np.arange(n, dtype=float) * 0.3 + seas + rng.standard_normal(n) >>> idx = pd.date_range("2020-01", periods=n, freq="MS") >>> ts = TimeSeries(y, index=idx) >>> r = ClassicalDecomposer().decompose(ts, period=12) >>> r.model 'additive' >>> r.strength_seasonal > 0.5 True """
[docs] def decompose( self, ts: TimeSeries, period: Optional[int] = None, *, model: str = "additive", ) -> DecompositionResult: """Decompose *ts* into trend, seasonal, and residual components. Parameters ---------- ts : TimeSeries Input series. Should be regularly spaced and have length ``>= 2 × period``. period : int, optional Seasonal period (number of observations per cycle). When omitted the period is inferred from ``ts.freq``: daily → 7, monthly → 12, quarterly → 4, etc. model : str, optional ``"additive"`` (default) or ``"multiplicative"``. Returns ------- DecompositionResult Raises ------ TypeError If *ts* is not a :class:`~tseda.core.TimeSeries`. ValueError If *period* cannot be inferred, is < 2, or the series is too short; also if *model* is not recognised. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.decomposition.classical import ClassicalDecomposer >>> idx = pd.date_range("2020", periods=36, freq="MS") >>> y = np.tile(np.arange(12, dtype=float), 3) + np.linspace(0, 6, 36) >>> ts = TimeSeries(y, index=idx) >>> r = ClassicalDecomposer().decompose(ts, period=12) >>> r.seasonal.n 36 >>> r.trend.n 36 """ if not isinstance(ts, TimeSeries): raise TypeError( f"'ts' must be a TimeSeries, got {type(ts).__name__!r}." ) if model not in ("additive", "multiplicative"): raise ValueError( f"'model' must be 'additive' or 'multiplicative'; got {model!r}." ) # --- resolve period --- if period is None: period = _default_period(ts.freq) if period is None: raise ValueError( "'period' could not be inferred from the series frequency " f"({ts.freq!r}). Pass 'period' explicitly." ) period = validate_positive_int(period, name="period") if period < 2: raise ValueError(f"'period' must be >= 2, got {period}.") if ts.n < 2 * period: raise ValueError( f"Series length ({ts.n}) must be at least 2 × period " f"({2 * period}) for classical decomposition." ) y = ts.values # float64 ndarray s = ts.to_series() # --- 1. Trend: centered MA --- trend_series = _centered_trend(s, period) T = trend_series.values # --- 2. Seasonal + Residual --- if model == "additive": S, R = _seasonal_and_residual_additive(y, T, period) else: S, R = _seasonal_and_residual_multiplicative(y, T, period) # --- 3. Strength metrics --- if model == "additive": st = _strength(R, T + R) ss = _strength(R, S + R) else: # Work in log space for multiplicative model with np.errstate(divide="ignore", invalid="ignore"): log_T = np.log(np.where(T > 0, T, np.nan)) log_S = np.log(np.where(S > 0, S, np.nan)) log_R = np.log(np.where(R > 0, R, np.nan)) st = _strength(log_R, log_T + log_R) ss = _strength(log_R, log_S + log_R) valid_mask = ~np.isnan(R) n_used = int(valid_mask.sum()) return DecompositionResult( original=ts, trend=_wrap(T, ts, "trend"), seasonal=_wrap(S, ts, "seasonal"), residual=_wrap(R, ts, "residual"), period=period, model=model, method="classical", strength_trend=round(st, 6), strength_seasonal=round(ss, 6), n_obs_used=n_used, )