Source code for tseda.quality.outliers

"""
Outlier detection for time series.

Four detection methods are provided, all implemented with pure numpy / scipy —
no machine-learning dependencies:

+----------+----------------------------+-------------------------------+
| Method   | Statistic                  | Best for                      |
+==========+============================+===============================+
| IQR      | Tukey fences               | Symmetric or skewed data      |
+----------+----------------------------+-------------------------------+
| Z-score  | Mean / std deviation       | Approximately normal data     |
+----------+----------------------------+-------------------------------+
| MAD      | Median absolute deviation  | Skewed data, heavy tails      |
+----------+----------------------------+-------------------------------+
| GESD     | Generalized ESD test       | Known-normal; # unknown       |
+----------+----------------------------+-------------------------------+

All detectors return an :class:`OutlierReport` and expose ``.remove()`` /
``.clip()`` helpers that return cleaned :class:`~tseda.core.TimeSeries`
objects.

Classes
-------
OutlierReport
    Immutable result dataclass.
OutlierDetector
    Stateless detector.

Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.quality.outliers import OutlierDetector

>>> idx = pd.date_range("2020-01-01", periods=10, freq="D")
>>> vals = np.array([1, 2, 2, 3, 100, 2, 3, 2, 1, 2], dtype=float)
>>> ts  = TimeSeries(vals, index=idx)
>>> det = OutlierDetector()
>>> r   = det.iqr(ts)
>>> r.n_outliers
1
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional

import numpy as np
import pandas as pd

from tseda.core.timeseries import TimeSeries
from tseda.core.validator import validate_positive_int

__all__ = ["OutlierReport", "OutlierDetector"]

# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True) class OutlierReport: """Immutable outlier detection result. Attributes ---------- mask : numpy.ndarray Boolean array of shape ``(n,)``; ``True`` where an outlier was found. indices : numpy.ndarray Integer positions (0-based) of detected outliers. timestamps : pandas.DatetimeIndex Timestamps of detected outliers. values : numpy.ndarray Observed values at outlier positions. method : str Name of the detection method used. n_outliers : int Number of outliers detected. lower_bound : float or None Lower fence / threshold (when applicable). upper_bound : float or None Upper fence / threshold (when applicable). """ mask: np.ndarray indices: np.ndarray timestamps: pd.DatetimeIndex values: np.ndarray method: str n_outliers: int lower_bound: Optional[float] upper_bound: Optional[float]
[docs] def __repr__(self) -> str: # pragma: no cover lb = f"{self.lower_bound:.4f}" if self.lower_bound is not None else "—" ub = f"{self.upper_bound:.4f}" if self.upper_bound is not None else "—" return ( f"OutlierReport(\n" f" method : {self.method}\n" f" n_outliers : {self.n_outliers}\n" f" lower_bound : {lb}\n" f" upper_bound : {ub}\n" f")" )
# --------------------------------------------------------------------------- # Private helper # --------------------------------------------------------------------------- def _build_report( ts: TimeSeries, mask: np.ndarray, method: str, lower: Optional[float] = None, upper: Optional[float] = None, ) -> OutlierReport: """Construct an :class:`OutlierReport` from a boolean mask.""" idx = np.where(mask)[0] return OutlierReport( mask=mask, indices=idx, timestamps=ts.index[idx], values=ts.values[idx], method=method, n_outliers=int(mask.sum()), lower_bound=lower, upper_bound=upper, ) # --------------------------------------------------------------------------- # Detector # ---------------------------------------------------------------------------
[docs] class OutlierDetector: """Detect, remove, or clip outliers in a :class:`~tseda.core.TimeSeries`. This class is **stateless** — create one instance and reuse across many series. Methods ------- iqr(ts, k=1.5) Tukey IQR fence method. zscore(ts, threshold=3.0) Standard Z-score method. mad(ts, threshold=3.5) Median Absolute Deviation method. gesd(ts, alpha=0.05, max_outliers=10) Generalized Extreme Studentized Deviate test. remove(ts, report) Replace outlier values with NaN. clip(ts, report) Clip outlier values to the fence bounds. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.quality.outliers import OutlierDetector >>> idx = pd.date_range("2020", periods=6, freq="D") >>> ts = TimeSeries([2.0, 2.1, 1.9, 2.0, 50.0, 2.0], index=idx) >>> det = OutlierDetector() IQR detection: >>> r = det.iqr(ts) >>> r.n_outliers 1 >>> int(r.indices[0]) 4 Remove the outlier (replace with NaN): >>> cleaned = det.remove(ts, r) >>> cleaned.has_nan True """ # ------------------------------------------------------------------ # Validation helper # ------------------------------------------------------------------ @staticmethod def _validate(ts: object) -> TimeSeries: if not isinstance(ts, TimeSeries): raise TypeError( f"'ts' must be a TimeSeries, got {type(ts).__name__!r}." ) ts = ts # type: ignore[assignment] assert isinstance(ts, TimeSeries) vals = ts.values finite = vals[~np.isnan(vals)] if finite.size < 4: raise ValueError( "Outlier detection requires at least 4 non-NaN observations." ) return ts # ------------------------------------------------------------------ # IQR method # ------------------------------------------------------------------
[docs] def iqr(self, ts: TimeSeries, k: float = 1.5) -> OutlierReport: """Detect outliers using Tukey's IQR fences. Points below ``Q1 - k * IQR`` or above ``Q3 + k * IQR`` are flagged. NaN values are excluded from the quartile computation but *not* flagged as outliers. Parameters ---------- ts : TimeSeries Input series. k : float, optional Fence multiplier. Common choices: * ``1.5`` — standard outlier fence (default). * ``3.0`` — extreme outlier fence. Returns ------- OutlierReport Raises ------ ValueError If *k* is not positive, or if fewer than 4 non-NaN values exist. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.quality.outliers import OutlierDetector >>> idx = pd.date_range("2020", periods=5, freq="D") >>> ts = TimeSeries([1.0, 2.0, 2.0, 2.0, 100.0], index=idx) >>> OutlierDetector().iqr(ts).n_outliers 1 """ if k <= 0: raise ValueError(f"'k' must be positive, got {k}.") ts = self._validate(ts) vals = ts.values finite = vals[~np.isnan(vals)] q1, q3 = float(np.percentile(finite, 25)), float(np.percentile(finite, 75)) iqr_val = q3 - q1 lower = q1 - k * iqr_val upper = q3 + k * iqr_val mask = ~np.isnan(vals) & ((vals < lower) | (vals > upper)) return _build_report(ts, mask, f"IQR(k={k})", lower, upper)
# ------------------------------------------------------------------ # Z-score method # ------------------------------------------------------------------
[docs] def zscore( self, ts: TimeSeries, threshold: float = 3.0 ) -> OutlierReport: """Detect outliers using the standard Z-score. A value is flagged when ``|z| > threshold``, where ``z = (x - mean) / std``. Parameters ---------- ts : TimeSeries Input series. threshold : float, optional Z-score cut-off. Default ``3.0`` (≈ 0.3 % false-positive rate under normality). Returns ------- OutlierReport Raises ------ ValueError If *threshold* is not positive or if std == 0. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.quality.outliers import OutlierDetector >>> idx = pd.date_range("2020", periods=5, freq="D") >>> ts = TimeSeries([0.0, 0.1, 0.0, -0.1, 10.0], index=idx) >>> OutlierDetector().zscore(ts).n_outliers 1 """ if threshold <= 0: raise ValueError(f"'threshold' must be positive, got {threshold}.") ts = self._validate(ts) vals = ts.values finite = vals[~np.isnan(vals)] mean = float(np.mean(finite)) std = float(np.std(finite, ddof=1)) if std == 0.0: raise ValueError( "Z-score requires non-constant data (std == 0)." ) z = (vals - mean) / std mask = ~np.isnan(vals) & (np.abs(z) > threshold) lower = mean - threshold * std upper = mean + threshold * std return _build_report(ts, mask, f"Z-score(t={threshold})", lower, upper)
# ------------------------------------------------------------------ # MAD method # ------------------------------------------------------------------
[docs] def mad( self, ts: TimeSeries, threshold: float = 3.5 ) -> OutlierReport: """Detect outliers using the Median Absolute Deviation (MAD). A value is flagged when the modified Z-score exceeds *threshold*: .. math:: M_i = \\frac{0.6745 \\,(x_i - \\tilde{x})}{\\text{MAD}} where :math:`\\tilde{x}` is the median and :math:`\\text{MAD} = \\text{median}(|x_i - \\tilde{x}|)`. This is more robust than the Z-score for skewed distributions or heavy-tailed noise. Parameters ---------- ts : TimeSeries Input series. threshold : float, optional Modified Z-score cut-off. Iglewicz & Hoaglin (1993) recommend ``3.5`` (default). Returns ------- OutlierReport Raises ------ ValueError If *threshold* is not positive or MAD == 0. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.quality.outliers import OutlierDetector >>> idx = pd.date_range("2020", periods=5, freq="D") >>> ts = TimeSeries([2.0, 2.1, 1.9, 2.0, 50.0], index=idx) >>> OutlierDetector().mad(ts).n_outliers 1 """ if threshold <= 0: raise ValueError(f"'threshold' must be positive, got {threshold}.") ts = self._validate(ts) vals = ts.values finite = vals[~np.isnan(vals)] med = float(np.median(finite)) mad_val = float(np.median(np.abs(finite - med))) if mad_val == 0.0: raise ValueError( "MAD == 0: more than half the non-NaN values are identical. " "MAD-based detection cannot distinguish outliers." ) modified_z = 0.6745 * (vals - med) / mad_val mask = ~np.isnan(vals) & (np.abs(modified_z) > threshold) lower = med - (threshold / 0.6745) * mad_val upper = med + (threshold / 0.6745) * mad_val return _build_report(ts, mask, f"MAD(t={threshold})", lower, upper)
# ------------------------------------------------------------------ # GESD method # ------------------------------------------------------------------
[docs] def gesd( self, ts: TimeSeries, *, alpha: float = 0.05, max_outliers: int = 10, ) -> OutlierReport: """Detect outliers using the Generalized ESD (Extreme Studentized Deviate) test. The GESD test (Rosner, 1983) sequentially removes the most extreme value and tests whether it is a statistical outlier, up to *max_outliers* iterations. It assumes the underlying data are approximately normal *after* removing the outliers. Parameters ---------- ts : TimeSeries Input series. alpha : float, optional Significance level. Default ``0.05``. max_outliers : int, optional Upper bound on the number of outliers to test for. Default 10. Must be less than ``n // 2``. Returns ------- OutlierReport ``lower_bound`` and ``upper_bound`` are ``None`` (GESD uses per-iteration critical values, not fixed fences). Raises ------ ValueError If *alpha* is not in ``(0, 1)`` or *max_outliers* is invalid. ImportError If scipy is not installed (needed for the t-distribution CDF). Notes ----- The critical value at each step *i* (1-indexed) is: .. math:: \\lambda_i = \\frac{(n - i) \\, t_{p, n-i-1}} {\\sqrt{(n-i-1+t_{p,n-i-1}^2)(n-i+1)}} where :math:`p = 1 - \\alpha / (2(n - i + 1))` and :math:`t_{p, \\nu}` is the :math:`p`-quantile of the t-distribution with :math:`\\nu` degrees of freedom. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.quality.outliers import OutlierDetector >>> rng = np.random.default_rng(0) >>> idx = pd.date_range("2020", periods=50, freq="D") >>> vals = rng.standard_normal(50) >>> vals[10] = 15.0 # plant a spike >>> ts = TimeSeries(vals, index=idx) >>> r = OutlierDetector().gesd(ts) >>> 10 in r.indices True """ try: from scipy import stats as sp_stats except ImportError as exc: raise ImportError( "gesd() requires scipy. Install it with: pip install scipy" ) from exc if not (0 < alpha < 1): raise ValueError(f"'alpha' must be in (0, 1), got {alpha}.") ts = self._validate(ts) vals = ts.values.copy() n = ts.n max_outliers = validate_positive_int(max_outliers, name="max_outliers") if max_outliers >= n // 2: raise ValueError( f"'max_outliers' ({max_outliers}) must be less than n // 2 = {n // 2}." ) # Work on the subset without NaN finite_mask = ~np.isnan(vals) finite_vals = vals[finite_mask] n_finite = len(finite_vals) # Store (test_stat, critical_val, candidate_index_in_finite) per step removed_positions: list[int] = [] # positions in finite_vals test_stats: list[float] = [] crit_vals: list[float] = [] working = finite_vals.copy() working_positions = list(range(n_finite)) for i in range(1, max_outliers + 1): n_w = len(working) if n_w < 3: break mean_w = float(np.mean(working)) std_w = float(np.std(working, ddof=1)) if std_w == 0: break abs_dev = np.abs(working - mean_w) local_idx = int(np.argmax(abs_dev)) R_i = float(abs_dev[local_idx] / std_w) test_stats.append(R_i) # Critical value p = 1.0 - alpha / (2.0 * (n_finite - i + 1)) t_crit = float(sp_stats.t.ppf(p, df=n_finite - i - 1)) lam = ( (n_finite - i) * t_crit / np.sqrt( (n_finite - i - 1 + t_crit ** 2) * (n_finite - i + 1) ) ) crit_vals.append(lam) removed_positions.append(working_positions[local_idx]) working = np.delete(working, local_idx) working_positions.pop(local_idx) # Find the largest h such that R_h > lambda_h n_detected = 0 for h in range(len(test_stats) - 1, -1, -1): if test_stats[h] > crit_vals[h]: n_detected = h + 1 break # Map back to original (full) positions finite_indices = np.where(finite_mask)[0] detected_finite = set(removed_positions[:n_detected]) outlier_orig = { int(finite_indices[p]) for p in detected_finite } mask = np.zeros(ts.n, dtype=bool) for pos in outlier_orig: mask[pos] = True return _build_report(ts, mask, f"GESD(alpha={alpha})", None, None)
# ------------------------------------------------------------------ # Repair methods # ------------------------------------------------------------------
[docs] def remove(self, ts: TimeSeries, report: OutlierReport) -> TimeSeries: """Replace detected outlier values with NaN. Parameters ---------- ts : TimeSeries The original series. report : OutlierReport Result from one of the detection methods. Returns ------- TimeSeries A new series with outliers replaced by NaN. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.quality.outliers import OutlierDetector >>> idx = pd.date_range("2020", periods=5, freq="D") >>> ts = TimeSeries([1.0, 2.0, 100.0, 2.0, 1.0], index=idx) >>> det = OutlierDetector() >>> cleaned = det.remove(ts, det.iqr(ts)) >>> cleaned.has_nan True """ if not isinstance(report, OutlierReport): raise TypeError( f"'report' must be an OutlierReport, got {type(report).__name__!r}." ) vals = ts.values vals[report.mask] = np.nan return TimeSeries( vals, index=ts.index, name=ts.name, freq=ts.freq, unit=ts.unit, description=ts.description, )
[docs] def clip(self, ts: TimeSeries, report: OutlierReport) -> TimeSeries: """Clip outlier values to the fence bounds of *report*. Parameters ---------- ts : TimeSeries The original series. report : OutlierReport Must have non-``None`` ``lower_bound`` and ``upper_bound`` (i.e., from ``iqr``, ``zscore``, or ``mad``). Returns ------- TimeSeries A new series with values clamped to ``[lower_bound, upper_bound]``. Raises ------ ValueError If *report* has no bounds (e.g., from ``gesd``). Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.quality.outliers import OutlierDetector >>> idx = pd.date_range("2020", periods=5, freq="D") >>> ts = TimeSeries([1.0, 2.0, 100.0, 2.0, 1.0], index=idx) >>> det = OutlierDetector() >>> r = det.iqr(ts) >>> clipped = det.clip(ts, r) >>> float(clipped.values.max()) < 100.0 True """ if not isinstance(report, OutlierReport): raise TypeError( f"'report' must be an OutlierReport, got {type(report).__name__!r}." ) if report.lower_bound is None or report.upper_bound is None: raise ValueError( "clip() requires an OutlierReport with numeric bounds. " f"'{report.method}' does not produce bounds. " "Use iqr(), zscore(), or mad() instead." ) vals = np.clip(ts.values, report.lower_bound, report.upper_bound) return TimeSeries( vals, index=ts.index, name=ts.name, freq=ts.freq, unit=ts.unit, description=ts.description, )