# Source code for tseda.forecastability.leakage

"""
Leakage detection for time series feature sets.

Two classes of leakage are detected:

+--------------------+-------------------------------------------------------+
| Type               | Definition                                            |
+====================+=======================================================+
| Temporal leakage   | A feature at time *t* correlates more strongly with  |
|                    | future target values (t+1 … t+horizon) than with     |
|                    | past or present target values.                       |
+--------------------+-------------------------------------------------------+
| Target leakage     | A feature is so highly correlated with the target at |
|                    | lag 0 that it almost certainly encodes the target    |
|                    | itself (e.g., a lagged copy or a near-identity       |
|                    | transform).                                          |
+--------------------+-------------------------------------------------------+

When *features_df* is ``None`` the report is returned with empty leakage sets
and a warning that no features were provided.

Classes
-------
LeakageReport
    Frozen dataclass returned by :meth:`LeakageDetector.check`.
LeakageDetector
    Stateless detector.

Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.forecastability.leakage import LeakageDetector

No leakage — lagged features only:

>>> rng  = np.random.default_rng(0)
>>> n    = 100
>>> idx  = pd.date_range("2020", periods=n, freq="D")
>>> y    = rng.standard_normal(n)
>>> ts   = TimeSeries(y, index=idx)
>>> feat = pd.DataFrame({"lag1": np.roll(y, 1), "lag2": np.roll(y, 2)}, index=idx)
>>> feat.iloc[:2] = np.nan
>>> r    = LeakageDetector().check(ts, horizon=5, features_df=feat)
>>> r.has_target_leakage
False
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from tseda.core.timeseries import TimeSeries

__all__ = ["LeakageReport", "LeakageDetector"]


# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class LeakageReport:
    """Frozen record of one leakage check.

    Instances are produced by :meth:`LeakageDetector.check`; attribute
    re-assignment is blocked because the dataclass is declared frozen.

    Attributes
    ----------
    has_temporal_leakage : bool
        ``True`` when at least one feature correlates more strongly with
        the future target than with the current / past target.
    has_target_leakage : bool
        ``True`` when at least one feature's lag-0 correlation with the
        target reaches *target_corr_threshold*.
    temporal_leakage_columns : list of str
        Feature columns flagged for temporal leakage.
    target_leakage_columns : list of str
        Feature columns flagged for target leakage.
    target_leakage_correlations : dict of str → float
        Lag-0 Pearson correlation of every column listed in
        :attr:`target_leakage_columns`.
    temporal_peak_lags : dict of str → int
        Per feature column, the lag at which the cross-correlation with
        the target peaks.  A positive lag means the feature correlates
        with the *future* target.
    horizon : int
        Forecast horizon that was passed to
        :meth:`~LeakageDetector.check`.
    n_features : int
        How many feature columns were examined.
    n_obs : int
        How many observations the target series holds.
    warnings : list of str
        Human-readable diagnostic messages.
    """

    # NOTE: field order defines the positional ``__init__`` signature.
    has_temporal_leakage: bool
    has_target_leakage: bool
    temporal_leakage_columns: List[str]
    target_leakage_columns: List[str]
    target_leakage_correlations: Dict[str, float]
    temporal_peak_lags: Dict[str, int]
    horizon: int
    n_features: int
    n_obs: int
    warnings: List[str]

    def __repr__(self) -> str:  # pragma: no cover
        """Return a multi-line summary of the headline fields.

        Only the two leakage flags, the flagged column lists, the feature
        count and the horizon are shown; the remaining fields are omitted.
        """
        lines = [
            "LeakageReport(",
            f" has_temporal_leakage : {self.has_temporal_leakage}",
            f" has_target_leakage : {self.has_target_leakage}",
            f" temporal_columns : {self.temporal_leakage_columns}",
            f" target_columns : {self.target_leakage_columns}",
            f" n_features : {self.n_features}",
            f" horizon : {self.horizon}",
            ")",
        ]
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------


def _cross_corr_at_lag(x: np.ndarray, y: np.ndarray, lag: int) -> float:
    """Return the Pearson correlation between *x* and *y* shifted by *lag*.

    For a positive *lag* the pairs are ``x[:-lag]`` and ``y[lag:]`` (each
    ``x[t]`` is matched with ``y[t + lag]``, i.e. *y* is ahead).  For a
    negative *lag* the roles flip: ``x[-lag:]`` is matched with
    ``y[:lag]``.  A *lag* of zero compares the arrays as given.

    Pairs in which either value is NaN are discarded before correlating.

    Returns
    -------
    float
        The correlation coefficient, or ``0.0`` when fewer than three
        pairs are available (before or after NaN removal) or when either
        side has a standard deviation below ``1e-12``.
    """
    # Align the two arrays according to the sign of the lag.
    if lag > 0:
        lhs, rhs = x[:-lag], y[lag:]
    elif lag < 0:
        shift = -lag
        lhs, rhs = x[shift:], y[:-shift]
    else:
        lhs, rhs = x, y

    if len(lhs) < 3:
        return 0.0

    # Keep only the positions where both sides are observed.
    keep = ~np.isnan(lhs) & ~np.isnan(rhs)
    lhs = lhs[keep]
    rhs = rhs[keep]
    if len(lhs) < 3:
        return 0.0

    # A (near-)constant side makes the correlation undefined; report 0.0.
    spread_lhs = float(np.std(lhs))
    spread_rhs = float(np.std(rhs))
    if spread_lhs < 1e-12 or spread_rhs < 1e-12:
        return 0.0

    corr_matrix = np.corrcoef(lhs, rhs)
    return float(corr_matrix[0, 1])


# ---------------------------------------------------------------------------
# Detector
# ---------------------------------------------------------------------------
class LeakageDetector:
    """Detect temporal and target leakage in a feature set.

    The detector is **stateless**.

    Methods
    -------
    check(ts, horizon, features_df, target_corr_threshold)
        Return a :class:`LeakageReport`.

    Examples
    --------
    >>> import numpy as np, pandas as pd
    >>> from tseda import TimeSeries
    >>> from tseda.forecastability.leakage import LeakageDetector

    Target leakage — a feature that *is* the target:

    >>> rng = np.random.default_rng(0)
    >>> n = 80
    >>> idx = pd.date_range("2020", periods=n, freq="D")
    >>> y = rng.standard_normal(n)
    >>> ts = TimeSeries(y, index=idx)
    >>> feat = pd.DataFrame({"target_copy": y}, index=idx)
    >>> r = LeakageDetector().check(ts, horizon=1, features_df=feat)
    >>> r.has_target_leakage
    True
    >>> "target_copy" in r.target_leakage_columns
    True
    """

    # Amount by which the peak future-lag |correlation| must exceed the
    # lag-0 |correlation| before a feature is flagged for temporal leakage.
    _TEMPORAL_MARGIN: float = 0.05

    def check(
        self,
        ts: TimeSeries,
        horizon: int,
        *,
        features_df: Optional[pd.DataFrame] = None,
        target_corr_threshold: float = 0.95,
    ) -> LeakageReport:
        """Check *features_df* for leakage against target *ts*.

        Parameters
        ----------
        ts : TimeSeries
            Target time series.
        horizon : int
            Forecast horizon in time steps. Must be >= 1.  NumPy integer
            scalars are accepted and converted to ``int``.
        features_df : pandas.DataFrame, optional
            Feature matrix with the same :class:`~pandas.DatetimeIndex` as
            *ts*, one column per feature.  Only the row count is validated
            here.  When ``None`` the report is empty with a warning.
        target_corr_threshold : float, optional
            A feature is flagged as target-leaking when the absolute value
            of its lag-0 Pearson r is at or above this threshold.
            Default ``0.95``.

        Returns
        -------
        LeakageReport

        Raises
        ------
        TypeError
            If *ts* is not a :class:`~tseda.core.TimeSeries`, or if
            *features_df* is neither ``None`` nor a
            :class:`pandas.DataFrame`.
        ValueError
            If *horizon* is not an integer >= 1, *target_corr_threshold*
            ∉ (0, 1], or *features_df* has a different number of rows
            from *ts*.

        Notes
        -----
        Cross-correlations are evaluated for lags in
        ``[-max_shift, max_shift]`` with
        ``max_shift = min(horizon, ts.n // 4)``.  When ``max_shift`` is 0
        the temporal check is skipped and every peak lag is reported as 0.
        If several lags share the same maximal absolute correlation (for
        example a constant feature, where every correlation is 0.0), the
        lag closest to 0 is reported.

        Examples
        --------
        >>> import numpy as np, pandas as pd
        >>> from tseda import TimeSeries
        >>> from tseda.forecastability.leakage import LeakageDetector
        >>> rng = np.random.default_rng(1)
        >>> n = 60
        >>> idx = pd.date_range("2020", periods=n, freq="D")
        >>> ts = TimeSeries(rng.standard_normal(n), index=idx)
        >>> r = LeakageDetector().check(ts, horizon=3)
        >>> r.n_features
        0
        """
        if not isinstance(ts, TimeSeries):
            raise TypeError(
                f"'ts' must be a TimeSeries, got {type(ts).__name__!r}."
            )
        # np.integer is accepted so horizons computed with NumPy are not
        # rejected with a misleading "must be a positive integer" message.
        if not isinstance(horizon, (int, np.integer)) or horizon < 1:
            raise ValueError(
                f"'horizon' must be a positive integer; got {horizon!r}."
            )
        horizon = int(horizon)
        if not (0 < target_corr_threshold <= 1.0):
            raise ValueError(
                f"'target_corr_threshold' must be in (0, 1]; "
                f"got {target_corr_threshold!r}."
            )

        warn_msgs: List[str] = []

        if features_df is None:
            warn_msgs.append(
                "No features_df provided — leakage check skipped. "
                "Pass a DataFrame of feature columns to enable full analysis."
            )
            return LeakageReport(
                has_temporal_leakage=False,
                has_target_leakage=False,
                temporal_leakage_columns=[],
                target_leakage_columns=[],
                target_leakage_correlations={},
                temporal_peak_lags={},
                horizon=horizon,
                n_features=0,
                n_obs=ts.n,
                warnings=warn_msgs,
            )

        if not isinstance(features_df, pd.DataFrame):
            raise TypeError(
                f"'features_df' must be a pandas.DataFrame, "
                f"got {type(features_df).__name__!r}."
            )
        if len(features_df) != ts.n:
            raise ValueError(
                f"'features_df' must have the same number of rows as 'ts' "
                f"({ts.n}); got {len(features_df)} rows."
            )

        y = ts.values.copy()
        n_features = len(features_df.columns)

        temporal_leakage_cols: List[str] = []
        target_leakage_cols: List[str] = []
        target_leakage_corrs: Dict[str, float] = {}
        temporal_peak_lags: Dict[str, int] = {}

        # The lag window depends only on the horizon and the series length,
        # so it is computed once rather than once per column.
        max_shift = min(horizon, ts.n // 4)
        lags = range(-max_shift, max_shift + 1)

        for col in features_df.columns:
            name = str(col)
            f = features_df[col].to_numpy(dtype=float, na_value=np.nan)

            # ── Target leakage: |corr at lag 0| >= threshold ──────────
            r0 = _cross_corr_at_lag(f, y, lag=0)
            if abs(r0) >= target_corr_threshold:
                target_leakage_cols.append(name)
                target_leakage_corrs[name] = round(r0, 6)

            # ── Temporal leakage: peak cross-correlation at positive lag ──
            if max_shift < 1:
                # Series too short for any shifted comparison.
                temporal_peak_lags[name] = 0
                continue

            # Reuse the lag-0 value computed above instead of recomputing it.
            corr_by_lag = {
                k: r0 if k == 0 else _cross_corr_at_lag(f, y, lag=k)
                for k in lags
            }

            # Strongest |correlation| wins; exact ties go to the lag nearest
            # zero so degenerate features do not report an arbitrary
            # -max_shift (max() returns the first maximal key it meets).
            peak_lag = max(
                corr_by_lag,
                key=lambda k: (abs(corr_by_lag[k]), -abs(k)),
            )
            temporal_peak_lags[name] = int(peak_lag)

            if peak_lag > 0:
                corr_future = abs(corr_by_lag[peak_lag])
                corr_present = abs(r0)
                if corr_future > corr_present + self._TEMPORAL_MARGIN:
                    temporal_leakage_cols.append(name)

        return LeakageReport(
            has_temporal_leakage=len(temporal_leakage_cols) > 0,
            has_target_leakage=len(target_leakage_cols) > 0,
            temporal_leakage_columns=temporal_leakage_cols,
            target_leakage_columns=target_leakage_cols,
            target_leakage_correlations=target_leakage_corrs,
            temporal_peak_lags=temporal_peak_lags,
            horizon=horizon,
            n_features=n_features,
            n_obs=ts.n,
            warnings=warn_msgs,
        )