Source code for tseda.changepoint.detector

"""
Changepoint (structural break) detection for time series.

A changepoint is a position in the series where the underlying data-generating
process shifts — typically a sudden change in mean level, variance, or trend
slope.

Three complementary methods are provided, all in pure numpy / scipy:

+-----------------------+----------------------------+-----------------------------+
| Method                | Detects                    | Best for                    |
+=======================+============================+=============================+
| ``cusum``             | Mean shift                 | Online-style detection;     |
|                       |                            | sensitive to gradual drift  |
+-----------------------+----------------------------+-----------------------------+
| ``binary_segmentation``| Mean shift (multiple)     | Batch; finds exact number  |
|                       |                            | of breaks automatically     |
+-----------------------+----------------------------+-----------------------------+
| ``variance_ratio``    | Variance shift             | Detecting regime changes in |
|                       |                            | volatility                  |
+-----------------------+----------------------------+-----------------------------+

All methods return a :class:`ChangepointReport` and share a repair helper
:meth:`ChangepointDetector.segment` that returns labelled segment indices.

Classes
-------
ChangepointReport
    Frozen dataclass returned by every detection method.
ChangepointDetector
    Stateless detector.

Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.changepoint.detector import ChangepointDetector

Series with a single level shift at position 200:

>>> rng   = np.random.default_rng(0)
>>> n     = 400
>>> idx   = pd.date_range("2020-01-01", periods=n, freq="D")
>>> vals  = np.concatenate([rng.standard_normal(200),
...                         rng.standard_normal(200) + 5.0])
>>> ts    = TimeSeries(vals, index=idx)
>>> det   = ChangepointDetector()
>>> r     = det.binary_segmentation(ts)
>>> len(r.changepoints) >= 1
True
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional

import numpy as np
import pandas as pd
from scipy import stats as sp_stats

from tseda.core.timeseries import TimeSeries
from tseda.core.validator import validate_positive_int

__all__ = ["ChangepointReport", "ChangepointDetector"]


# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True) class ChangepointReport: """Immutable changepoint detection result. Attributes ---------- changepoints : list of int 0-based integer positions of detected changepoints, sorted ascending. A changepoint at position ``k`` means the break occurs *between* observations ``k-1`` and ``k``. timestamps : pandas.DatetimeIndex Timestamps corresponding to each changepoint position. n_changepoints : int Number of detected changepoints. scores : numpy.ndarray Continuous changepoint score in [0, 1] for each observation. Higher values indicate stronger evidence of a structural break at or near that position. method : str Name of the detection method. """ changepoints: List[int] timestamps: pd.DatetimeIndex n_changepoints: int scores: np.ndarray method: str
[docs] def segment_labels(self, n: int) -> np.ndarray: """Return a 0-indexed integer segment label for each of *n* observations. Segment 0 spans ``[0, changepoints[0])``, segment 1 spans ``[changepoints[0], changepoints[1])``, and so on. Parameters ---------- n : int Total number of observations. Returns ------- numpy.ndarray of int Shape ``(n,)``. Examples -------- >>> from tseda.changepoint.detector import ChangepointReport >>> import numpy as np, pandas as pd >>> r = ChangepointReport( ... changepoints=[3, 7], ... timestamps=pd.DatetimeIndex([]), ... n_changepoints=2, ... scores=np.zeros(10), ... method="test", ... ) >>> r.segment_labels(10).tolist() [0, 0, 0, 1, 1, 1, 1, 2, 2, 2] """ labels = np.zeros(n, dtype=int) for seg_idx, cp in enumerate(sorted(self.changepoints), start=1): labels[cp:] = seg_idx return labels
[docs] def __repr__(self) -> str: # pragma: no cover ts_str = ( [str(t.date()) for t in self.timestamps[:5]] if len(self.timestamps) > 0 else [] ) return ( f"ChangepointReport(\n" f" method : {self.method}\n" f" n_changepoints : {self.n_changepoints}\n" f" changepoints : {self.changepoints[:5]}{'...' if self.n_changepoints > 5 else ''}\n" f" timestamps : {ts_str}{'...' if self.n_changepoints > 5 else ''}\n" f")" )
# --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _build_report( ts: TimeSeries, changepoints: List[int], scores: np.ndarray, method: str, ) -> ChangepointReport: cps = sorted(changepoints) idx = ts.index[cps] if cps else pd.DatetimeIndex([]) return ChangepointReport( changepoints=cps, timestamps=idx, n_changepoints=len(cps), scores=scores, method=method, ) def _clean_array(ts: TimeSeries) -> np.ndarray: """Return non-NaN values as a numpy array (linear interp for NaN).""" vals = ts.values.astype(float) nan_mask = np.isnan(vals) if nan_mask.any(): idx_arr = np.arange(len(vals)) vals[nan_mask] = np.interp( idx_arr[nan_mask], idx_arr[~nan_mask], vals[~nan_mask] ) return vals def _cusum_arrays( x: np.ndarray, target: float, sigma: float, k: float, h: float ) -> tuple[np.ndarray, np.ndarray]: """Compute two-sided CUSUM arrays S+ and S−.""" n = len(x) S_pos = np.zeros(n) S_neg = np.zeros(n) for i in range(1, n): S_pos[i] = max(0.0, S_pos[i - 1] + (x[i] - target) - k * sigma) S_neg[i] = max(0.0, S_neg[i - 1] - (x[i] - target) - k * sigma) return S_pos, S_neg def _binary_seg_recursive( x: np.ndarray, offset: int, min_size: int, penalty: float ) -> List[int]: """Recursive binary segmentation — returns changepoints as global indices.""" n = len(x) if n < 2 * min_size: return [] # Sum of squared errors for the full segment mu_all = np.nanmean(x) sse_all = float(np.nansum((x - mu_all) ** 2)) best_gain = -np.inf best_split = -1 for t in range(min_size, n - min_size + 1): left, right = x[:t], x[t:] sse_l = float(np.nansum((left - np.nanmean(left)) ** 2)) sse_r = float(np.nansum((right - np.nanmean(right)) ** 2)) gain = sse_all - sse_l - sse_r if gain > best_gain: best_gain = gain best_split = t if best_gain <= penalty or best_split < 0: return [] # Found a split — recurse on both halves cps = [offset + best_split] cps += _binary_seg_recursive(x[:best_split], offset, min_size, penalty) cps += _binary_seg_recursive(x[best_split:], offset + best_split, min_size, penalty) return sorted(cps) # --------------------------------------------------------------------------- # Detector # ---------------------------------------------------------------------------
[docs] class ChangepointDetector: """Detect structural breaks in a :class:`~tseda.core.TimeSeries`. This class is **stateless** — one instance, many series. Methods ------- cusum(ts, threshold, drift, target) Two-sided CUSUM control chart for mean shift. binary_segmentation(ts, min_size, penalty) Recursive mean-shift changepoint detection. variance_ratio(ts, window, alpha) Sliding F-test for variance shifts. segment(ts, report) Return segment labels and per-segment statistics. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.changepoint.detector import ChangepointDetector Single level shift: >>> rng = np.random.default_rng(0) >>> idx = pd.date_range("2020", periods=200, freq="D") >>> vals = np.concatenate([rng.standard_normal(100), ... rng.standard_normal(100) + 4.0]) >>> ts = TimeSeries(vals, index=idx) >>> det = ChangepointDetector() >>> r = det.binary_segmentation(ts) >>> abs(r.changepoints[0] - 100) <= 5 # within 5 obs of true break True """ @staticmethod def _validate(ts: object) -> TimeSeries: if not isinstance(ts, TimeSeries): raise TypeError( f"'ts' must be a TimeSeries, got {type(ts).__name__!r}." ) ts = ts # type: ignore[assignment] assert isinstance(ts, TimeSeries) x = ts.values[~np.isnan(ts.values)] if len(x) < 10: raise ValueError( "Changepoint detection requires at least 10 non-NaN observations." ) return ts # ------------------------------------------------------------------ # CUSUM # ------------------------------------------------------------------
[docs] def cusum( self, ts: TimeSeries, *, threshold: float = 5.0, drift: float = 0.5, target: Optional[float] = None, ) -> ChangepointReport: """Two-sided CUSUM (Cumulative Sum) control chart for mean shift. CUSUM accumulates deviations from a *target* mean. When the cumulative sum exceeds a *threshold* (expressed in units of σ), a changepoint is signalled. Parameters ---------- ts : TimeSeries Input series. threshold : float, optional Decision interval in multiples of σ (default ``5.0``). Higher values = less sensitive / fewer false alarms. drift : float, optional Allowance parameter *k* (default ``0.5``). Typically set to half the magnitude of the smallest shift to detect, in units of σ. target : float, optional Reference (in-control) mean. Defaults to the series mean. Returns ------- ChangepointReport Raises ------ TypeError If *ts* is not a :class:`~tseda.core.TimeSeries`. ValueError If fewer than 10 non-NaN observations or *threshold* / *drift* ≤ 0. Notes ----- The CUSUM chart for detecting upward shifts: .. math:: S_t^+ = \\max\\bigl(0,\\; S_{t-1}^+ + (x_t - \\mu_0) - k\\sigma\\bigr) A changepoint is signalled when :math:`S_t^+ > h\\sigma` (or similarly for :math:`S_t^-`). After each signal the accumulator is reset to zero. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.changepoint.detector import ChangepointDetector >>> rng = np.random.default_rng(1) >>> idx = pd.date_range("2020", periods=300, freq="D") >>> vals = np.concatenate([rng.standard_normal(150), ... rng.standard_normal(150) + 3.0]) >>> ts = TimeSeries(vals, index=idx) >>> r = ChangepointDetector().cusum(ts, threshold=5.0, drift=0.5) >>> r.n_changepoints >= 1 True """ if threshold <= 0: raise ValueError(f"'threshold' must be positive, got {threshold}.") if drift <= 0: raise ValueError(f"'drift' must be positive, got {drift}.") ts = self._validate(ts) x = _clean_array(ts) n = len(x) # Estimate in-control mean and sigma mu = float(target) if target is not None else float(np.mean(x)) diffs = np.diff(x) sigma = float(np.std(diffs, ddof=1) / np.sqrt(2)) if len(diffs) > 1 else 1.0 if sigma == 0: sigma = 1.0 h = threshold * sigma k = drift # allowance (no sigma scaling — drift is already a fraction) # Accumulate CUSUM incrementally with inline reset so every position # is evaluated correctly — including positions immediately after a signal. changepoints: List[int] = [] S_p = np.zeros(n) S_n = np.zeros(n) sp, sn = 0.0, 0.0 for i in range(1, n): sp = max(0.0, sp + (x[i] - mu) - k * sigma) sn = max(0.0, sn - (x[i] - mu) - k * sigma) S_p[i] = sp S_n[i] = sn if sp > h or sn > h: changepoints.append(i) sp, sn = 0.0, 0.0 # reset after each signal # Scores: normalised max(S+, S−) from the reset-adjusted arrays max_s = max(float(S_p.max()), float(S_n.max()), 1e-10) scores = np.maximum(S_p, S_n) / max_s return _build_report(ts, changepoints, scores, f"cusum(t={threshold},d={drift})")
# ------------------------------------------------------------------ # Binary segmentation # ------------------------------------------------------------------
[docs] def binary_segmentation( self, ts: TimeSeries, *, min_size: int = 10, penalty: Optional[float] = None, ) -> ChangepointReport: """Recursive binary segmentation for mean-shift changepoints. Iteratively finds the position that maximises the reduction in within-segment sum-of-squares error. A split is accepted when the gain exceeds *penalty*; recursion continues on each sub-segment. Parameters ---------- ts : TimeSeries Input series. min_size : int, optional Minimum number of observations per segment (default ``10``). Prevents detecting breaks on very small sub-sequences. penalty : float, optional Minimum SSE gain required to accept a split. Defaults to ``n × σ²`` where σ is estimated from first-differences. A higher penalty → fewer changepoints. Returns ------- ChangepointReport Notes ----- The algorithm has O(n²) time complexity per level of recursion. For very long series (n > 5000) consider using a larger *min_size* or restricting the search. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.changepoint.detector import ChangepointDetector >>> rng = np.random.default_rng(0) >>> idx = pd.date_range("2020", periods=300, freq="D") >>> vals = np.concatenate([rng.standard_normal(100), ... rng.standard_normal(100) + 5.0, ... rng.standard_normal(100)]) >>> ts = TimeSeries(vals, index=idx) >>> r = ChangepointDetector().binary_segmentation(ts) >>> r.n_changepoints 2 """ ts = self._validate(ts) min_size = validate_positive_int(min_size, name="min_size") x = _clean_array(ts) n = len(x) # Default penalty: σ² × n (BIC-like) if penalty is None: diffs = np.diff(x) sigma2 = float(np.var(diffs, ddof=1) / 2) if len(diffs) > 1 else 1.0 penalty = sigma2 * n cps = _binary_seg_recursive(x, 0, min_size, float(penalty)) # Build per-position scores: proportion of total SSE gain at nearest cp scores = np.zeros(n) if cps: total_sse = float(np.sum((x - np.mean(x)) ** 2)) for cp in cps: left, right = x[:cp], x[cp:] gain = total_sse - ( float(np.sum((left - np.mean(left)) ** 2)) + float(np.sum((right - np.mean(right)) ** 2)) ) if total_sse > 0: scores[cp] = min(1.0, gain / total_sse) return _build_report(ts, cps, scores, f"binary_segmentation(p={penalty:.1f})")
# ------------------------------------------------------------------ # Variance ratio # ------------------------------------------------------------------
[docs] def variance_ratio( self, ts: TimeSeries, *, window: int = 30, alpha: float = 0.05, ) -> ChangepointReport: """Detect variance shifts via a sliding two-sample F-test. Two adjacent windows of width *window* are compared at each position. A significant difference in variance (p < alpha) signals a variance-change changepoint. Parameters ---------- ts : TimeSeries Input series. window : int, optional Half-window width for each sample. Default 30. alpha : float, optional Significance level for the F-test. Default 0.05. Returns ------- ChangepointReport ``changepoints`` are positions with a significant variance shift, with consecutive positives merged into the maximum-score position. Raises ------ TypeError If *ts* is not a :class:`~tseda.core.TimeSeries`. ValueError If *window* < 3 or *alpha* outside (0, 1). Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.changepoint.detector import ChangepointDetector >>> rng = np.random.default_rng(2) >>> idx = pd.date_range("2020", periods=200, freq="D") >>> vals = np.concatenate([rng.standard_normal(100) * 0.5, ... rng.standard_normal(100) * 3.0]) >>> ts = TimeSeries(vals, index=idx) >>> r = ChangepointDetector().variance_ratio(ts, window=20) >>> r.n_changepoints >= 1 True """ if not (0 < alpha < 1): raise ValueError(f"'alpha' must be in (0, 1), got {alpha}.") ts = self._validate(ts) window = validate_positive_int(window, name="window") if window < 3: raise ValueError(f"'window' must be >= 3, got {window}.") x = _clean_array(ts) n = len(x) scores = np.zeros(n) sig = np.zeros(n, dtype=bool) for t in range(window, n - window): left = x[t - window : t] right = x[t : t + window] nl, nr = len(left), len(right) var_l = float(np.var(left, ddof=1)) var_r = float(np.var(right, ddof=1)) if var_l <= 0 and var_r <= 0: continue if var_l <= 0: var_l = 1e-12 if var_r <= 0: var_r = 1e-12 # Two-sided F-test: df1 must match the group with the larger variance if var_l >= var_r: F, df1, df2 = var_l / var_r, nl - 1, nr - 1 else: F, df1, df2 = var_r / var_l, nr - 1, nl - 1 pval = float(2 * sp_stats.f.sf(F, df1, df2)) scores[t] = 1.0 - pval sig[t] = pval < alpha # Merge consecutive significant positions — keep local maximum cps: List[int] = [] in_run = False run_start = 0 run_best = 0 run_score = -1.0 for i in range(n): if sig[i]: if not in_run: in_run, run_start = True, i run_best, run_score = i, scores[i] else: if scores[i] > run_score: run_best, run_score = i, scores[i] else: if in_run: cps.append(run_best) in_run = False if in_run: cps.append(run_best) return _build_report(ts, cps, scores, f"variance_ratio(w={window},α={alpha})")
# ------------------------------------------------------------------ # Segment analysis # ------------------------------------------------------------------
[docs] def segment( self, ts: TimeSeries, report: ChangepointReport, ) -> pd.DataFrame: """Return per-segment statistics for a change-point report. Parameters ---------- ts : TimeSeries The original series. report : ChangepointReport Output of any detection method. Returns ------- pandas.DataFrame One row per segment with columns: ``segment``, ``start``, ``end``, ``n_obs``, ``mean``, ``std``, ``min``, ``max``. Raises ------ TypeError If either argument has the wrong type. Examples -------- >>> import numpy as np, pandas as pd >>> from tseda import TimeSeries >>> from tseda.changepoint.detector import ChangepointDetector >>> rng = np.random.default_rng(0) >>> idx = pd.date_range("2020", periods=200, freq="D") >>> vals = np.concatenate([rng.standard_normal(100), ... rng.standard_normal(100) + 4.0]) >>> ts = TimeSeries(vals, index=idx) >>> det = ChangepointDetector() >>> r = det.binary_segmentation(ts) >>> df = det.segment(ts, r) >>> len(df) == r.n_changepoints + 1 True """ if not isinstance(ts, TimeSeries): raise TypeError( f"'ts' must be a TimeSeries, got {type(ts).__name__!r}." ) if not isinstance(report, ChangepointReport): raise TypeError( f"'report' must be a ChangepointReport, " f"got {type(report).__name__!r}." ) breaks = sorted(report.changepoints) bounds = [0] + breaks + [ts.n] rows = [] for seg_idx, (lo, hi) in enumerate(zip(bounds[:-1], bounds[1:])): seg_vals = ts.values[lo:hi] finite = seg_vals[~np.isnan(seg_vals)] rows.append({ "segment": seg_idx, "start": ts.index[lo], "end": ts.index[hi - 1], "n_obs": hi - lo, "mean": float(np.mean(finite)) if len(finite) else np.nan, "std": float(np.std(finite, ddof=1)) if len(finite) > 1 else np.nan, "min": float(np.min(finite)) if len(finite) else np.nan, "max": float(np.max(finite)) if len(finite) else np.nan, }) return pd.DataFrame(rows)