Source code for tseda.features.spectral
"""
Spectral feature extraction for time series.
Computes frequency-domain descriptors of a :class:`~tseda.core.TimeSeries`
using the FFT power spectrum. All computations use pure numpy / scipy.
Feature groups
--------------
* **Energy / Power** — total spectral power, power in low / mid / high bands.
* **Shape** — spectral centroid, bandwidth, rolloff frequency, spectral entropy.
* **Peak** — dominant frequency, dominant period, number of spectral peaks.
* **Temporal** — spectral flatness (ratio of geometric to arithmetic mean power).
Classes
-------
SpectralFeatureExtractor
Stateless extractor.
Examples
--------
>>> import pandas as pd, numpy as np
>>> from tseda import TimeSeries
>>> from tseda.features.spectral import SpectralFeatureExtractor
>>> rng = np.random.default_rng(0)
>>> idx = pd.date_range("2020", periods=256, freq="D")
>>> ts = TimeSeries(np.sin(2 * np.pi * np.arange(256) / 7), index=idx)
>>> df = SpectralFeatureExtractor().extract(ts)
>>> int(round(float(df["dominant_period"].iloc[0])))
7
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from scipy import signal as sp_signal
from tseda.core.timeseries import TimeSeries
__all__ = ["SpectralFeatureExtractor"]
[docs]
class SpectralFeatureExtractor:
"""Extract frequency-domain features from a :class:`~tseda.core.TimeSeries`.
The extractor is **stateless**. NaN values in the series are replaced
by linear interpolation before FFT analysis.
Methods
-------
extract(ts, n_bands)
Return a single-row :class:`pandas.DataFrame` of spectral features.
Examples
--------
>>> import pandas as pd, numpy as np
>>> from tseda import TimeSeries
>>> from tseda.features.spectral import SpectralFeatureExtractor
>>> idx = pd.date_range("2020", periods=128, freq="h")
>>> ts = TimeSeries(np.sin(2 * np.pi * np.arange(128) / 24), index=idx)
>>> df = SpectralFeatureExtractor().extract(ts)
>>> "spectral_centroid" in df.columns
True
"""
[docs]
def extract(
self,
ts: TimeSeries,
*,
n_bands: int = 3,
) -> pd.DataFrame:
"""Compute spectral features for *ts*.
Parameters
----------
ts : TimeSeries
Input series. NaN values are linearly interpolated before
the FFT.
n_bands : int, optional
Number of equal-width frequency bands for band power features.
Default 3 (low / mid / high). Must be >= 1.
Returns
-------
pandas.DataFrame
One row, columns:
Energy:
``total_power``,
``band_power_0`` … ``band_power_{n_bands-1}``.
Shape:
``spectral_centroid``, ``spectral_bandwidth``,
``spectral_rolloff_0.5``, ``spectral_rolloff_0.85``,
``spectral_entropy``, ``spectral_flatness``.
Peak:
``dominant_freq``, ``dominant_period``, ``n_spectral_peaks``.
Raises
------
TypeError
If *ts* is not a :class:`~tseda.core.TimeSeries`.
ValueError
If the series has fewer than 8 non-NaN observations or
*n_bands* < 1.
Examples
--------
>>> import pandas as pd, numpy as np
>>> from tseda import TimeSeries
>>> from tseda.features.spectral import SpectralFeatureExtractor
>>> idx = pd.date_range("2020", periods=256, freq="D")
>>> ts = TimeSeries(np.cos(2*np.pi*np.arange(256)/7), index=idx)
>>> df = SpectralFeatureExtractor().extract(ts)
>>> int(round(float(df["dominant_period"].iloc[0])))
7
"""
if not isinstance(ts, TimeSeries):
raise TypeError(
f"'ts' must be a TimeSeries, got {type(ts).__name__!r}."
)
if n_bands < 1:
raise ValueError(f"'n_bands' must be >= 1, got {n_bands}.")
vals = ts.values.astype(float)
n = len(vals)
if np.sum(~np.isnan(vals)) < 8:
raise ValueError(
"SpectralFeatureExtractor requires at least 8 non-NaN "
f"observations; got {int(np.sum(~np.isnan(vals)))}."
)
# Fill NaN by linear interpolation
nan_mask = np.isnan(vals)
if nan_mask.any():
idx_arr = np.arange(n)
vals[nan_mask] = np.interp(
idx_arr[nan_mask], idx_arr[~nan_mask], vals[~nan_mask]
)
# Detrend + Hann window
x = sp_signal.detrend(vals, type="linear")
x = x * np.hanning(n)
# FFT power spectrum (one-sided, excluding DC)
fft_vals = np.fft.rfft(x)
power = np.abs(fft_vals) ** 2
# Frequency axis (normalised: 0 → 0.5 cycles/sample)
freqs = np.fft.rfftfreq(n) # shape (n//2 + 1,)
# Exclude DC (index 0)
power_ac = power[1:]
freqs_ac = freqs[1:]
n_ac = len(power_ac)
total_power = float(power_ac.sum()) if power_ac.sum() > 0 else 1e-12
# ── Band power ────────────────────────────────────────────────
band_edges = np.linspace(0, n_ac, n_bands + 1, dtype=int)
band_power = {
f"band_power_{i}": float(power_ac[band_edges[i]:band_edges[i + 1]].sum())
for i in range(n_bands)
}
# ── Shape ─────────────────────────────────────────────────────
# Spectral centroid (weighted mean frequency)
centroid = float(np.sum(freqs_ac * power_ac) / total_power)
# Spectral bandwidth (weighted std of frequency)
bw = float(
np.sqrt(np.sum(((freqs_ac - centroid) ** 2) * power_ac) / total_power)
)
# Spectral rolloff (frequency below which X% of power resides)
cumpower = np.cumsum(power_ac)
rolloff_50 = float(freqs_ac[np.searchsorted(cumpower, 0.50 * total_power)])
rolloff_85 = float(freqs_ac[np.searchsorted(cumpower, 0.85 * total_power)])
# Spectral entropy (normalised power distribution)
p_norm = power_ac / total_power
# Avoid log(0)
p_norm = np.where(p_norm > 0, p_norm, 1e-12)
sp_ent = float(-np.sum(p_norm * np.log(p_norm)) / np.log(n_ac))
# Spectral flatness (geometric / arithmetic mean ratio)
log_mean = float(np.exp(np.mean(np.log(power_ac + 1e-12))))
arith_mean = float(np.mean(power_ac))
sp_flat = log_mean / arith_mean if arith_mean > 0 else 0.0
# ── Peak ──────────────────────────────────────────────────────
dom_idx = int(np.argmax(power_ac))
dom_freq = float(freqs_ac[dom_idx])
dom_per = float(1.0 / dom_freq) if dom_freq > 0 else float("nan")
peaks, _ = sp_signal.find_peaks(power_ac, height=0)
n_peaks = int(len(peaks))
row: dict = {
"total_power": total_power,
**band_power,
"spectral_centroid": centroid,
"spectral_bandwidth": bw,
"spectral_rolloff_0.5": rolloff_50,
"spectral_rolloff_0.85":rolloff_85,
"spectral_entropy": sp_ent,
"spectral_flatness": sp_flat,
"dominant_freq": dom_freq,
"dominant_period": dom_per,
"n_spectral_peaks": float(n_peaks),
}
return pd.DataFrame([row], index=[ts.index[0]])