elwood_spatial.detect

Rule-based outlier detection. A device is flagged when all three conditions hold (Equation 4 from the paper):

Outlier detection equation
  1. Information content ≥ θ (theta)
  2. Network entropy < S (entropy_limit, the entropy ceiling)
  3. Bin deviation ≥ n_bins / β (beta)
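
Read as code, the rule is a plain conjunction of three comparisons. The sketch below is a standalone illustration, not the library's source: the function name `three_condition_rule` and its argument names are hypothetical, chosen to mirror the symbols above, and the input numbers are made up.

```python
def three_condition_rule(information, entropy, bin_dev, num_bins,
                         theta, entropy_limit, beta):
    """Hypothetical stand-in for the rule above; not the library's implementation."""
    surprising = information >= theta            # condition 1
    network_agrees = entropy < entropy_limit     # condition 2
    far_from_peers = bin_dev >= num_bins / beta  # condition 3
    return surprising and network_agrees and far_from_peers


# Illustrative inputs: 4 bins, theta=1.75, S=1.75, beta=3.5.
flagged = three_condition_rule(2.32, 0.72, 2.0, num_bins=4,
                               theta=1.75, entropy_limit=1.75, beta=3.5)
# Same device, but the network entropy is above the ceiling.
not_flagged = three_condition_rule(2.32, 1.90, 2.0, num_bins=4,
                                   theta=1.75, entropy_limit=1.75, beta=3.5)
print(flagged, not_flagged)  # True False
```

Because the conditions are joined with "and", a single failing comparison is enough to leave a device unflagged.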

DetectionParams

@dataclass(frozen=True)
class DetectionParams:
    theta: float           # Information content threshold
    entropy_limit: float   # Network entropy ceiling
    beta: float            # Bin deviation divisor

Field          Type   Description
theta          float  Minimum information content to trigger detection
entropy_limit  float  Network entropy must be below this value
beta           float  Divisor for the bin deviation threshold (n_bins / beta)

PARAMS_OPERATIONAL

Production defaults:

PARAMS_OPERATIONAL = DetectionParams(theta=1.75, entropy_limit=1.75, beta=3.5)

is_outlier(information, entropy, bin_dev, num_bins, params)

Apply the three-condition rule to a single device.

Parameter    Type             Description
information  float            Device's information content
entropy      float            Network entropy
bin_dev      float            Device's bin deviation
num_bins     int              Number of bins in the spec
params       DetectionParams  Detection thresholds

Returns bool.

detect_outliers(values, bins, network, params, target_id=None)

Detect outliers for a single timestep across all devices in a network.

Parameter  Type              Description
values     dict[str, float]  Device ID → measurement value
bins       BinSpec           Bin specification
network    Network           Spatial network dict
params     DetectionParams   Detection thresholds
target_id  str | None        If set, only evaluate this device

Returns dict[str, bool], mapping device ID → outlier flag.

from elwood_spatial.detect import detect_outliers, PARAMS_OPERATIONAL

results = detect_outliers(values, bins, network, PARAMS_OPERATIONAL)
# => {"sensor_1": False, "sensor_2": True, ...}

detect_outliers_batch(df, bins, network, params, ...)

Apply detection at every timestep in a DataFrame.

Parameter     Type             Default      Description
df            pd.DataFrame     -            Input data
bins          BinSpec          -            Bin specification
network       Network          -            Spatial network
params        DetectionParams  -            Detection thresholds
id_column     str              "id"         Device ID column
time_column   str              "timestamp"  Timestamp column
value_column  str              "value"      Measurement column

Returns a copy of df with added columns: bin_index, is_outlier, information, entropy, bin_deviation.
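
Conceptually, batch detection groups rows by timestamp and evaluates each group as one timestep. The sketch below shows only that grouping step, on plain dictionaries of the kind `df.to_dict("records")` produces, so it runs without the library. `group_by_timestep` is a hypothetical helper, and the real function may be implemented differently.

```python
from collections import defaultdict


def group_by_timestep(records, id_column="id", time_column="timestamp",
                      value_column="value"):
    """Hypothetical helper: build one {device_id: value} dict per timestamp."""
    per_step = defaultdict(dict)
    for row in records:
        per_step[row[time_column]][row[id_column]] = row[value_column]
    return dict(per_step)


# Made-up rows using the default column names.
records = [
    {"id": "s1", "timestamp": "2024-07-15 00:00", "value": 45.0},
    {"id": "s2", "timestamp": "2024-07-15 00:00", "value": 160.0},
    {"id": "s1", "timestamp": "2024-07-15 01:00", "value": 47.0},
    {"id": "s2", "timestamp": "2024-07-15 01:00", "value": 158.0},
]
steps = group_by_timestep(records)
for ts, values in steps.items():
    print(ts, values)
```

Each per-timestamp dictionary has the `values` shape (device ID → measurement value) that `detect_outliers` takes for a single timestep.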

Custom Parameters

Create custom parameters to tune sensitivity:

from elwood_spatial.detect import DetectionParams

# More sensitive, catches subtler anomalies
sensitive = DetectionParams(theta=1.0, entropy_limit=2.0, beta=2.5)

# More conservative, fewer false positives
conservative = DetectionParams(theta=2.5, entropy_limit=1.5, beta=4.5)
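
To see how the presets differ, the sketch below applies the three conditions to one fixed set of metrics under each parameter set. It redeclares `DetectionParams` locally so it runs without the library, `rule` is a hypothetical stand-in for `is_outlier`, and the metric values are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DetectionParams:  # local copy of the documented fields, for illustration only
    theta: float
    entropy_limit: float
    beta: float


def rule(information, entropy, bin_dev, num_bins, params):
    """Hypothetical stand-in for is_outlier: all three conditions must hold."""
    return (information >= params.theta
            and entropy < params.entropy_limit
            and bin_dev >= num_bins / params.beta)


presets = {
    "operational": DetectionParams(theta=1.75, entropy_limit=1.75, beta=3.5),
    "sensitive": DetectionParams(theta=1.0, entropy_limit=2.0, beta=2.5),
    "conservative": DetectionParams(theta=2.5, entropy_limit=1.5, beta=4.5),
}

# Made-up metrics for a mildly unusual device in a 4-bin spec.
information, entropy, bin_dev, num_bins = 1.32, 0.97, 2.0, 4

verdicts = {name: rule(information, entropy, bin_dev, num_bins, p)
            for name, p in presets.items()}
print(verdicts)
# {'operational': False, 'sensitive': True, 'conservative': False}
```

Only the sensitive preset flags this device, because its theta is the only one below 1.32. Note that a larger beta lowers the bin deviation threshold: with 4 bins it is 4 / 2.5 = 1.6 for the sensitive preset and 4 / 4.5 ≈ 0.89 for the conservative one, so these presets differ in strictness mainly through theta and entropy_limit.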

Worked Example

End-to-end single-timestep detection with metric inspection:

import elwood_spatial as es
from elwood_spatial.detect import detect_outliers, is_outlier, PARAMS_OPERATIONAL

bins = es.BinSpec.from_tuples([(0, 50), (51, 100), (101, 150), (151, 200)])
values = {"s1": 45, "s2": 48, "s3": 120, "s4": 42, "s5": 50}
network = {s: {"neighbors": [x for x in values if x != s], "weights": [1.0]*4}
           for s in values}

# Full detection
results = detect_outliers(values, bins, network, PARAMS_OPERATIONAL)
for sid, flagged in results.items():
    print(f"{sid}: {'OUTLIER' if flagged else 'ok'}")
# => s3: OUTLIER (all others: ok)

# Inspect why s3 was flagged
bin_indices = {k: bins.bin_index(v) for k, v in values.items()}
all_idx = list(bin_indices.values())
info = es.information_content(bin_indices["s3"], all_idx)
entropy = es.shannon_entropy(all_idx)
others = [v for k, v in bin_indices.items() if k != "s3"]
bd = es.bin_deviation(bin_indices["s3"], others)

print(f"s3: info={info:.3f} >= 1.75? {info >= 1.75}")
print(f"    entropy={entropy:.3f} < 1.75? {entropy < 1.75}")
print(f"    bin_dev={bd:.3f} >= {bins.num_bins}/3.5={bins.num_bins/3.5:.2f}? {bd >= bins.num_bins/3.5}")
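
For intuition, the same three numbers can be approximated by hand. The sketch below uses the standard definitions of information content (−log2 of the bin's relative frequency) and Shannon entropy, and assumes bin deviation is the absolute distance from the median bin index of the other devices. That last definition, the zero-based bin indices, and all `approx_*` helper names are assumptions; they may not match what `elwood_spatial` computes, and neighbor weights are ignored.

```python
import math
from collections import Counter
from statistics import median


def approx_information_content(bin_idx, all_idx):
    """-log2 of the fraction of devices that share this bin (standard definition)."""
    p = Counter(all_idx)[bin_idx] / len(all_idx)
    return -math.log2(p)


def approx_shannon_entropy(all_idx):
    """Shannon entropy, in bits, of the empirical bin distribution."""
    n = len(all_idx)
    return -sum((c / n) * math.log2(c / n) for c in Counter(all_idx).values())


def approx_bin_deviation(bin_idx, other_idx):
    """ASSUMED definition: distance from the median bin index of the other devices."""
    return abs(bin_idx - median(other_idx))


# Bin indices for s1..s5 from the worked example, assuming zero-based bins.
bin_indices = {"s1": 0, "s2": 0, "s3": 2, "s4": 0, "s5": 0}
all_idx = list(bin_indices.values())
others = [v for k, v in bin_indices.items() if k != "s3"]

info = approx_information_content(bin_indices["s3"], all_idx)
entropy = approx_shannon_entropy(all_idx)
bd = approx_bin_deviation(bin_indices["s3"], others)
print(f"info={info:.3f} entropy={entropy:.3f} bin_dev={bd:.3f}")
# info=2.322 entropy=0.722 bin_dev=2.000
```

Under these assumptions all three conditions hold for s3: 2.322 ≥ 1.75, 0.722 < 1.75, and 2.0 ≥ 4 / 3.5 ≈ 1.14.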

Batch Example

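import numpy as np
import pandas as pd

import elwood_spatial as es
from elwood_spatial.detect import detect_outliers_batch, PARAMS_OPERATIONAL

# Same bin spec as in the worked example
bins = es.BinSpec.from_tuples([(0, 50), (51, 100), (101, 150), (151, 200)])

# Fully connected 6-sensor network with unit weights, mirroring the
# worked-example network (an assumed layout)
sensor_ids = ["s1", "s2", "s3", "s4", "s5", "s6"]
network_6 = {s: {"neighbors": [x for x in sensor_ids if x != s], "weights": [1.0] * 5}
             for s in sensor_ids}

# 6 sensors, 24 hours, one faulty
rng = np.random.default_rng(42)
rows = []
for hour in range(24):
    ts = pd.Timestamp("2024-07-15") + pd.Timedelta(hours=hour)
    for s in sensor_ids[:5]:  # healthy sensors: readings scattered around 45
        rows.append({"id": s, "timestamp": ts, "value": 45 + rng.normal(0, 5)})
    rows.append({"id": "s6", "timestamp": ts, "value": 160})  # stuck high

df = pd.DataFrame(rows)
result = detect_outliers_batch(df, bins, network_6, PARAMS_OPERATIONAL)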

# How many hours was s6 flagged?
s6 = result[result["id"] == "s6"]
print(f"s6 flagged {s6['is_outlier'].sum()} / {len(s6)} hours")
print(s6[["timestamp", "value", "information", "entropy", "bin_deviation", "is_outlier"]].head())