Source code for tseda.quality.missing

"""
Missing-value analysis for time series.

Two distinct concepts are handled here:

* **Value NaN** — a timestamp is present in the index but its observed value
  is :data:`numpy.nan`.
* **Index gap** — a timestamp that *should* exist (given the series frequency)
  is absent from the index entirely.

Both are reported by :class:`MissingValueAnalyzer`.  Interpolation of NaN
values is also provided via :meth:`MissingValueAnalyzer.interpolate`.

Classes
-------
MissingValueReport
    Immutable result dataclass returned by :meth:`MissingValueAnalyzer.analyze`.
MissingValueAnalyzer
    Stateless analyzer; all methods accept a :class:`~tseda.core.TimeSeries`
    and return plain Python / numpy objects or a new :class:`~tseda.core.TimeSeries`.

Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.quality.missing import MissingValueAnalyzer

>>> idx = pd.date_range("2020-01-01", periods=10, freq="D")
>>> vals = np.array([1.0, np.nan, 3.0, np.nan, np.nan, 6.0, 7.0, 8.0, np.nan, 10.0])
>>> ts  = TimeSeries(vals, index=idx)
>>> ana = MissingValueAnalyzer()
>>> report = ana.analyze(ts)
>>> report.n_nan
4
>>> report.pct_nan
40.0
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd

from tseda.core.timeseries import TimeSeries
from tseda.core.validator import validate_freq_string

__all__ = ["MissingValueReport", "MissingValueAnalyzer"]

# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------


# ``eq=False`` because the auto-generated ``__eq__`` compares fields as a
# tuple, which evaluates ``ndarray == ndarray`` in a boolean context and
# raises ``ValueError`` for arrays with more than one element.  An explicit
# array-aware ``__eq__`` is defined below instead.
@dataclass(frozen=True, eq=False)
class MissingValueReport:
    """Immutable summary of missing values in a :class:`~tseda.core.TimeSeries`.

    Instances are frozen (attributes cannot be reassigned) and, because they
    hold lists and an array, they are not hashable.

    Attributes
    ----------
    n_nan : int
        Number of NaN values in the observed array.
    pct_nan : float
        Percentage of NaN observations (0–100).
    n_gaps : int
        Number of missing *timestamps* (index gaps) when the series
        frequency is known.  ``-1`` when frequency is unknown.
    gap_locations : list of pandas.Timestamp
        The expected timestamps that are absent from the index, one entry
        per missing timestamp.  Empty when ``n_gaps <= 0``.
    longest_nan_run : int
        Length of the longest consecutive run of NaN values.
    nan_run_lengths : list of int
        Lengths of every consecutive NaN run (ascending order).
    nan_positions : numpy.ndarray
        Integer positions (0-based) of all NaN values.
    is_monotone_missing : bool
        ``True`` when all NaN values cluster at the start or end of the
        series (monotone missing pattern — easier to handle).
    """

    n_nan: int
    pct_nan: float
    n_gaps: int
    gap_locations: List[pd.Timestamp]
    longest_nan_run: int
    nan_run_lengths: List[int]
    nan_positions: np.ndarray
    is_monotone_missing: bool

    def __eq__(self, other: object) -> bool:
        """Compare two reports field by field.

        ``nan_positions`` is compared with :func:`numpy.array_equal` so that
        equality never raises on multi-element arrays.  Objects of any other
        class yield ``NotImplemented``.
        """
        if other.__class__ is not self.__class__:
            return NotImplemented
        return bool(
            self.n_nan == other.n_nan
            and self.pct_nan == other.pct_nan
            and self.n_gaps == other.n_gaps
            and self.gap_locations == other.gap_locations
            and self.longest_nan_run == other.longest_nan_run
            and self.nan_run_lengths == other.nan_run_lengths
            and np.array_equal(self.nan_positions, other.nan_positions)
            and self.is_monotone_missing == other.is_monotone_missing
        )

    def __repr__(self) -> str:  # pragma: no cover
        """Return a compact multi-line summary of the headline figures."""
        # A negative gap count is the sentinel for "could not be determined".
        gap_str = (
            f"{self.n_gaps} gap(s)" if self.n_gaps >= 0 else "unknown (no freq)"
        )
        return (
            f"MissingValueReport(\n"
            f" n_nan : {self.n_nan} ({self.pct_nan:.1f}%)\n"
            f" index gaps : {gap_str}\n"
            f" longest NaN run : {self.longest_nan_run}\n"
            f" is_monotone : {self.is_monotone_missing}\n"
            f")"
        )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _nan_runs(mask: np.ndarray) -> List[int]:
    """Return the lengths of all consecutive ``True`` runs, sorted ascending.

    Parameters
    ----------
    mask:
        One-dimensional boolean-like sequence; truthy entries mark NaN
        observations.

    Returns
    -------
    list of int
        One entry per run of consecutive truthy values.  Empty when *mask*
        contains no truthy value.
    """
    flags = np.asarray(mask, dtype=bool)
    # Pad with False on both sides so every run has a rising and a falling
    # edge, including runs that touch the first or last element.
    padded = np.concatenate(([False], flags, [False]))
    edges = np.flatnonzero(padded[1:] != padded[:-1])
    # Edges alternate start, stop, start, stop, ...
    lengths = edges[1::2] - edges[0::2]
    return sorted(int(length) for length in lengths)


def _index_gaps(
    index: pd.DatetimeIndex, freq: str
) -> Tuple[int, List[pd.Timestamp]]:
    """Count and locate timestamps missing from *index* given *freq*.

    The expected grid is ``pd.date_range(index[0], index[-1], freq=freq)``;
    every grid point that does not occur in *index* is reported.

    Parameters
    ----------
    index:
        The actual datetime index of the series.
    freq:
        Pandas offset alias (e.g., ``"D"``).

    Returns
    -------
    n_gaps : int
        Number of missing timestamps.
    gap_locations : list of pd.Timestamp
        Every missing timestamp, in the order of the expected grid.  A gap
        spanning several periods contributes one entry per missing period.
    """
    # An empty index has no span, hence nothing can be missing from it.
    if len(index) == 0:
        return 0, []
    expected = pd.date_range(start=index[0], end=index[-1], freq=freq)
    missing = list(expected[~expected.isin(index)])
    return len(missing), missing


# ---------------------------------------------------------------------------
# Analyzer
# ---------------------------------------------------------------------------
class MissingValueAnalyzer:
    """Analyze and repair missing values in a :class:`~tseda.core.TimeSeries`.

    This class is **stateless** — instantiate once and call its methods on
    different series objects.

    Methods
    -------
    analyze(ts)
        Return a :class:`MissingValueReport` for *ts*.
    interpolate(ts, method)
        Fill NaN values and return a new :class:`~tseda.core.TimeSeries`.

    Examples
    --------
    >>> import numpy as np, pandas as pd
    >>> from tseda import TimeSeries
    >>> from tseda.quality.missing import MissingValueAnalyzer

    >>> idx = pd.date_range("2020-01-01", periods=5, freq="D")
    >>> vals = np.array([1.0, np.nan, 3.0, np.nan, 5.0])
    >>> ts = TimeSeries(vals, index=idx)
    >>> ana = MissingValueAnalyzer()
    >>> r = ana.analyze(ts)
    >>> r.n_nan
    2
    >>> filled = ana.interpolate(ts)
    >>> filled.has_nan
    False
    """

    # ------------------------------------------------------------------
    # Public methods
    # ------------------------------------------------------------------

    def analyze(self, ts: TimeSeries) -> MissingValueReport:
        """Compute a complete missing-value summary for *ts*.

        Parameters
        ----------
        ts : TimeSeries
            The series to analyze.

        Returns
        -------
        MissingValueReport
            ``n_gaps`` is ``-1`` (and ``gap_locations`` empty) when
            ``ts.freq`` is ``None`` or when the gap computation fails.

        Raises
        ------
        TypeError
            If *ts* is not a :class:`~tseda.core.TimeSeries`.

        Examples
        --------
        >>> import numpy as np, pandas as pd
        >>> from tseda import TimeSeries
        >>> from tseda.quality.missing import MissingValueAnalyzer
        >>> idx = pd.date_range("2020", periods=4, freq="D")
        >>> vals = np.array([1.0, np.nan, np.nan, 4.0])
        >>> report = MissingValueAnalyzer().analyze(TimeSeries(vals, index=idx))
        >>> report.n_nan
        2
        >>> report.longest_nan_run
        2
        """
        if not isinstance(ts, TimeSeries):
            raise TypeError(
                f"'ts' must be a TimeSeries, got {type(ts).__name__!r}."
            )

        values = ts.values
        nan_mask = np.isnan(values)
        n_nan = int(nan_mask.sum())
        # max(..., 1) guards the division for a zero-length series.
        pct_nan = 100.0 * n_nan / max(ts.n, 1)
        nan_positions = np.where(nan_mask)[0]

        # Consecutive NaN runs; the helper returns them sorted ascending,
        # so the longest run is the last element.
        runs = _nan_runs(nan_mask)
        longest = runs[-1] if runs else 0

        # Index gaps (only when freq is known).
        n_gaps, gap_locs = -1, []
        if ts.freq is not None:
            try:
                n_gaps, gap_locs = _index_gaps(ts.index, ts.freq)
            except Exception:
                # Deliberately best-effort: a frequency string or index that
                # pandas cannot build a range from must not abort the whole
                # report; fall back to the "unknown" sentinel instead.
                n_gaps, gap_locs = -1, []

        # Monotone missing: every NaN belongs to a leading or trailing run,
        # i.e. no NaN sits between the first and the last observed value.
        is_monotone = False
        if n_nan > 0:
            observed = np.where(~nan_mask)[0]
            if observed.size == 0:
                # Entirely NaN: the single run touches both ends.
                is_monotone = True
            else:
                span = int(observed[-1]) - int(observed[0]) + 1
                is_monotone = span == int(observed.size)

        return MissingValueReport(
            n_nan=n_nan,
            pct_nan=round(pct_nan, 4),
            n_gaps=n_gaps,
            gap_locations=gap_locs,
            longest_nan_run=longest,
            nan_run_lengths=runs,
            nan_positions=nan_positions,
            is_monotone_missing=is_monotone,
        )

    def interpolate(
        self,
        ts: TimeSeries,
        method: str = "linear",
        *,
        limit: Optional[int] = None,
        fill_value: Optional[float] = None,
    ) -> TimeSeries:
        """Fill NaN values and return a new :class:`~tseda.core.TimeSeries`.

        Parameters
        ----------
        ts : TimeSeries
            Series to fill.
        method : str, optional
            Interpolation strategy.  One of:

            * ``"linear"`` — interpolation between neighbours, weighted by
              the index values (default).  Leading and trailing NaN are
              filled with the nearest observed boundary value when *limit*
              is ``None``.
            * ``"forward"`` — forward-fill (carry last observed value).
            * ``"backward"`` — backward-fill (carry next observed value).
            * ``"nearest"`` — fill with the nearest non-NaN value, delegated
              to ``pandas.Series.interpolate(method="nearest")``.
              NOTE(review): pandas documents this method as scipy-backed —
              confirm scipy is available wherever this is used.
            * ``"zero"`` — fill with 0.0.
            * ``"constant"`` — fill with *fill_value* (must be provided).
            * ``"spline"`` — cubic spline over integer positions (requires
              scipy).  Only NaN between the first and last observation are
              filled; leading and trailing NaN are left untouched.
        limit : int, optional
            Maximum number of consecutive NaN values to fill.  ``None``
            fills all gaps.  Honoured by ``"linear"``, ``"forward"``,
            ``"backward"`` and ``"nearest"``; the ``"zero"``,
            ``"constant"`` and ``"spline"`` methods ignore it.
        fill_value : float, optional
            Used only with ``method="constant"``.

        Returns
        -------
        TimeSeries
            A new series with NaN values replaced.  Metadata (name, unit,
            freq, description) is preserved.

        Raises
        ------
        TypeError
            If *ts* is not a :class:`~tseda.core.TimeSeries`.
        ValueError
            If *method* is not recognised, if ``"constant"`` is chosen
            without supplying *fill_value*, or if ``"spline"`` is chosen
            for a series with fewer than 2 non-NaN observations.
        ImportError
            If ``"spline"`` is chosen and scipy is not installed.

        Examples
        --------
        >>> import numpy as np, pandas as pd
        >>> from tseda import TimeSeries
        >>> from tseda.quality.missing import MissingValueAnalyzer
        >>> idx = pd.date_range("2020", periods=5, freq="D")
        >>> vals = np.array([1.0, np.nan, np.nan, 4.0, 5.0])
        >>> ts = TimeSeries(vals, index=idx)
        >>> ana = MissingValueAnalyzer()

        Linear interpolation:

        >>> filled = ana.interpolate(ts, "linear")
        >>> filled.values.tolist()
        [1.0, 2.0, 3.0, 4.0, 5.0]

        Forward fill:

        >>> fwd = ana.interpolate(ts, "forward")
        >>> fwd.values.tolist()
        [1.0, 1.0, 1.0, 4.0, 5.0]
        """
        if not isinstance(ts, TimeSeries):
            raise TypeError(
                f"'ts' must be a TimeSeries, got {type(ts).__name__!r}."
            )

        _VALID = {
            "linear", "forward", "backward", "nearest",
            "zero", "constant", "spline",
        }
        if method not in _VALID:
            raise ValueError(
                f"Unknown interpolation method {method!r}. "
                f"Valid options: {sorted(_VALID)}."
            )
        if method == "constant" and fill_value is None:
            raise ValueError(
                "method='constant' requires a numeric 'fill_value'."
            )

        # Work on a copy so the caller's series is never mutated.
        series = ts.to_series().copy()

        if method == "linear":
            filled = series.interpolate(method="index", limit=limit)
            # interpolate() can leave NaN at the edges of the series; fill
            # them with the nearest boundary value when no limit is imposed.
            if limit is None:
                filled = filled.ffill().bfill()
        elif method == "forward":
            filled = series.ffill(limit=limit)
        elif method == "backward":
            filled = series.bfill(limit=limit)
        elif method == "nearest":
            filled = series.interpolate(method="nearest", limit=limit)
        elif method == "zero":
            filled = series.fillna(0.0)
        elif method == "constant":
            filled = series.fillna(float(fill_value))  # type: ignore[arg-type]
        else:  # spline
            # Imported lazily so scipy stays optional for the other methods.
            try:
                from scipy.interpolate import CubicSpline
            except ImportError as exc:
                raise ImportError(
                    "method='spline' requires scipy. "
                    "Install it with: pip install scipy"
                ) from exc

            not_nan = ~series.isna()
            if not_nan.sum() < 2:
                raise ValueError(
                    "method='spline' requires at least 2 non-NaN observations."
                )

            # The spline is fitted against integer positions, not timestamps.
            x_all = np.arange(len(series), dtype=float)
            x_obs = x_all[not_nan.values]
            y_obs = series.values[not_nan.values]
            cs = CubicSpline(x_obs, y_obs, extrapolate=False)

            filled_vals = series.values.copy()
            nan_idx = np.where(series.isna().values)[0]
            # Only fill within the observed range; NaN before the first or
            # after the last observation are left as they are.
            in_range = (nan_idx >= x_obs[0]) & (nan_idx <= x_obs[-1])
            filled_vals[nan_idx[in_range]] = cs(x_all[nan_idx[in_range]])
            filled = pd.Series(filled_vals, index=series.index)

        return TimeSeries(
            filled.values,
            index=filled.index,
            name=ts.name,
            freq=ts.freq,
            unit=ts.unit,
            description=ts.description,
        )