"""
Flat-line and near-constant segment detection for time series.
*Timestamp* duplicates are rejected at construction time by
:func:`~tseda.core.validator.validate_datetime_index`. This module
addresses the complementary problem: consecutive identical or near-zero
*values*, which typically signal:
* A stuck sensor / ADC saturation.
* A data-pipeline bug that forward-filled data without marking it.
* A genuine flat segment that may confuse differencing-based methods.
Classes
-------
FlatlineReport
Immutable result dataclass returned by :meth:`DuplicateDetector.flatline`.
DuplicateDetector
Stateless detector.
Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.quality.duplicates import DuplicateDetector
>>> idx = pd.date_range("2020-01-01", periods=10, freq="D")
>>> vals = np.array([1.0, 2.0, 3.0, 3.0, 3.0, 3.0, 4.0, 5.0, 5.0, 6.0])
>>> ts = TimeSeries(vals, index=idx)
>>> det = DuplicateDetector()
>>> r = det.flatline(ts, min_run=3)
>>> r.n_flatline_runs
1
>>> r.longest_run
4
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Tuple
import numpy as np
import pandas as pd
from tseda.core.timeseries import TimeSeries
from tseda.core.validator import validate_positive_int
__all__ = ["FlatlineReport", "DuplicateDetector"]
# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------
[docs]
@dataclass(frozen=True)
class FlatlineReport:
"""Immutable summary of flat-line segments in a :class:`~tseda.core.TimeSeries`.
Attributes
----------
n_flatline_runs : int
Number of runs that meet or exceed *min_run* in length.
longest_run : int
Length of the single longest flat-line run.
total_flatline_points : int
Total number of observations that belong to a qualifying flat-line
run (includes the first observation of each run).
runs : list of (start_pos, end_pos, value)
Each element is a tuple ``(start_pos, end_pos, value)`` where
*start_pos* and *end_pos* are 0-based integer positions and
*value* is the repeated value. Only runs of length >= *min_run*
are included.
mask : numpy.ndarray
Boolean array; ``True`` at every position that is part of a
qualifying flat-line run.
min_run : int
The minimum run length used for this report.
"""
n_flatline_runs: int
longest_run: int
total_flatline_points: int
runs: List[Tuple[int, int, float]]
mask: np.ndarray
min_run: int
[docs]
def __repr__(self) -> str: # pragma: no cover
return (
f"FlatlineReport(\n"
f" n_flatline_runs : {self.n_flatline_runs}\n"
f" longest_run : {self.longest_run}\n"
f" total_flatline_points : {self.total_flatline_points}\n"
f" min_run : {self.min_run}\n"
f")"
)
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _find_runs(
values: np.ndarray, *, min_run: int, atol: float
) -> List[Tuple[int, int, float]]:
"""Return all consecutive runs of near-equal values.
Parameters
----------
values:
1-D float array (NaN values break any run they belong to).
min_run:
Minimum run length to report.
atol:
Absolute tolerance for equality.
Returns
-------
list of (start, end, value)
*end* is inclusive.
"""
n = len(values)
if n == 0:
return []
runs: List[Tuple[int, int, float]] = []
start = 0
ref = values[0]
for i in range(1, n):
v = values[i]
same = (not np.isnan(ref)) and (not np.isnan(v)) and abs(v - ref) <= atol
if same:
continue
run_len = i - start
if run_len >= min_run and not np.isnan(ref):
runs.append((start, i - 1, float(ref)))
start = i
ref = v
# Handle last run
run_len = n - start
if run_len >= min_run and not np.isnan(ref):
runs.append((start, n - 1, float(ref)))
return runs
# ---------------------------------------------------------------------------
# Detector
# ---------------------------------------------------------------------------
[docs]
class DuplicateDetector:
"""Detect consecutive duplicate (flat-line) value runs.
Methods
-------
flatline(ts, min_run=3, atol=0.0)
Detect flat-line segments of repeated values.
near_zero(ts, min_run=3, threshold=1e-8)
Detect segments where the series is stuck near zero.
remove_flatlines(ts, report)
Replace flat-line positions with NaN (keeping the first value).
Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.quality.duplicates import DuplicateDetector
>>> idx = pd.date_range("2020", periods=8, freq="D")
>>> vals = np.array([1.0, 5.0, 5.0, 5.0, 5.0, 2.0, 3.0, 4.0])
>>> ts = TimeSeries(vals, index=idx)
>>> det = DuplicateDetector()
>>> r = det.flatline(ts, min_run=3)
>>> r.n_flatline_runs
1
>>> r.longest_run
4
"""
@staticmethod
def _validate(ts: object) -> TimeSeries:
if not isinstance(ts, TimeSeries):
raise TypeError(
f"'ts' must be a TimeSeries, got {type(ts).__name__!r}."
)
return ts # type: ignore[return-value]
[docs]
def flatline(
self,
ts: TimeSeries,
min_run: int = 3,
*,
atol: float = 0.0,
) -> FlatlineReport:
"""Detect consecutive runs of identical (or near-identical) values.
Parameters
----------
ts : TimeSeries
Input series.
min_run : int, optional
Minimum number of consecutive identical observations to constitute
a "flat line". Default ``3``.
atol : float, optional
Absolute tolerance for equality. Two values ``a`` and ``b`` are
considered equal when ``|a - b| <= atol``. Default ``0.0``
(exact equality).
Returns
-------
FlatlineReport
Raises
------
TypeError
If *ts* is not a :class:`~tseda.core.TimeSeries`.
ValueError
If *min_run* < 2.
Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.quality.duplicates import DuplicateDetector
Exact flat line of length 4:
>>> idx = pd.date_range("2020", periods=7, freq="D")
>>> vals = np.array([1.0, 3.0, 3.0, 3.0, 3.0, 4.0, 5.0])
>>> ts = TimeSeries(vals, index=idx)
>>> r = DuplicateDetector().flatline(ts, min_run=3)
>>> r.n_flatline_runs
1
>>> r.runs[0]
(1, 4, 3.0)
No flat line (min_run too high):
>>> r2 = DuplicateDetector().flatline(ts, min_run=5)
>>> r2.n_flatline_runs
0
"""
ts = self._validate(ts)
min_run = validate_positive_int(min_run, name="min_run")
if min_run < 2:
raise ValueError(
f"'min_run' must be >= 2 to detect a repeated run, got {min_run}."
)
if atol < 0:
raise ValueError(f"'atol' must be >= 0, got {atol}.")
vals = ts.values
runs = _find_runs(vals, min_run=min_run, atol=atol)
mask = np.zeros(ts.n, dtype=bool)
for start, end, _ in runs:
mask[start : end + 1] = True
longest = max((e - s + 1 for s, e, _ in runs), default=0)
total = int(mask.sum())
return FlatlineReport(
n_flatline_runs=len(runs),
longest_run=longest,
total_flatline_points=total,
runs=runs,
mask=mask,
min_run=min_run,
)
[docs]
def near_zero(
self,
ts: TimeSeries,
min_run: int = 3,
*,
threshold: float = 1e-8,
) -> FlatlineReport:
"""Detect segments where the series is stuck near zero.
Only consecutive runs where **every** value satisfies
``|x| <= threshold`` are reported. This differs from
:meth:`flatline`, which detects any repeated value regardless of
magnitude.
Parameters
----------
ts : TimeSeries
Input series.
min_run : int, optional
Minimum run length. Default ``3``.
threshold : float, optional
Maximum absolute value to count as "near zero".
Default ``1e-8``.
Returns
-------
FlatlineReport
Runs where every value satisfies ``|x| <= threshold``.
Raises
------
ValueError
If *threshold* < 0.
Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.quality.duplicates import DuplicateDetector
>>> idx = pd.date_range("2020", periods=8, freq="D")
>>> vals = np.array([1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 2.0, 3.0])
>>> ts = TimeSeries(vals, index=idx)
>>> r = DuplicateDetector().near_zero(ts, min_run=3)
>>> r.n_flatline_runs
1
"""
ts = self._validate(ts)
min_run = validate_positive_int(min_run, name="min_run")
if threshold < 0:
raise ValueError(f"'threshold' must be >= 0, got {threshold}.")
vals = ts.values
nz_mask = np.abs(vals) <= threshold # True where near-zero
# Find runs of consecutive True values in nz_mask
n = len(vals)
runs: List[Tuple[int, int, float]] = []
in_run = False
run_start = 0
for i in range(n):
if nz_mask[i]:
if not in_run:
run_start = i
in_run = True
else:
if in_run:
run_len = i - run_start
if run_len >= min_run:
runs.append((run_start, i - 1, float(vals[run_start])))
in_run = False
if in_run:
run_len = n - run_start
if run_len >= min_run:
runs.append((run_start, n - 1, float(vals[run_start])))
mask = np.zeros(n, dtype=bool)
for s, e, _ in runs:
mask[s : e + 1] = True
longest = max((e - s + 1 for s, e, _ in runs), default=0)
return FlatlineReport(
n_flatline_runs=len(runs),
longest_run=longest,
total_flatline_points=int(mask.sum()),
runs=runs,
mask=mask,
min_run=min_run,
)
[docs]
def remove_flatlines(
self,
ts: TimeSeries,
report: FlatlineReport,
*,
keep_first: bool = True,
) -> TimeSeries:
"""Replace flat-line positions with NaN.
Parameters
----------
ts : TimeSeries
The original series.
report : FlatlineReport
Result from :meth:`flatline` or :meth:`near_zero`.
keep_first : bool, optional
When ``True`` (default), the *first* observation of each
flat-line run is preserved; only the *repeated* copies are
set to NaN. When ``False``, the entire run including the
first observation is set to NaN.
Returns
-------
TimeSeries
A new series with flat-line values replaced by NaN.
Examples
--------
>>> import numpy as np, pandas as pd
>>> from tseda import TimeSeries
>>> from tseda.quality.duplicates import DuplicateDetector
>>> idx = pd.date_range("2020", periods=6, freq="D")
>>> vals = np.array([1.0, 5.0, 5.0, 5.0, 2.0, 3.0])
>>> ts = TimeSeries(vals, index=idx)
>>> det = DuplicateDetector()
>>> r = det.flatline(ts, min_run=3)
>>> cleaned = det.remove_flatlines(ts, r, keep_first=True)
>>> cleaned.n_nan
2
"""
if not isinstance(report, FlatlineReport):
raise TypeError(
f"'report' must be a FlatlineReport, got {type(report).__name__!r}."
)
ts = self._validate(ts)
vals = ts.values.copy()
for start, end, _ in report.runs:
replace_from = start + 1 if keep_first else start
vals[replace_from : end + 1] = np.nan
return TimeSeries(
vals, index=ts.index, name=ts.name,
freq=ts.freq, unit=ts.unit, description=ts.description,
)