Building a Real-Time Anomaly Detection Pipeline for IoT
We've been building an end-to-end anomaly detection system at Myelin for industrial equipment monitoring. Vibration sensors, temperature probes, current sensors on motors and compressors. The goal is simple: detect when something is about to fail before it actually fails. The execution, as always, is where it gets interesting.
I want to walk through the full architecture because I think the model gets too much attention in these discussions. The model is maybe 20% of the system. The other 80% is data plumbing, and getting that right is what makes or breaks the whole thing.
The Architecture
Here's the high-level flow:
```
[Sensors] → [Edge Gateway (RPi)] → [MQTT Broker] → [Stream Processor]
                 ↓                                        ↓
         [Local Inference]                  [Cloud Inference + Storage]
                 ↓                                        ↓
          [Local Alerts]                       [Dashboard + Alerts]
```
We run inference at two levels. A lightweight model on the Raspberry Pi gateway for immediate, low-latency alerts. And a heavier model in the cloud for deeper analysis and historical pattern detection. The edge model catches obvious anomalies in real-time. The cloud model catches subtle drift over days or weeks.
Sensor Data Ingestion
Each piece of equipment has 3-5 sensors (vibration, temp, current, RPM). They sample at different rates. Vibration at 1kHz, temperature at 1Hz. The edge gateway collects all of this and publishes to an MQTT broker.
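Those rates differ by three orders of magnitude, so the slow channels (temperature, current, RPM) have to be aligned onto a common time index before they can sit alongside the per-window vibration features. A minimal sketch of one way to do that, bucketing readings into fixed time bins and averaging within each bin (the function name and structure are illustrative, not our actual gateway code; raw vibration goes through the feature extractor instead, as described below):

```python
from collections import defaultdict

def align_readings(readings, bin_seconds=1.0):
    """Group (timestamp, value) readings into fixed time bins and average
    each bin, so sensors sampled at different rates share a common index.

    `readings` maps sensor_id -> list of (timestamp, value) tuples.
    Returns sensor_id -> {bin_start: mean value in that bin}.
    """
    aligned = {}
    for sensor_id, samples in readings.items():
        bins = defaultdict(list)
        for ts, value in samples:
            # Floor the timestamp to the start of its bin.
            bins[int(ts // bin_seconds) * bin_seconds].append(value)
        aligned[sensor_id] = {b: sum(v) / len(v) for b, v in sorted(bins.items())}
    return aligned
```

Averaging is only appropriate for the slowly varying channels; it would destroy the frequency content of the vibration signal.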
Why MQTT? Because it's lightweight, supports QoS levels, and every IoT platform speaks it. Our Raspberry Pi gateway runs Mosquitto as a local broker and also publishes upstream to a cloud broker.
```python
import paho.mqtt.client as mqtt
import json

client = mqtt.Client()
client.connect("localhost", 1883, 60)
client.loop_start()  # background network loop; required for QoS 1 acknowledgements

def publish_reading(sensor_id, values, timestamp):
    payload = json.dumps({
        "sensor_id": sensor_id,
        "values": values,
        "ts": timestamp
    })
    client.publish(f"sensors/{sensor_id}/data", payload, qos=1)
```

QoS 1 (at-least-once delivery) is important here. We can tolerate duplicate messages, but we can't tolerate missing ones. A missed anomaly reading could mean a missed equipment failure.
Preprocessing and Feature Extraction
Raw sensor data is noisy and high-dimensional. Before feeding it to the model, we extract features using sliding windows.
For vibration data, the useful features are in the frequency domain:
```python
import numpy as np
from scipy import signal

def extract_features(window, sample_rate=1000):
    # Time domain
    rms = np.sqrt(np.mean(window ** 2))
    peak = np.max(np.abs(window))
    crest_factor = peak / rms

    # Frequency domain
    freqs, psd = signal.welch(window, fs=sample_rate, nperseg=256)
    dominant_freq = freqs[np.argmax(psd)]
    spectral_energy = np.sum(psd)

    return np.array([
        rms, peak, crest_factor,
        dominant_freq, spectral_energy,
        np.std(window), np.mean(window)
    ])
```

We use a 5-second sliding window with a 1-second stride for vibration data. That gives us one feature vector per second, each summarizing the last five seconds of signal: fast enough to catch sudden failures but slow enough that the Pi can keep up.
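The windowing itself is simple. A minimal sketch, assuming the raw signal arrives as a flat NumPy array (the function name is illustrative; window and stride match the numbers above):

```python
import numpy as np

def sliding_windows(samples, sample_rate=1000, window_s=5, stride_s=1):
    """Yield successive windows of `window_s` seconds over a 1-D array of
    raw samples, advancing by `stride_s` seconds each step."""
    win = window_s * sample_rate
    step = stride_s * sample_rate
    for start in range(0, len(samples) - win + 1, step):
        yield samples[start:start + win]
```

Each yielded window is exactly what a feature extractor like `extract_features` consumes.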
The Inference Layer
On the edge, we run a quantized autoencoder on the Raspberry Pi. Reconstruction error above a threshold triggers a local alert.
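The scoring step reduces to mean-squared reconstruction error. A toy sketch of the idea, with a stand-in linear "autoencoder" in place of the real quantized network (the projection here is purely illustrative):

```python
import numpy as np

def reconstruction_error(model, features):
    """Anomaly score: mean squared error between a feature vector and the
    model's reconstruction of it."""
    reconstructed = model(features)
    return float(np.mean((features - reconstructed) ** 2))

# Stand-in "autoencoder": project onto a single direction and decode back.
# The real edge model is a small quantized neural network.
def toy_autoencoder(x):
    d = np.ones_like(x) / np.sqrt(len(x))  # unit direction
    return np.dot(x, d) * d  # encode to 1-D, decode back

normal = np.array([1.0, 1.0, 1.0, 1.0])  # lies on the model's "manifold"
weird = np.array([1.0, 1.0, 1.0, 9.0])   # off-manifold: poorly reconstructed
normal_score = reconstruction_error(toy_autoencoder, normal)
weird_score = reconstruction_error(toy_autoencoder, weird)
```

Inputs the model has learned to represent reconstruct almost perfectly; anything it hasn't seen comes back distorted, and that gap is the anomaly score.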
The interesting part is the adaptive threshold. Equipment behavior changes with load, ambient temperature, and time of day. A fixed threshold gives you too many false positives.
```python
import numpy as np
from collections import deque

class AdaptiveThreshold:
    def __init__(self, window_size=1000, sigma=3.0):
        # Rolling window of recent reconstruction errors; the deque
        # evicts the oldest entry automatically once it is full.
        self.errors = deque(maxlen=window_size)
        self.sigma = sigma

    def update(self, error):
        self.errors.append(error)

    def is_anomaly(self, error):
        if len(self.errors) < 100:
            return False  # not enough data yet
        mean = np.mean(self.errors)
        std = np.std(self.errors)
        return error > mean + self.sigma * std
```

The threshold adapts based on recent history. During high-load periods when vibration is naturally higher, the threshold adjusts up. This dropped our false positive rate from about 15% to under 3%.
Alerting
Alerts flow through two channels. Critical alerts (motor stall, sudden spike) go directly via SMS through the edge gateway. It has a GSM module for exactly this purpose because you can't depend on WiFi being up in a factory.
Non-critical alerts (gradual drift, maintenance recommendations) go through the cloud pipeline to a Grafana dashboard. The ops team checks this during shifts.
Lessons From Production
Network reliability is your biggest enemy. Factories have terrible connectivity. Our edge-first architecture means the system keeps working even when the cloud connection drops. The Pi buffers MQTT messages locally and syncs when the connection is restored.
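The buffering logic is essentially a bounded local queue. A minimal sketch, with the MQTT client abstracted behind a publish callable (the class and names are illustrative; our real implementation also persists the buffer to disk so a power cut doesn't lose it):

```python
from collections import deque

class OfflineBuffer:
    """Queue messages while the upstream broker is unreachable and flush
    them in order once the connection comes back. Bounded so a long
    outage can't exhaust the Pi's memory: the oldest messages drop first."""

    def __init__(self, publish_fn, max_messages=10000):
        self.publish_fn = publish_fn  # e.g. wraps client.publish(topic, payload)
        self.buffer = deque(maxlen=max_messages)
        self.connected = False

    def send(self, topic, payload):
        if self.connected:
            self.publish_fn(topic, payload)
        else:
            self.buffer.append((topic, payload))

    def on_connect(self):
        self.connected = True
        while self.buffer:  # replay the backlog in arrival order
            topic, payload = self.buffer.popleft()
            self.publish_fn(topic, payload)

    def on_disconnect(self):
        self.connected = False
```

In practice `on_connect`/`on_disconnect` would be wired to the MQTT client's connection callbacks.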
Sensor calibration drifts. A vibration sensor that's been running for 6 months gives different baseline readings than a new one. We retrain the edge model monthly with recent data and push updates via OTA.
False positives kill trust. The factory team ignored our alerts for two weeks because of early false positives. We had to earn their trust back by dramatically reducing the false positive rate. The adaptive threshold was born from that painful experience.
The whole system processes sensor data from ingestion to alert in under 2 seconds on the edge path. Honestly, building this taught me more about production ML than any Kaggle competition ever could. The model is the easy part. The system around it is where the real engineering happens. The quantization workflow that made the edge model small enough to run on the Pi was just one piece -- the MQTT plumbing, adaptive thresholds, and dual-path architecture are what made it a real product.