
Building a Real-Time Anomaly Detection Pipeline for IoT

iot · anomaly-detection · architecture

We've been building an end-to-end anomaly detection system at Myelin for industrial equipment monitoring. Vibration sensors, temperature probes, current sensors on motors and compressors. The goal is simple: detect when something is about to fail before it actually fails. The execution, as always, is where it gets interesting.

I want to walk through the full architecture because I think the model gets too much attention in these discussions. The model is maybe 20% of the system. The other 80% is data plumbing, and getting that right is what makes or breaks the whole thing.

The Architecture

Here's the high-level flow:

[Sensors] → [Edge Gateway (RPi)] → [MQTT Broker] → [Stream Processor]
                  ↓                                        ↓
          [Local Inference]                    [Cloud Inference + Storage]
                  ↓                                        ↓
          [Local Alerts]                          [Dashboard + Alerts]
Dual-path anomaly detection: edge inference handles immediate alerts while the cloud pipeline catches long-term drift.

We run inference at two levels. A lightweight model on the Raspberry Pi gateway for immediate, low-latency alerts. And a heavier model in the cloud for deeper analysis and historical pattern detection. The edge model catches obvious anomalies in real-time. The cloud model catches subtle drift over days or weeks.

Sensor Data Ingestion

Each piece of equipment has 3-5 sensors (vibration, temp, current, RPM). They sample at different rates. Vibration at 1kHz, temperature at 1Hz. The edge gateway collects all of this and publishes to an MQTT broker.
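One way to handle the mixed sample rates on the gateway is a ring buffer per sensor, sized to its native rate. This is a minimal sketch of that idea, not our production ingestion code; the sensor names and rates are illustrative.

```python
from collections import deque

# Per-sensor ring buffers holding the most recent 5 seconds of samples
# at each sensor's native rate (rates here are illustrative)
SAMPLE_RATES = {"vibration": 1000, "temperature": 1, "current": 100}
WINDOW_SECONDS = 5

buffers = {
    name: deque(maxlen=rate * WINDOW_SECONDS)
    for name, rate in SAMPLE_RATES.items()
}

def ingest(sensor, sample):
    """Append one raw sample; the oldest sample falls off automatically."""
    buffers[sensor].append(sample)

def snapshot(sensor):
    """Return the current window for a sensor as a list."""
    return list(buffers[sensor])
```

The `maxlen` on each deque means eviction is automatic and O(1), so a 1 kHz vibration stream never grows the buffer past 5,000 samples.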

Why MQTT? Because it's lightweight, supports QoS levels, and every IoT platform speaks it. Our Raspberry Pi gateway runs Mosquitto as a local broker and also publishes upstream to a cloud broker.

import paho.mqtt.client as mqtt
import json
 
client = mqtt.Client()
client.connect("localhost", 1883, 60)
client.loop_start()  # background network loop; needed to complete QoS 1 acks
 
def publish_reading(sensor_id, values, timestamp):
    payload = json.dumps({
        "sensor_id": sensor_id,
        "values": values,
        "ts": timestamp
    })
    client.publish(f"sensors/{sensor_id}/data", payload, qos=1)

QoS 1 (at least once delivery) is important here. We can tolerate duplicate messages, but we can't tolerate missing ones. A missed anomaly reading could mean a missed equipment failure.

Preprocessing and Feature Extraction

Raw sensor data is noisy and high-dimensional. Before feeding it to the model, we extract features using sliding windows.

For vibration data, the useful features are in the frequency domain:

import numpy as np
from scipy import signal
 
def extract_features(window, sample_rate=1000):
    # Time domain
    rms = np.sqrt(np.mean(window ** 2))
    peak = np.max(np.abs(window))
    crest_factor = peak / rms
 
    # Frequency domain
    freqs, psd = signal.welch(window, fs=sample_rate, nperseg=256)
    dominant_freq = freqs[np.argmax(psd)]
    spectral_energy = np.sum(psd)
 
    return np.array([
        rms, peak, crest_factor,
        dominant_freq, spectral_energy,
        np.std(window), np.mean(window)
    ])

We use a 5-second sliding window with a 1-second stride for vibration data. That gives us one feature vector per second, which is fast enough to catch sudden failures but slow enough that the Pi can keep up.
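The windowing itself can be sketched in a few lines of NumPy. This is an illustrative implementation of the 5-second window / 1-second stride scheme described above, not a copy of our pipeline code:

```python
import numpy as np

def sliding_windows(x, sample_rate=1000, window_s=5, stride_s=1):
    """Split a 1-D signal into overlapping windows.

    With window_s=5 and stride_s=1 at 1 kHz, each window is 5,000
    samples and consecutive windows overlap by 4 seconds.
    """
    win = window_s * sample_rate
    hop = stride_s * sample_rate
    # All length-`win` views of x, then keep every `hop`-th one
    views = np.lib.stride_tricks.sliding_window_view(x, win)
    return views[::hop]

signal_10s = np.random.randn(10 * 1000)
windows = sliding_windows(signal_10s)
# 10 s of data -> 6 windows, starting at t = 0, 1, ..., 5 s
```

`sliding_window_view` returns views rather than copies, so this stays cheap even at 1 kHz; each window then goes through `extract_features` above.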

The Inference Layer

On the edge, we run a quantized autoencoder on the Raspberry Pi. Reconstruction error above a threshold triggers a local alert.
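To make the reconstruction-error idea concrete, here is a toy stand-in for the autoencoder step, using random weights in place of a trained, quantized model (the shapes match the 7-element feature vector from `extract_features`, but everything else is a placeholder):

```python
import numpy as np

rng = np.random.default_rng(0)

# Toy 7 -> 3 -> 7 autoencoder. Real weights come from training;
# these random ones are placeholders for illustration only.
W_enc = rng.standard_normal((7, 3)) * 0.1
W_dec = rng.standard_normal((3, 7)) * 0.1

def reconstruction_error(features):
    """MSE between a feature vector and its reconstruction."""
    z = np.tanh(features @ W_enc)   # encode to the bottleneck
    recon = z @ W_dec               # decode back to feature space
    return float(np.mean((features - recon) ** 2))
```

A trained autoencoder reconstructs normal operating patterns well, so a large error means the current window looks unlike anything seen during training. That scalar is what feeds the adaptive threshold below.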

The interesting part is the adaptive threshold. Equipment behavior changes with load, ambient temperature, and time of day. A fixed threshold gives you too many false positives.

from collections import deque

class AdaptiveThreshold:
    def __init__(self, window_size=1000, sigma=3.0):
        # deque with maxlen evicts the oldest error in O(1),
        # unlike list.pop(0), which is O(n)
        self.errors = deque(maxlen=window_size)
        self.sigma = sigma
 
    def update(self, error):
        self.errors.append(error)
 
    def is_anomaly(self, error):
        if len(self.errors) < 100:
            return False  # not enough history yet
        mean = np.mean(self.errors)
        std = np.std(self.errors)
        return error > mean + self.sigma * std

The threshold adapts based on recent history. During high-load periods when vibration is naturally higher, the threshold adjusts up. This dropped our false positive rate from about 15% to under 3%.

Alerting

Alerts flow through two channels. Critical alerts (motor stall, sudden spike) go directly via SMS through the edge gateway. It has a GSM module for exactly this purpose because you can't depend on WiFi being up in a factory.

Non-critical alerts (gradual drift, maintenance recommendations) go through the cloud pipeline to a Grafana dashboard. The ops team checks this during shifts.

Lessons From Production

Network reliability is your biggest enemy. Factories have terrible connectivity. Our edge-first architecture means the system keeps working even when the cloud connection drops. The Pi buffers MQTT messages locally and syncs when the connection is restored.
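A store-and-forward buffer of this kind can be sketched with SQLite from the standard library. The table layout and function names here are assumptions for illustration, not our gateway's actual schema:

```python
import json
import sqlite3
import time

# ":memory:" for demonstration; on the Pi, use a file path so the
# buffer survives reboots
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE IF NOT EXISTS outbox ("
    "  id INTEGER PRIMARY KEY,"
    "  topic TEXT, payload TEXT, ts REAL,"
    "  synced INTEGER DEFAULT 0)"
)

def buffer_message(topic, payload):
    """Queue a message locally whether or not the uplink is up."""
    db.execute(
        "INSERT INTO outbox (topic, payload, ts) VALUES (?, ?, ?)",
        (topic, json.dumps(payload), time.time()),
    )
    db.commit()

def drain(publish):
    """Replay unsynced messages, oldest first, through `publish(topic, payload)`."""
    rows = db.execute(
        "SELECT id, topic, payload FROM outbox WHERE synced = 0 ORDER BY id"
    ).fetchall()
    for row_id, topic, payload in rows:
        publish(topic, payload)  # once delivered, mark as synced
        db.execute("UPDATE outbox SET synced = 1 WHERE id = ?", (row_id,))
    db.commit()
```

Writing every message to disk first and marking it synced only after delivery pairs naturally with MQTT's QoS 1 semantics: the occasional duplicate is fine, a lost reading is not.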

Sensor calibration drifts. A vibration sensor that's been running for 6 months gives different baseline readings than a new one. We retrain the edge model monthly with recent data and push updates via OTA.

False positives kill trust. The factory team ignored our alerts for two weeks because of early false positives. We had to earn their trust back by dramatically reducing the false positive rate. The adaptive threshold was born from that painful experience.

The whole system processes sensor data from ingestion to alert in under 2 seconds on the edge path. Honestly, building this taught me more about production ML than any Kaggle competition ever could. The model is the easy part. The system around it is where the real engineering happens. The quantization workflow that made the edge model small enough to run on the Pi was just one piece -- the MQTT plumbing, adaptive thresholds, and dual-path architecture are what made it a real product.