elwood_spatial.detect
Rule-based outlier detection. A device is flagged when all three conditions hold (Equation 4 from the paper):
- Information content ≥ θ
- Network entropy < S (entropy ceiling)
- Bin deviation ≥ nbins / β
DetectionParams
@dataclass(frozen=True)
class DetectionParams:
    theta: float          # Information content threshold
    entropy_limit: float  # Network entropy ceiling
    beta: float           # Bin deviation divisor

| Field | Type | Description |
|---|---|---|
| theta | float | Minimum information content to trigger detection |
| entropy_limit | float | Network entropy must be below this value |
| beta | float | Divisor for the bin deviation threshold (num_bins / beta) |
PARAMS_OPERATIONAL
Production defaults:
PARAMS_OPERATIONAL = DetectionParams(theta=1.75, entropy_limit=1.75, beta=3.5)

is_outlier(information, entropy, bin_dev, num_bins, params)
Apply the three-condition rule to a single device.
| Parameter | Type | Description |
|---|---|---|
| information | float | Device's information content |
| entropy | float | Network entropy |
| bin_dev | float | Device's bin deviation |
| num_bins | int | Number of bins in the spec |
| params | DetectionParams | Detection thresholds |
Returns bool.
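
The three-condition rule is small enough to sketch directly. The block below is an illustration, not the library's source: `Params` and `is_outlier_sketch` are hypothetical stand-ins that mirror the documented fields and argument order, and the metric values are made up for a device in a 4-bin spec.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Params:
    """Hypothetical stand-in for DetectionParams (same three fields)."""
    theta: float          # information content threshold
    entropy_limit: float  # network entropy ceiling
    beta: float           # bin deviation divisor


def is_outlier_sketch(information, entropy, bin_dev, num_bins, params):
    """Return True only when all three documented conditions hold."""
    return (
        information >= params.theta
        and entropy < params.entropy_limit        # strict inequality
        and bin_dev >= num_bins / params.beta
    )


operational = Params(theta=1.75, entropy_limit=1.75, beta=3.5)
conservative = Params(theta=2.5, entropy_limit=1.5, beta=4.5)

# Illustrative (made-up) metric values for one device.
metrics = dict(information=2.32, entropy=0.72, bin_dev=2.0, num_bins=4)

flag_operational = is_outlier_sketch(**metrics, params=operational)
flag_conservative = is_outlier_sketch(**metrics, params=conservative)
print(flag_operational, flag_conservative)  # True False
```

With these values the device passes all three checks under the operational thresholds, but the higher `theta` of the conservative set rejects it on information content alone.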
detect_outliers(values, bins, network, params, target_id=None)
Detect outliers for a single timestep across all devices in a network.
| Parameter | Type | Description |
|---|---|---|
| values | dict[str, float] | Device ID → measurement value |
| bins | BinSpec | Bin specification |
| network | Network | Spatial network dict |
| params | DetectionParams | Detection thresholds |
| target_id | str \| None | If set, only evaluate this device |
Returns dict[str, bool], mapping device ID → outlier flag.
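
To make the per-device evaluation concrete, here is a dependency-free sketch of one timestep. It rests on assumptions this page does not confirm: information content is taken as surprisal in bits, entropy as Shannon entropy in bits over the bin distribution, and bin deviation as the absolute distance from the median bin of the other devices. It also ignores the `network` argument and treats every other device as an equally weighted neighbor. All `*_sketch` names are hypothetical.

```python
import math
from collections import Counter
from statistics import median


def information_sketch(bin_idx, all_bins):
    """Assumed definition: surprisal (bits) of a bin under the observed distribution."""
    p = Counter(all_bins)[bin_idx] / len(all_bins)
    return -math.log2(p)


def entropy_sketch(all_bins):
    """Assumed definition: Shannon entropy (bits) of the bin distribution."""
    n = len(all_bins)
    return -sum((c / n) * math.log2(c / n) for c in Counter(all_bins).values())


def bin_deviation_sketch(bin_idx, other_bins):
    """Assumed definition: distance from the median bin of the other devices."""
    return abs(bin_idx - median(other_bins))


def detect_sketch(bin_indices, theta, entropy_limit, beta, num_bins):
    """Apply the three-condition rule to every device at one timestep."""
    all_bins = list(bin_indices.values())
    entropy = entropy_sketch(all_bins)  # one value shared by all devices
    flags = {}
    for dev, idx in bin_indices.items():
        others = [b for d, b in bin_indices.items() if d != dev]
        flags[dev] = (
            information_sketch(idx, all_bins) >= theta
            and entropy < entropy_limit
            and bin_deviation_sketch(idx, others) >= num_bins / beta
        )
    return flags


flags = detect_sketch(
    {"s1": 0, "s2": 0, "s3": 2, "s4": 0, "s5": 0},
    theta=1.75, entropy_limit=1.75, beta=3.5, num_bins=4,
)
print(flags)
# {'s1': False, 's2': False, 's3': True, 's4': False, 's5': False}
```

Under these assumed definitions, `s3` has a surprisal of about 2.32 bits, the network entropy is about 0.72 bits, and the bin deviation is 2, so all three conditions hold for `s3` only.

Actual usage with the library: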
from elwood_spatial.detect import detect_outliers, PARAMS_OPERATIONAL
results = detect_outliers(values, bins, network, PARAMS_OPERATIONAL)
# => {"sensor_1": False, "sensor_2": True, ...}

detect_outliers_batch(df, bins, network, params, ...)
Apply detection at every timestep in a DataFrame.
| Parameter | Type | Default | Description |
|---|---|---|---|
| df | pd.DataFrame | | Input data |
| bins | BinSpec | | Bin specification |
| network | Network | | Spatial network |
| params | DetectionParams | | Detection thresholds |
| id_column | str | "id" | Device ID column |
| time_column | str | "timestamp" | Timestamp column |
| value_column | str | "value" | Measurement column |
Returns a copy of df with added columns: bin_index, is_outlier, information, entropy, bin_deviation.
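
The batch step can be pictured as "group rows by timestamp, run single-timestep detection on each group, then attach the flags to copies of the rows". The sketch below shows that shape on plain lists of dicts so it runs without pandas. It is not the library's implementation: `detect_batch_sketch` and `toy_detector` are hypothetical, and the toy rule (flag readings above 100) only stands in for the real three-condition detector.

```python
from collections import defaultdict


def detect_batch_sketch(rows, detect_fn, id_key="id",
                        time_key="timestamp", value_key="value"):
    """Group rows by timestamp, run detect_fn per group, return annotated copies."""
    by_time = defaultdict(dict)
    for row in rows:
        by_time[row[time_key]][row[id_key]] = row[value_key]

    # One {device_id: flag} dict per timestep.
    flags = {ts: detect_fn(values) for ts, values in by_time.items()}

    # Build new dicts so the input rows are left unmodified.
    return [{**row, "is_outlier": flags[row[time_key]][row[id_key]]}
            for row in rows]


def toy_detector(values):
    """Hypothetical per-timestep rule: flag readings above 100."""
    return {dev: v > 100 for dev, v in values.items()}


rows = [
    {"id": "s1", "timestamp": 0, "value": 45.0},
    {"id": "s2", "timestamp": 0, "value": 160.0},
    {"id": "s1", "timestamp": 1, "value": 47.0},
    {"id": "s2", "timestamp": 1, "value": 48.0},
]
annotated = detect_batch_sketch(rows, toy_detector)
print([r["is_outlier"] for r in annotated])  # [False, True, False, False]
```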
Custom Parameters
Create custom parameters to tune sensitivity:
from elwood_spatial.detect import DetectionParams
# More sensitive, catches subtler anomalies
sensitive = DetectionParams(theta=1.0, entropy_limit=2.0, beta=2.5)
# More conservative, fewer false positives
conservative = DetectionParams(theta=2.5, entropy_limit=1.5, beta=4.5)

Worked Example
End-to-end single-timestep detection with metric inspection:
import elwood_spatial as es
from elwood_spatial.detect import detect_outliers, is_outlier, PARAMS_OPERATIONAL
bins = es.BinSpec.from_tuples([(0, 50), (51, 100), (101, 150), (151, 200)])
values = {"s1": 45, "s2": 48, "s3": 120, "s4": 42, "s5": 50}
network = {s: {"neighbors": [x for x in values if x != s], "weights": [1.0] * 4}
           for s in values}
# Full detection
results = detect_outliers(values, bins, network, PARAMS_OPERATIONAL)
for sid, flagged in results.items():
    print(f"{sid}: {'OUTLIER' if flagged else 'ok'}")
# => s3: OUTLIER (all others: ok)
# Inspect why s3 was flagged
bin_indices = {k: bins.bin_index(v) for k, v in values.items()}
all_idx = list(bin_indices.values())
info = es.information_content(bin_indices["s3"], all_idx)
entropy = es.shannon_entropy(all_idx)
others = [v for k, v in bin_indices.items() if k != "s3"]
bd = es.bin_deviation(bin_indices["s3"], others)
print(f"s3: info={info:.3f} >= 1.75? {info >= 1.75}")
print(f" entropy={entropy:.3f} < 1.75? {entropy < 1.75}")
print(f" bin_dev={bd:.3f} >= {bins.num_bins}/3.5={bins.num_bins/3.5:.2f}? {bd >= bins.num_bins/3.5}")

Batch Example
import pandas as pd
import numpy as np
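# 6 sensors, 24 hours, one faulty
# Reuses es, bins, and PARAMS_OPERATIONAL from the worked example above.
rng = np.random.default_rng(42)
sensors = ["s1", "s2", "s3", "s4", "s5", "s6"]
# Assumed topology: fully connected, equal weights (same pattern as the worked example)
network_6 = {s: {"neighbors": [x for x in sensors if x != s], "weights": [1.0] * 5}
             for s in sensors}
rows = []
for hour in range(24):
    ts = pd.Timestamp("2024-07-15") + pd.Timedelta(hours=hour)
    for s in sensors[:5]:
        rows.append({"id": s, "timestamp": ts, "value": 45 + rng.normal(0, 5)})
    rows.append({"id": "s6", "timestamp": ts, "value": 160})  # stuck high
df = pd.DataFrame(rows)
result = es.detect_outliers_batch(df, bins, network_6, PARAMS_OPERATIONAL)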
# How many hours was s6 flagged?
s6 = result[result["id"] == "s6"]
print(f"s6 flagged {s6['is_outlier'].sum()} / {len(s6)} hours")
print(s6[["timestamp", "value", "information", "entropy", "bin_deviation", "is_outlier"]].head())