Source code for tseda.features.statistical

"""
Statistical feature extraction for time series.

Extracts a rich set of statistical descriptors that characterise the
distribution, complexity, and structure of a :class:`~tseda.core.TimeSeries`.
All features are computed in pure numpy.

Feature groups
--------------
* **Distribution** — mean, std, skewness, kurtosis, quantiles, range, CV.
* **Spread / Robust** — MAD, trimmed mean, IQR.
* **Complexity** — approximate entropy, sample entropy, turning points ratio,
  mean-crossing rate.
* **Linear structure** — lag-1 autocorrelation, linear-trend slope and R².
* **Nonlinearity** — number of peaks and troughs, flatness ratio.

Classes
-------
StatisticalFeatureExtractor
    Stateless extractor.

Examples
--------
>>> import pandas as pd, numpy as np
>>> from tseda import TimeSeries
>>> from tseda.features.statistical import StatisticalFeatureExtractor

>>> rng = np.random.default_rng(0)
>>> idx = pd.date_range("2020", periods=200, freq="D")
>>> ts  = TimeSeries(rng.standard_normal(200), index=idx)
>>> df  = StatisticalFeatureExtractor().extract(ts)
>>> "mean" in df.columns and "std" in df.columns
True
>>> df.shape[0]
1
"""
from __future__ import annotations

import numpy as np
import pandas as pd

from tseda.core.timeseries import TimeSeries

__all__ = ["StatisticalFeatureExtractor"]


# ---------------------------------------------------------------------------
# Private feature computers
# ---------------------------------------------------------------------------


def _approx_entropy(x: np.ndarray, m: int = 2, r: float = 0.2) -> float:
    """Approximate entropy (ApEn) — measures regularity / predictability.

    Parameters
    ----------
    x : numpy.ndarray  (1-D, no NaN)
    m : int            Template length (default 2).
    r : float          Tolerance as fraction of std (default 0.2).

    Returns
    -------
    float
        ApEn value.  Lower = more regular.
    """
    n   = len(x)
    tol = r * float(np.std(x, ddof=1))
    if tol == 0:
        return 0.0

    def _phi(m_: int) -> float:
        templates = np.array([x[i : i + m_] for i in range(n - m_ + 1)])
        count = np.array([
            np.sum(np.max(np.abs(templates - templates[i]), axis=1) <= tol)
            for i in range(len(templates))
        ])
        return float(np.mean(np.log(count / (n - m_ + 1))))

    return abs(_phi(m) - _phi(m + 1))


def _sample_entropy(x: np.ndarray, m: int = 2, r: float = 0.2) -> float:
    """Sample entropy (SampEn) — more robust ApEn variant.

    Lower = more regular; ``0`` for constant series.
    """
    n   = len(x)
    tol = r * float(np.std(x, ddof=1))
    if tol == 0:
        return 0.0

    def _count(m_: int) -> int:
        total = 0
        for i in range(n - m_):
            template = x[i : i + m_]
            for j in range(i + 1, n - m_):
                if np.max(np.abs(x[j : j + m_] - template)) < tol:
                    total += 1
        return total

    A = _count(m + 1)
    B = _count(m)
    if B == 0:
        return 0.0
    return float(-np.log(A / B)) if A > 0 else float("nan")


def _linear_trend(x: np.ndarray) -> tuple[float, float]:
    """Return (slope, r_squared) of a least-squares linear fit."""
    n = len(x)
    if n < 2:
        return 0.0, 0.0
    t    = np.arange(n, dtype=float)
    t_c  = t - t.mean()
    x_c  = x - x.mean()
    denom = float(np.dot(t_c, t_c))
    if denom == 0:
        return 0.0, 0.0
    slope  = float(np.dot(t_c, x_c) / denom)
    y_hat  = slope * t_c + x.mean()
    ss_res = float(np.sum((x - y_hat) ** 2))
    ss_tot = float(np.sum(x_c ** 2))
    r2     = 1.0 - ss_res / ss_tot if ss_tot > 0 else 0.0
    return slope, max(0.0, r2)


# ---------------------------------------------------------------------------
# Extractor
# ---------------------------------------------------------------------------


