Source code for tseda.forecastability.scorer

"""
Forecastability scoring for time series.

Computes a composite 0–100 readiness score from six diagnostic sub-scores
and recommends a modelling strategy.

Sub-scores and weights
----------------------

+-------------------+--------+---------------------------------------------+
| Sub-score         | Weight | Measures                                    |
+===================+========+=============================================+
| data_quality      |  20 %  | Inverse of missing % + IQR-outlier %        |
+-------------------+--------+---------------------------------------------+
| stationarity      |  15 %  | ADF + KPSS combined verdict                 |
+-------------------+--------+---------------------------------------------+
| signal_to_noise   |  20 %  | STL/classical strength_trend + strength_sea |
+-------------------+--------+---------------------------------------------+
| autocorrelation   |  15 %  | Max |ACF| at significant lags               |
+-------------------+--------+---------------------------------------------+
| sample_size       |  15 %  | n / (2 × period) — enough seasonal cycles   |
+-------------------+--------+---------------------------------------------+
| regularity        |  15 %  | is_regular flag + absence of large gaps      |
+-------------------+--------+---------------------------------------------+

Classes
-------
ForecastabilityReport
    Frozen dataclass returned by :meth:`ForecastabilityScorer.score`.
ForecastabilityScorer
    Stateless scorer.

Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.forecastability.scorer import ForecastabilityScorer

Simple AR(1) process — moderate forecastability:

>>> rng = np.random.default_rng(0)
>>> n   = 300
>>> idx = pd.date_range("2020-01-01", periods=n, freq="D")
>>> eps = rng.standard_normal(n)
>>> x   = np.zeros(n)
>>> for i in range(1, n): x[i] = 0.7 * x[i-1] + eps[i]
>>> ts  = TimeSeries(x, index=idx)
>>> r   = ForecastabilityScorer().score(ts)
>>> 0 <= r.score <= 100
True
>>> r.recommended_model in ("ARIMA", "SARIMA", "ETS", "Prophet", "ML")
True
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional

import numpy as np
import pandas as pd

from tseda.core.timeseries import TimeSeries
from tseda.statistics.autocorrelation import AutocorrelationAnalyzer
from tseda.statistics.stationarity import StationarityTester

__all__ = ["ForecastabilityReport", "ForecastabilityScorer"]

_WEIGHTS: Dict[str, float] = {
    "data_quality":    0.20,
    "stationarity":    0.15,
    "signal_to_noise": 0.20,
    "autocorrelation": 0.15,
    "sample_size":     0.15,
    "regularity":      0.15,
}

assert abs(sum(_WEIGHTS.values()) - 1.0) < 1e-9, "weights must sum to 1"


# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True) class ForecastabilityReport: """Immutable forecastability assessment. Attributes ---------- score : float Overall forecastability score in [0, 100]. Higher is better. sub_scores : dict of str → float Individual sub-scores (0–100 each) keyed by sub-score name. recommended_model : str Suggested modelling approach: ``"ARIMA"``, ``"SARIMA"``, ``"ETS"``, ``"Prophet"``, or ``"ML"``. recommended_diff : int Recommended differencing order: 0 (already stationary) or 1. recommended_period : int or None Dominant seasonal period detected, or ``None`` if no seasonality found. n_obs : int Number of observations in the series. pct_missing : float Percentage of NaN values. pct_outlier : float Percentage of IQR-flagged outliers. is_stationary : bool ``True`` when the ADF test rejects the unit-root null. dominant_period : int or None Same as :attr:`recommended_period`. """ score: float sub_scores: Dict[str, float] recommended_model: str recommended_diff: int recommended_period: Optional[int] n_obs: int pct_missing: float pct_outlier: float is_stationary: bool dominant_period: Optional[int]
[docs] def __repr__(self) -> str: # pragma: no cover lines = [ "ForecastabilityReport(", f" score : {self.score:.1f}/100", " sub_scores :", ] for k, v in self.sub_scores.items(): lines.append(f" {k:<20}: {v:.1f}") lines += [ f" recommended_model: {self.recommended_model}", f" recommended_diff : {self.recommended_diff}", f" recommended_period: {self.recommended_period}", f" n_obs : {self.n_obs}", f" pct_missing : {self.pct_missing:.2f} %", f" pct_outlier : {self.pct_outlier:.2f} %", f" is_stationary : {self.is_stationary}", ")", ] return "\n".join(lines)
# --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _pct_outlier_iqr(x: np.ndarray) -> float: """Return percentage of values outside 1.5×IQR fences (no NaN).""" if len(x) == 0: return 0.0 q1, q3 = float(np.percentile(x, 25)), float(np.percentile(x, 75)) iqr = q3 - q1 if iqr == 0: return 0.0 fence_lo = q1 - 1.5 * iqr fence_hi = q3 + 1.5 * iqr n_out = int(np.sum((x < fence_lo) | (x > fence_hi))) return n_out / len(x) * 100.0 def _stl_strengths( x: np.ndarray, period: int ) -> tuple[float, float]: """Return (strength_trend, strength_seasonal) via STL or fallback. Both values are in [0, 1]. Uses statsmodels STL when available; falls back to a Savitzky-Golay trend + per-phase seasonal average. """ n = len(x) if n < 2 * period: return 0.0, 0.0 try: from statsmodels.tsa.seasonal import STL result = STL(x, period=period, robust=True).fit() trend_comp = result.trend seasonal_comp = result.seasonal resid = result.resid except ImportError: from scipy.signal import savgol_filter wl = min(period * 2 + 1, n if n % 2 == 1 else n - 1) if wl < 3: wl = 3 wl = wl if wl % 2 == 1 else wl + 1 trend_comp = savgol_filter(x, window_length=min(wl, n if n % 2 == 1 else n - 1), polyorder=2) detrended = x - trend_comp seasonal_comp = np.zeros(n) for i in range(period): indices = np.arange(i, n, period) seasonal_comp[indices] = np.mean(detrended[indices]) resid = x - trend_comp - seasonal_comp var_resid = float(np.var(resid)) var_x = float(np.var(x)) strength_trend = max(0.0, 1.0 - var_resid / max(np.var(x - seasonal_comp), 1e-15)) strength_seasonal = max(0.0, 1.0 - var_resid / max(np.var(x - trend_comp), 1e-15)) return float(np.clip(strength_trend, 0.0, 1.0)), float(np.clip(strength_seasonal, 0.0, 1.0)) def _detect_period(ts: TimeSeries) -> Optional[int]: """Return the dominant seasonal period via FFT periodogram, or None.""" try: from tseda.seasonality.detector import SeasonalityDetector report = SeasonalityDetector().detect(ts) if report.is_seasonal and report.dominant_period is not None: return int(report.dominant_period) except Exception: pass return None def _has_large_gaps(ts: TimeSeries) -> bool: """Return True if any time gap exceeds 3× the median gap.""" if ts.n < 3: return False gaps_td = np.diff(ts.index.to_numpy()) # timedelta64[*] — unit-agnostic median_gap = np.median(gaps_td) if median_gap <= np.timedelta64(0): return False return bool(np.any(gaps_td > 3 * median_gap)) # --------------------------------------------------------------------------- # Scorer # ---------------------------------------------------------------------------
[docs] class ForecastabilityScorer: """Assess how forecastable a :class:`~tseda.core.TimeSeries` is. The scorer is **stateless** — calling :meth:`score` multiple times is safe. Methods ------- score(ts, period) Return a :class:`ForecastabilityReport` with an overall 0–100 score. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.forecastability.scorer import ForecastabilityScorer >>> rng = np.random.default_rng(1) >>> idx = pd.date_range("2020", periods=200, freq="D") >>> ts = TimeSeries(rng.standard_normal(200), index=idx) >>> r = ForecastabilityScorer().score(ts) >>> isinstance(r.score, float) True """
[docs] def score( self, ts: TimeSeries, *, period: Optional[int] = None, alpha: float = 0.05, ) -> ForecastabilityReport: """Compute the forecastability score for *ts*. Parameters ---------- ts : TimeSeries Input series. period : int, optional Seasonal period. When ``None`` the period is detected automatically via the FFT periodogram. alpha : float, optional Significance level used for stationarity and ACF tests. Default ``0.05``. Returns ------- ForecastabilityReport Raises ------ TypeError If *ts* is not a :class:`~tseda.core.TimeSeries`. ValueError If *period* is given and is < 2, or *ts* has fewer than 4 obs. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.forecastability.scorer import ForecastabilityScorer >>> rng = np.random.default_rng(2) >>> idx = pd.date_range("2020", periods=365, freq="D") >>> n = 365 >>> seas = np.sin(2 * np.pi * np.arange(n) / 7) * 3 >>> ts = TimeSeries(seas + rng.standard_normal(n) * 0.5, index=idx) >>> r = ForecastabilityScorer().score(ts, period=7) >>> r.recommended_period 7 """ if not isinstance(ts, TimeSeries): raise TypeError( f"'ts' must be a TimeSeries, got {type(ts).__name__!r}." ) x_full = ts.values n_full = len(x_full) if n_full < 4: raise ValueError( "ForecastabilityScorer requires at least 4 observations." ) if period is not None and period < 2: raise ValueError( f"'period' must be >= 2; got {period}." ) x_clean = x_full[~np.isnan(x_full)] n_clean = len(x_clean) # ── 1. Data quality ──────────────────────────────────────────── pct_nan = (n_full - n_clean) / n_full * 100.0 pct_out = _pct_outlier_iqr(x_clean) dq_score = float(np.clip(100.0 - pct_nan - pct_out, 0.0, 100.0)) # ── 2. Stationarity ──────────────────────────────────────────── tester = StationarityTester() try: adf_result = tester.adf(ts, alpha=alpha) adf_stat = adf_result.is_stationary except Exception: adf_stat = True # constant / zero-variance series treated as stationary try: kpss_result = tester.kpss(ts, alpha=alpha) kpss_stat = kpss_result.is_stationary except Exception: kpss_stat = True if adf_stat and kpss_stat: stat_score = 100.0 elif adf_stat and not kpss_stat: stat_score = 75.0 elif not adf_stat and kpss_stat: stat_score = 50.0 else: stat_score = 0.0 recommended_diff = 0 if adf_stat else 1 # ── 3. Signal-to-noise ───────────────────────────────────────── detected_period = period if period is not None else _detect_period(ts) effective_period = detected_period if detected_period is not None else 12 if n_clean >= 2 * effective_period: str_trend, str_seasonal = _stl_strengths(x_clean, effective_period) else: str_trend = float( np.clip(1.0 - np.var(np.diff(x_clean)) / max(np.var(x_clean), 1e-15), 0.0, 1.0) ) str_seasonal = 0.0 sn_score = float(np.clip((str_trend + str_seasonal) / 2.0 * 100.0, 0.0, 100.0)) # ── 4. Autocorrelation ──────────────────────────────────────── max_lag = min(40, n_clean // 2 - 1) if max_lag >= 1 and n_clean >= 4: acf_result = AutocorrelationAnalyzer().analyze(ts, lags=max_lag, alpha=alpha) acf_vals = acf_result.acf[1:] ci = float(acf_result.conf_upper[1]) sig_mask = np.abs(acf_vals) > ci if sig_mask.any(): max_acf = float(np.max(np.abs(acf_vals[sig_mask]))) else: max_acf = 0.0 else: max_acf = 0.0 ac_score = float(np.clip(max_acf * 100.0, 0.0, 100.0)) # ── 5. Sample size ──────────────────────────────────────────── ratio = n_clean / (2.0 * effective_period) ss_score = float(np.clip(ratio / 5.0 * 100.0, 0.0, 100.0)) # ── 6. Regularity ───────────────────────────────────────────── is_reg = ts.is_regular has_gaps = _has_large_gaps(ts) reg_score = 50.0 * float(is_reg) + 50.0 * float(not has_gaps) # ── Weighted overall score ──────────────────────────────────── sub_scores: Dict[str, float] = { "data_quality": round(dq_score, 2), "stationarity": round(stat_score, 2), "signal_to_noise": round(sn_score, 2), "autocorrelation": round(ac_score, 2), "sample_size": round(ss_score, 2), "regularity": round(reg_score, 2), } overall = sum(_WEIGHTS[k] * v for k, v in sub_scores.items()) overall = float(np.clip(overall, 0.0, 100.0)) # ── Recommended model ───────────────────────────────────────── is_seasonal = detected_period is not None recommended_model = _recommend_model( is_seasonal=is_seasonal, is_stationary=adf_stat, n_obs=n_clean, overall_score=overall, ) return ForecastabilityReport( score=round(overall, 2), sub_scores=sub_scores, recommended_model=recommended_model, recommended_diff=recommended_diff, recommended_period=detected_period, n_obs=n_full, pct_missing=round(pct_nan, 4), pct_outlier=round(pct_out, 4), is_stationary=adf_stat, dominant_period=detected_period, )
def _recommend_model( *, is_seasonal: bool, is_stationary: bool, n_obs: int, overall_score: float, ) -> str: """Select a recommended modelling approach from a fixed set. Decision rules (evaluated in order): 1. Very low score (< 25) → ``"ML"`` (non-linear, complex patterns) 2. Seasonal + stationary → ``"SARIMA"`` 3. Seasonal + non-stationary → ``"ETS"`` (handles trend/level updates) 4. Non-seasonal + stationary → ``"ARIMA"`` 5. Long non-seasonal series with trend → ``"Prophet"`` 6. Fallback → ``"ARIMA"`` """ if overall_score < 25.0: return "ML" if is_seasonal and is_stationary: return "SARIMA" if is_seasonal and not is_stationary: return "ETS" if not is_seasonal and is_stationary: return "ARIMA" if not is_seasonal and not is_stationary and n_obs >= 100: return "Prophet" return "ARIMA"