Source code for tseda.anomaly.detector

"""
Anomaly detection for time series.

Detects two classes of anomaly:

* **Point anomalies** — isolated observations that deviate sharply from
  the surrounding distribution (spikes, dips).
* **Contextual anomalies** — observations that are surprising given their
  local neighbourhood (e.g., a value that is normal globally but anomalous
  within a specific season).

Four detection methods are provided, all in pure numpy / scipy:

+------------------+------------------------------+---------------------------------+
| Method           | Mechanism                    | Best for                        |
+==================+==============================+=================================+
| ``rolling_iqr``  | Rolling IQR fence            | Non-stationary level data       |
+------------------+------------------------------+---------------------------------+
| ``rolling_z``    | Rolling mean ± k × std       | Approximately normal windows    |
+------------------+------------------------------+---------------------------------+
| ``stl_residual`` | STL decompose → flag residual| Seasonal + trend data           |
+------------------+------------------------------+---------------------------------+
| ``gesd``         | Global GESD test             | Stationary data, known # spikes |
+------------------+------------------------------+---------------------------------+

All methods return an :class:`AnomalyReport` and share the repair helpers
:meth:`AnomalyDetector.remove` (replace with NaN) and
:meth:`AnomalyDetector.label` (0/1 indicator series).

Classes
-------
AnomalyReport
    Frozen dataclass returned by every detection method.
AnomalyDetector
    Stateless detector.

Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.anomaly.detector import AnomalyDetector

Plant a spike and recover it:

>>> rng  = np.random.default_rng(0)
>>> idx  = pd.date_range("2020-01-01", periods=200, freq="D")
>>> vals = rng.standard_normal(200)
>>> vals[50] = 15.0   # spike
>>> ts   = TimeSeries(vals, index=idx)
>>> det  = AnomalyDetector()
>>> r    = det.rolling_iqr(ts)
>>> 50 in r.indices
True
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

import numpy as np
import pandas as pd

from tseda.core.timeseries import TimeSeries
from tseda.core.validator import validate_positive_int

__all__ = ["AnomalyReport", "AnomalyDetector"]


# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True) class AnomalyReport: """Immutable anomaly detection result. Attributes ---------- mask : numpy.ndarray Boolean array of shape ``(n,)``; ``True`` where an anomaly was detected. indices : numpy.ndarray Integer positions (0-based) of detected anomalies. timestamps : pandas.DatetimeIndex Timestamps of detected anomalies. values : numpy.ndarray Observed values at anomaly positions. scores : numpy.ndarray Continuous anomaly score in [0, 1] for each observation. Higher values indicate stronger evidence of anomaly. ``0`` for normal observations. method : str Name of the detection method. n_anomalies : int Number of anomalies detected. """ mask: np.ndarray indices: np.ndarray timestamps: pd.DatetimeIndex values: np.ndarray scores: np.ndarray method: str n_anomalies: int
[docs] def __repr__(self) -> str: # pragma: no cover return ( f"AnomalyReport(\n" f" method : {self.method}\n" f" n_anomalies : {self.n_anomalies}\n" f" top scores : {np.sort(self.scores)[::-1][:5].round(4).tolist()}\n" f")" )
# --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _build_report( ts: TimeSeries, mask: np.ndarray, scores: np.ndarray, method: str ) -> AnomalyReport: idx = np.where(mask)[0] return AnomalyReport( mask=mask, indices=idx, timestamps=ts.index[idx], values=ts.values[idx], scores=scores, method=method, n_anomalies=int(mask.sum()), ) def _safe_iqr(arr: np.ndarray) -> tuple[float, float, float, float]: """Return (q1, q3, iqr, median) ignoring NaN.""" finite = arr[~np.isnan(arr)] if len(finite) < 4: return np.nan, np.nan, np.nan, np.nan q1 = float(np.percentile(finite, 25)) q3 = float(np.percentile(finite, 75)) iqr = q3 - q1 med = float(np.median(finite)) return q1, q3, iqr, med # --------------------------------------------------------------------------- # Detector # ---------------------------------------------------------------------------
[docs] class AnomalyDetector: """Detect point and contextual anomalies in a :class:`~tseda.core.TimeSeries`. This class is **stateless** — one instance, many series. Methods ------- rolling_iqr(ts, window, k) Rolling Tukey IQR fence. rolling_z(ts, window, threshold) Rolling mean ± k × std. stl_residual(ts, period, method, threshold) STL decompose then flag large residuals. gesd(ts, alpha, max_outliers) Global Generalized ESD test (re-uses quality module). remove(ts, report) Replace anomaly positions with NaN. label(ts, report) Return a 0/1 :class:`~tseda.core.TimeSeries` of anomaly labels. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.anomaly.detector import AnomalyDetector >>> rng = np.random.default_rng(1) >>> idx = pd.date_range("2020", periods=100, freq="D") >>> vals = rng.standard_normal(100) >>> vals[20] = 12.0 >>> ts = TimeSeries(vals, index=idx) >>> det = AnomalyDetector() >>> r = det.rolling_z(ts, window=30) >>> 20 in r.indices True """ @staticmethod def _validate(ts: object) -> TimeSeries: if not isinstance(ts, TimeSeries): raise TypeError( f"'ts' must be a TimeSeries, got {type(ts).__name__!r}." ) ts = ts # type: ignore[assignment] assert isinstance(ts, TimeSeries) x = ts.values[~np.isnan(ts.values)] if len(x) < 8: raise ValueError( "Anomaly detection requires at least 8 non-NaN observations." ) return ts # ------------------------------------------------------------------ # Rolling IQR # ------------------------------------------------------------------
[docs] def rolling_iqr( self, ts: TimeSeries, window: int = 30, *, k: float = 2.5, center: bool = True, min_periods: Optional[int] = None, ) -> AnomalyReport: """Detect anomalies using a rolling IQR fence. An observation ``y[t]`` is flagged when it falls outside ``[Q1(t) − k×IQR(t), Q3(t) + k×IQR(t)]`` where the quartiles are computed over a rolling window centred at ``t``. Parameters ---------- ts : TimeSeries Input series. window : int, optional Rolling window width in observations. Default 30. k : float, optional Fence multiplier. Default 2.5 (tighter than the global 1.5 because the window is local and thus more sensitive). center : bool, optional Centre the window on each observation. Default ``True``. min_periods : int, optional Minimum non-NaN observations required in each window. Defaults to ``window // 2``. Returns ------- AnomalyReport Raises ------ TypeError If *ts* is not a :class:`~tseda.core.TimeSeries`. ValueError If fewer than 8 non-NaN observations or *k* ≤ 0. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.anomaly.detector import AnomalyDetector >>> rng = np.random.default_rng(0) >>> idx = pd.date_range("2020", periods=100, freq="D") >>> vals = rng.standard_normal(100) >>> vals[40] = 10.0 >>> ts = TimeSeries(vals, index=idx) >>> r = AnomalyDetector().rolling_iqr(ts) >>> 40 in r.indices True """ if k <= 0: raise ValueError(f"'k' must be positive, got {k}.") ts = self._validate(ts) validate_positive_int(window, name="window") mp = min_periods if min_periods is not None else window // 2 s = ts.to_series() q1 = s.rolling(window, center=center, min_periods=mp).quantile(0.25) q3 = s.rolling(window, center=center, min_periods=mp).quantile(0.75) iqr = q3 - q1 lower = q1 - k * iqr upper = q3 + k * iqr vals = ts.values lo_arr = lower.to_numpy() hi_arr = upper.to_numpy() iqr_arr = hi_arr - lo_arr valid = ~(np.isnan(vals) | np.isnan(lo_arr) | np.isnan(hi_arr)) below = valid & (vals < lo_arr) above = valid & (vals > hi_arr) mask = below | above scores = np.zeros(ts.n, dtype=float) scores[below] = np.minimum(1.0, (lo_arr[below] - vals[below]) / (iqr_arr[below] + 1e-10)) scores[above] = np.minimum(1.0, (vals[above] - hi_arr[above]) / (iqr_arr[above] + 1e-10)) return _build_report(ts, mask, scores, f"rolling_iqr(w={window},k={k})")
# ------------------------------------------------------------------ # Rolling Z-score # ------------------------------------------------------------------
[docs] def rolling_z( self, ts: TimeSeries, window: int = 30, *, threshold: float = 3.0, center: bool = True, min_periods: Optional[int] = None, ) -> AnomalyReport: """Detect anomalies using a rolling Z-score. An observation is flagged when ``|y[t] − μ(t)| / σ(t) > threshold``, where ``μ`` and ``σ`` are the rolling mean and standard deviation computed over a window. Parameters ---------- ts : TimeSeries Input series. window : int, optional Rolling window width. Default 30. threshold : float, optional Z-score cut-off. Default 3.0. center : bool, optional Centre the window. Default ``True``. min_periods : int, optional Minimum non-NaN observations per window. Defaults to ``window // 2``. Returns ------- AnomalyReport Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.anomaly.detector import AnomalyDetector >>> rng = np.random.default_rng(2) >>> idx = pd.date_range("2020", periods=100, freq="D") >>> vals = rng.standard_normal(100) >>> vals[60] = -9.0 >>> ts = TimeSeries(vals, index=idx) >>> r = AnomalyDetector().rolling_z(ts) >>> 60 in r.indices True """ if threshold <= 0: raise ValueError(f"'threshold' must be positive, got {threshold}.") ts = self._validate(ts) validate_positive_int(window, name="window") mp = min_periods if min_periods is not None else window // 2 s = ts.to_series() mu = s.rolling(window, center=center, min_periods=mp).mean() sigma = s.rolling(window, center=center, min_periods=mp).std(ddof=1) vals = ts.values mu_arr = mu.to_numpy() sg_arr = sigma.to_numpy() valid = ~(np.isnan(vals) | np.isnan(mu_arr) | np.isnan(sg_arr) | (sg_arr == 0)) z = np.where(valid, np.abs(vals - mu_arr) / (sg_arr + 1e-300), 0.0) mask = valid & (z > threshold) scores = np.where(mask, np.minimum(1.0, (z - threshold) / (threshold + 1e-10)), 0.0) return _build_report(ts, mask, scores, f"rolling_z(w={window},t={threshold})")
# ------------------------------------------------------------------ # STL residual # ------------------------------------------------------------------
[docs] def stl_residual( self, ts: TimeSeries, period: Optional[int] = None, *, residual_method: str = "iqr", k: float = 3.0, robust: bool = True, ) -> AnomalyReport: """Detect anomalies in the STL residual component. Decomposes *ts* into trend + seasonal + residual using STL, then flags residual values that are extreme under the chosen criterion. Parameters ---------- ts : TimeSeries Input series. Must be long enough for STL decomposition (at least 2 × *period* observations). period : int, optional Seasonal period. Inferred from ``ts.freq`` when omitted. residual_method : str, optional Criterion to flag residuals: * ``"iqr"`` — Tukey IQR fence with multiplier *k* (default). * ``"mad"`` — Median Absolute Deviation threshold *k*. * ``"z"`` — Z-score threshold *k*. k : float, optional Threshold multiplier / fence factor. Default 3.0. robust : bool, optional Use robust LOESS in STL. Default ``True``. Returns ------- AnomalyReport Raises ------ TypeError If *ts* is not a :class:`~tseda.core.TimeSeries`. ValueError If *residual_method* is not recognised. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.anomaly.detector import AnomalyDetector >>> rng = np.random.default_rng(3) >>> n = 60 >>> seas = np.tile(np.sin(2*np.pi*np.arange(12)/12)*5, 5) >>> vals = seas + rng.standard_normal(n) * 0.3 >>> vals[25] = 20.0 >>> idx = pd.date_range("2018-01", periods=n, freq="MS") >>> ts = TimeSeries(vals, index=idx) >>> r = AnomalyDetector().stl_residual(ts, period=12) >>> 25 in r.indices True """ if residual_method not in ("iqr", "mad", "z"): raise ValueError( f"'residual_method' must be 'iqr', 'mad', or 'z'; " f"got {residual_method!r}." ) if k <= 0: raise ValueError(f"'k' must be positive, got {k}.") ts = self._validate(ts) # Decompose from tseda.decomposition.stl import STLDecomposer dec = STLDecomposer().decompose(ts, period=period, robust=robust) R = dec.residual.values # Flag residuals finite = R[~np.isnan(R)] mask = np.zeros(ts.n, dtype=bool) scores = np.zeros(ts.n, dtype=float) if residual_method == "iqr": q1, q3 = float(np.percentile(finite, 25)), float(np.percentile(finite, 75)) iqr_val = q3 - q1 lo, hi = q1 - k * iqr_val, q3 + k * iqr_val for i, r in enumerate(R): if np.isnan(r): continue if r < lo: mask[i] = True scores[i] = min(1.0, (lo - r) / (iqr_val + 1e-10)) elif r > hi: mask[i] = True scores[i] = min(1.0, (r - hi) / (iqr_val + 1e-10)) elif residual_method == "mad": med = float(np.median(finite)) mad_val = float(np.median(np.abs(finite - med))) if mad_val == 0: mad_val = 1e-10 for i, r in enumerate(R): if np.isnan(r): continue mod_z = 0.6745 * abs(r - med) / mad_val if mod_z > k: mask[i] = True scores[i] = min(1.0, (mod_z - k) / (k + 1e-10)) else: # z mean = float(np.mean(finite)) std = float(np.std(finite, ddof=1)) if std == 0: std = 1e-10 for i, r in enumerate(R): if np.isnan(r): continue z = abs(r - mean) / std if z > k: mask[i] = True scores[i] = min(1.0, (z - k) / (k + 1e-10)) return _build_report( ts, mask, scores, f"stl_residual({residual_method},k={k},p={dec.period})", )
# ------------------------------------------------------------------ # GESD (re-uses quality module) # ------------------------------------------------------------------
[docs] def gesd( self, ts: TimeSeries, *, alpha: float = 0.05, max_outliers: int = 10, ) -> AnomalyReport: """Global GESD anomaly detection. Delegates to :meth:`tseda.quality.OutlierDetector.gesd` and wraps the result as an :class:`AnomalyReport`. Best suited for stationary series where the number of anomalies is small relative to *n*. Parameters ---------- ts : TimeSeries Input series. alpha : float, optional Significance level. Default 0.05. max_outliers : int, optional Maximum number of anomalies to test for. Default 10. Returns ------- AnomalyReport Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.anomaly.detector import AnomalyDetector >>> rng = np.random.default_rng(0) >>> idx = pd.date_range("2020", periods=50, freq="D") >>> vals = rng.standard_normal(50) >>> vals[10] = 12.0 >>> ts = TimeSeries(vals, index=idx) >>> r = AnomalyDetector().gesd(ts) >>> 10 in r.indices True """ ts = self._validate(ts) from tseda.quality.outliers import OutlierDetector n_finite = int(np.sum(~np.isnan(ts.values))) safe_max = min(max_outliers, n_finite // 2 - 1) if safe_max < 1: return _build_report(ts, np.zeros(ts.n, dtype=bool), np.zeros(ts.n), f"gesd(alpha={alpha})") od_report = OutlierDetector().gesd(ts, alpha=alpha, max_outliers=safe_max) # Build continuous scores: |z-score| normalised to [0,1] vals = ts.values.copy() finite = vals[~np.isnan(vals)] mean_v = float(np.mean(finite)) std_v = float(np.std(finite, ddof=1)) if len(finite) > 1 else 1.0 z_abs = np.where( ~np.isnan(vals), np.abs((vals - mean_v) / (std_v + 1e-10)), 0.0, ) # Normalise scores to [0, 1] relative to max z max_z = z_abs.max() scores = (z_abs / max_z) if max_z > 0 else z_abs # Zero out non-anomaly scores scores[~od_report.mask] = 0.0 return _build_report( ts, od_report.mask, scores, f"gesd(alpha={alpha})", )
# ------------------------------------------------------------------ # Repair helpers # ------------------------------------------------------------------
[docs] def remove(self, ts: TimeSeries, report: AnomalyReport) -> TimeSeries: """Replace detected anomaly values with NaN. Parameters ---------- ts : TimeSeries The original series. report : AnomalyReport Output of any detection method. Returns ------- TimeSeries A new series with anomaly positions set to NaN. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.anomaly.detector import AnomalyDetector >>> idx = pd.date_range("2020", periods=50, freq="D") >>> vals = np.zeros(50) >>> vals[5] = 100.0 >>> ts = TimeSeries(vals, index=idx) >>> det = AnomalyDetector() >>> cleaned = det.remove(ts, det.rolling_iqr(ts)) >>> cleaned.has_nan True """ if not isinstance(report, AnomalyReport): raise TypeError( f"'report' must be an AnomalyReport, got {type(report).__name__!r}." ) vals = ts.values vals[report.mask] = np.nan return TimeSeries( vals, index=ts.index, name=ts.name, freq=ts.freq, unit=ts.unit, description=ts.description, )
[docs] def label(self, ts: TimeSeries, report: AnomalyReport) -> TimeSeries: """Return a 0/1 :class:`~tseda.core.TimeSeries` of anomaly labels. Parameters ---------- ts : TimeSeries The original series (used for index and metadata). report : AnomalyReport Output of any detection method. Returns ------- TimeSeries Values are ``1`` at anomaly positions, ``0`` elsewhere. Name is ``"{ts.name}_anomaly_label"``. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.anomaly.detector import AnomalyDetector >>> idx = pd.date_range("2020", periods=50, freq="D") >>> vals = np.zeros(50); vals[5] = 100.0 >>> ts = TimeSeries(vals, index=idx) >>> det = AnomalyDetector() >>> lbl = det.label(ts, det.rolling_iqr(ts)) >>> lbl.values[5] 1.0 >>> lbl.values[0] 0.0 """ if not isinstance(report, AnomalyReport): raise TypeError( f"'report' must be an AnomalyReport, got {type(report).__name__!r}." ) labels = report.mask.astype(float) return TimeSeries( labels, index=ts.index, name=f"{ts.name}_anomaly_label", freq=ts.freq, )