[docs] class StatisticalFeatureExtractor: """Extract statistical features from a :class:`~tseda.core.TimeSeries`. The extractor is **stateless**. It operates on the non-NaN values of the series. Methods ------- extract(ts, entropy) Return a single-row :class:`pandas.DataFrame` of features. Examples -------- >>> import pandas as pd, numpy as np >>> from tseda import TimeSeries >>> from tseda.features.statistical import StatisticalFeatureExtractor >>> idx = pd.date_range("2020", periods=100, freq="D") >>> ts = TimeSeries(np.arange(100.0), index=idx) >>> df = StatisticalFeatureExtractor().extract(ts) >>> round(float(df["linear_slope"].iloc[0]), 1) 1.0 >>> round(float(df["linear_r2"].iloc[0]), 2) 1.0 """
[docs] def extract( self, ts: TimeSeries, *, entropy: bool = True, ) -> pd.DataFrame: """Compute statistical features for *ts*. Parameters ---------- ts : TimeSeries Input series. entropy : bool, optional When ``True`` (default), compute approximate entropy and sample entropy. These are O(n²) — set ``False`` for large series (n > 2000) to save time. Returns ------- pandas.DataFrame One row, columns: Distribution: ``mean``, ``std``, ``var``, ``skewness``, ``kurtosis``, ``min``, ``max``, ``range``, ``median``, ``iqr``, ``mad``, ``cv``, ``trimmed_mean``, ``q25``, ``q75``, ``q05``, ``q95``. Complexity: ``turning_points_ratio``, ``mean_crossing_rate``, ``flatness_ratio``. If ``entropy=True``: ``approx_entropy``, ``sample_entropy``. Linear structure: ``lag1_acf``, ``linear_slope``, ``linear_r2``. Nonlinearity: ``n_peaks``, ``n_troughs``. Raises ------ TypeError If *ts* is not a :class:`~tseda.core.TimeSeries`. ValueError If *ts* has fewer than 4 non-NaN observations. Examples -------- >>> import pandas as pd, numpy as np >>> from tseda import TimeSeries >>> from tseda.features.statistical import StatisticalFeatureExtractor >>> idx = pd.date_range("2020", periods=5, freq="D") >>> ts = TimeSeries([1.0, 2.0, 1.5, 2.5, 2.0], index=idx) >>> df = StatisticalFeatureExtractor().extract(ts, entropy=False) >>> float(df["mean"].iloc[0]) 1.8 """ if not isinstance(ts, TimeSeries): raise TypeError( f"'ts' must be a TimeSeries, got {type(ts).__name__!r}." ) vals = ts.values x = vals[~np.isnan(vals)] n = len(x) if n < 4: raise ValueError( "StatisticalFeatureExtractor requires at least 4 non-NaN " f"observations; got {n}." ) # ── Distribution ────────────────────────────────────────────── mean_v = float(np.mean(x)) std_v = float(np.std(x, ddof=1)) var_v = std_v ** 2 med_v = float(np.median(x)) mn, mx = float(np.min(x)), float(np.max(x)) rng_v = mx - mn q25, q75 = float(np.percentile(x, 25)), float(np.percentile(x, 75)) q05, q95 = float(np.percentile(x, 5)), float(np.percentile(x, 95)) iqr_v = q75 - q25 mad_v = float(np.median(np.abs(x - med_v))) cv_v = std_v / abs(mean_v) if mean_v != 0 else float("nan") # Trimmed mean (5 % each tail) k = max(1, int(np.floor(n * 0.05))) xs = np.sort(x) tr_mean = float(np.mean(xs[k : n - k])) # Skewness and excess kurtosis (bias-corrected) m2 = float(np.mean((x - mean_v) ** 2)) m3 = float(np.mean((x - mean_v) ** 3)) m4 = float(np.mean((x - mean_v) ** 4)) if m2 > 0: g1 = m3 / m2 ** 1.5 skew = float(g1 * np.sqrt(n * (n - 1)) / (n - 2)) if n >= 3 else 0.0 g2 = m4 / m2 ** 2 - 3.0 kurt = float((n - 1) / ((n - 2) * (n - 3)) * ((n + 1) * g2 + 6)) if n >= 4 else 0.0 else: skew = 0.0 kurt = 0.0 # ── Complexity ──────────────────────────────────────────────── # Turning points (local max or min) diff1 = np.sign(np.diff(x)) sign_changes = np.where(diff1[:-1] != diff1[1:])[0] tp_ratio = float(len(sign_changes)) / max(n - 2, 1) # Mean-crossing rate xc = x - mean_v crossings = np.sum((xc[:-1] * xc[1:]) < 0) mc_rate = float(crossings) / max(n - 1, 1) # Flatness: fraction of consecutive pairs with zero difference flat = np.sum(np.diff(x) == 0) flat_ratio = float(flat) / max(n - 1, 1) # Entropy (optional, O(n²)) ap_ent = _approx_entropy(x) if entropy and n <= 2000 else float("nan") sa_ent = _sample_entropy(x) if entropy and n <= 500 else float("nan") # ── Linear structure ────────────────────────────────────────── lag1_acf = 0.0 if n >= 2: xc_arr = x - mean_v denom = float(np.dot(xc_arr, xc_arr)) if denom > 0: lag1_acf = float(np.dot(xc_arr[1:], xc_arr[:-1]) / denom) slope, r2 = _linear_trend(x) # ── Nonlinearity ────────────────────────────────────────────── d = np.diff(x) n_peaks = int(np.sum((d[:-1] > 0) & (d[1:] < 0))) n_trough = int(np.sum((d[:-1] < 0) & (d[1:] > 0))) row = { # Distribution "mean": mean_v, "std": std_v, "var": var_v, "skewness": skew, "kurtosis": kurt, "min": mn, "max": mx, "range": rng_v, "median": med_v, "iqr": iqr_v, "mad": mad_v, "cv": cv_v, "trimmed_mean": tr_mean, "q05": q05, "q25": q25, "q75": q75, "q95": q95, # Complexity "turning_points_ratio": tp_ratio, "mean_crossing_rate": mc_rate, "flatness_ratio": flat_ratio, "approx_entropy": ap_ent, "sample_entropy": sa_ent, # Linear structure "lag1_acf": lag1_acf, "linear_slope": slope, "linear_r2": r2, # Nonlinearity "n_peaks": float(n_peaks), "n_troughs": float(n_trough), } return pd.DataFrame([row], index=[ts.index[0]])