The CRATEDB route stores MQTT messages in CrateDB, a distributed SQL database optimized for time-series and machine data. It combines the familiarity of SQL with the scalability needed for IoT workloads.
CrateDB excels at high-volume time-series data with fast ingestion and SQL query capabilities. Use it when you need real-time analytics on sensor data at scale.
DEFINE ROUTE SensorDB WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot"
        WITH USERNAME "crate"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreSensorReading
        WITH SOURCE_TOPIC "sensors/+/reading"
        WITH QUERY "INSERT INTO sensor_data (ts, sensor_id, value) VALUES (?, ?, ?)"
DEFINE ROUTE TimeSeriesDB WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot"
        WITH USERNAME "crate"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreTimeSeries
        WITH SOURCE_TOPIC "sensors/+/reading"
        WITH DESTINATION_TOPIC "db/crate/status"
        WITH QUERY "INSERT INTO sensor_data (ts, sensor_id, value) VALUES (?, ?, ?)"
Optimized for high ingestion rates:
DEFINE ROUTE MetricsDB WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "metrics"
        WITH USERNAME "metrics_writer"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreMetrics
        WITH SOURCE_TOPIC "metrics/#"
        WITH QUERY "INSERT INTO metrics (ts, topic, data) VALUES (?, ?, ?)"
Route to different tables by topic:
DEFINE ROUTE MultiTableDB WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot"
        WITH USERNAME "crate"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreSensors
        WITH SOURCE_TOPIC "sensors/#"
        WITH QUERY "INSERT INTO sensors (ts, topic, data) VALUES (?, ?, ?)"
    ADD EVENT StoreAlerts
        WITH SOURCE_TOPIC "alerts/#"
        WITH QUERY "INSERT INTO alerts (ts, topic, data) VALUES (?, ?, ?)"
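Per-topic routing depends on MQTT topic filters, where + matches exactly one topic level and # matches all remaining levels. As a rough illustration of which table a given message would land in, the Python sketch below implements that matching. The names topic_matches, route_table, and EVENT_TABLES are hypothetical helpers for this example, not part of the route syntax, and the sketch ignores special cases such as topics that start with $.

```python
# Hypothetical mapping of SOURCE_TOPIC filters to target tables,
# mirroring the StoreSensors and StoreAlerts events.
EVENT_TABLES = [
    ("sensors/#", "sensors"),
    ("alerts/#", "alerts"),
]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter using + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing #, filter and topic must have the same depth.
    return len(filter_levels) == len(topic_levels)


def route_table(topic: str):
    """Return the first table whose filter matches the topic, or None."""
    for topic_filter, table in EVENT_TABLES:
        if topic_matches(topic_filter, topic):
            return table
    return None


if __name__ == "__main__":
    for t in ("sensors/temp001/reading", "alerts/fire/zone1", "metrics/cpu"):
        print(t, "->", route_table(t))
```

A topic that matches neither filter, such as metrics/cpu here, is simply not stored by this route.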
Insert or ignore on conflict. CrateDB supports ON CONFLICT DO NOTHING to skip duplicate inserts for idempotent ingestion:
DEFINE ROUTE EventLog WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot"
        WITH USERNAME "crate"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreUniqueEvent
        WITH SOURCE_TOPIC "events/+/occurred"
        WITH QUERY "INSERT INTO events (event_id, device_id, event_type, ts) VALUES ('{value.json.event_id}', '{value.json.device_id}', '{value.json.event_type}', '{value.json.timestamp}') ON CONFLICT (event_id) DO NOTHING"
ON CONFLICT (event_id) DO NOTHING ensures that duplicate messages, which are common in IoT delivery, never create duplicate rows. This relies on event_id being the primary key of the events table.
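To see the idempotent behavior in isolation, the following Python sketch replays the same message twice against an in-memory SQLite database. SQLite is only a stand-in here because it accepts the same ON CONFLICT ... DO NOTHING clause; the store_event helper and the sample message are assumptions for illustration, not part of the route.

```python
import sqlite3

# In-memory SQLite database used as a local stand-in for CrateDB.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events ("
    "event_id TEXT PRIMARY KEY, device_id TEXT, event_type TEXT, ts TEXT)"
)

INSERT_SQL = (
    "INSERT INTO events (event_id, device_id, event_type, ts) "
    "VALUES (?, ?, ?, ?) "
    "ON CONFLICT (event_id) DO NOTHING"
)


def store_event(msg: dict) -> None:
    """Insert one event; a repeated event_id is silently skipped."""
    conn.execute(
        INSERT_SQL,
        (msg["event_id"], msg["device_id"], msg["event_type"], msg["timestamp"]),
    )


# Hypothetical message, delivered twice as can happen with MQTT QoS 1.
message = {
    "event_id": "evt-001",
    "device_id": "pump-7",
    "event_type": "overheat",
    "timestamp": "2024-01-01T12:00:00Z",
}
store_event(message)
store_event(message)

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print("rows stored:", row_count)
```

The second call raises no error and adds no row, which is the property that makes redelivered messages safe to ingest.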
Modify existing records when a status change arrives:
DEFINE ROUTE DeviceRegistry WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot"
        WITH USERNAME "crate"
        WITH PASSWORD "secure_password"
    ADD EVENT UpdateDeviceStatus
        WITH SOURCE_TOPIC "devices/+/status"
        WITH QUERY "UPDATE devices SET current_status = '{value.json.status}', last_seen = NOW() WHERE device_id = '{value.json.device_id}'"
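The {value.json.<field>} placeholders in the query are filled from the JSON payload of the incoming message. The Python sketch below shows one plausible way such a template could expand for a sample payload. It illustrates the idea only and is not the broker's actual implementation: render_query and the sample payload are hypothetical, and doubling single quotes is a defensive choice made in this sketch, since this section does not state how values are escaped.

```python
import json
import re

# Matches placeholders of the form {value.json.<field>}.
PLACEHOLDER = re.compile(r"\{value\.json\.([A-Za-z_][A-Za-z0-9_]*)\}")


def render_query(template: str, payload: str) -> str:
    """Expand {value.json.<field>} placeholders from a JSON payload string."""
    data = json.loads(payload)

    def substitute(match: re.Match) -> str:
        value = str(data[match.group(1)])
        # Assumption of this sketch: double single quotes so the value
        # cannot terminate the surrounding SQL string literal.
        return value.replace("'", "''")

    return PLACEHOLDER.sub(substitute, template)


template = (
    "UPDATE devices SET current_status = '{value.json.status}', "
    "last_seen = NOW() WHERE device_id = '{value.json.device_id}'"
)
rendered = render_query(template, '{"device_id": "pump-7", "status": "offline"}')
print(rendered)
```

Because values end up inside quoted SQL literals, payloads from untrusted devices are worth validating before they reach a query like this.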
Remove records on a decommission or cleanup event:
DEFINE ROUTE DeviceRegistry WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot"
        WITH USERNAME "crate"
        WITH PASSWORD "secure_password"
    ADD EVENT DecommissionDevice
        WITH SOURCE_TOPIC "devices/+/decommissioned"
        WITH QUERY "DELETE FROM devices WHERE device_id = '{value.json.device_id}'"
Alternative: STORE IN with Models. Instead of writing EVENT queries, you can bind a model directly to this route. Every PUBLISH MODEL call then inserts a row automatically, with no query needed:
DEFINE MODEL SensorReading
    ADD STRING "sensor_id"
    ADD DOUBLE "value"
    STORE IN "TimeSeriesDB"
        WITH TABLE "sensor_data"
EVENTs also support SELECT queries to read data back from the database. Publish a message to the event’s SOURCE_TOPIC, and the query result is published to DESTINATION_TOPIC:
DEFINE ROUTE SensorDB WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot"
        WITH USERNAME "crate"
        WITH PASSWORD "secure_password"
    ADD EVENT GetLatestReading
        WITH SOURCE_TOPIC "db/query/latest"
        WITH DESTINATION_TOPIC "db/result/latest"
        WITH QUERY "SELECT sensor_id, value FROM sensor_data WHERE sensor_id = '{payload}' ORDER BY ts DESC LIMIT 1"
To trigger this query, publish the sensor ID to the source topic:
Topic: db/query/latest
Payload: temp001
The query result is published to db/result/latest, where your actions or external clients can consume it.
EVENTs can also run SELECT queries on a fixed schedule using WITH EVERY. This is especially powerful with CrateDB’s fast aggregations:
DEFINE ROUTE TimeSeriesDB WITH TYPE CRATEDB
    ADD CRATEDB_CONFIG
        WITH HOST "cratedb.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot"
        WITH USERNAME "crate"
        WITH PASSWORD "secure_password"
    ADD EVENT PollLatestReadings
        WITH EVERY 5 SECONDS
        WITH DESTINATION_TOPIC "dashboard/sensors/latest"
        WITH QUERY "SELECT sensor_id, AVG(value) as avg_value, MAX(ts) as last_seen FROM sensor_data WHERE ts > NOW() - INTERVAL '1 minute' GROUP BY sensor_id ORDER BY sensor_id"
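To make the result of the scheduled query concrete, the Python sketch below computes the same aggregation over an in-memory list of readings: keep rows from the last minute, group them by sensor, then take the average value and the latest timestamp per group. The function latest_averages and the sample readings are hypothetical, and sensor_id is used as the grouping column to match the sensor_data schema.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def latest_averages(readings, now, window=timedelta(minutes=1)):
    """Aggregate (ts, sensor_id, value) rows newer than now - window.

    Returns one dict per sensor, ordered by sensor_id, with the average
    value and the most recent timestamp inside the window.
    """
    groups = defaultdict(list)
    for ts, sensor_id, value in readings:
        if ts > now - window:  # same filter as ts > NOW() - INTERVAL '1 minute'
            groups[sensor_id].append((ts, value))
    return [
        {
            "sensor_id": sensor_id,
            "avg_value": sum(v for _, v in rows) / len(rows),
            "last_seen": max(t for t, _ in rows),
        }
        for sensor_id, rows in sorted(groups.items())
    ]


# Hypothetical sample data with a fixed "now" so the result is reproducible.
now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
readings = [
    (now - timedelta(seconds=10), "temp001", 1.0),
    (now - timedelta(seconds=20), "temp001", 3.0),
    (now - timedelta(seconds=90), "temp001", 100.0),  # outside the window
    (now - timedelta(seconds=5), "temp002", 2.0),
]
summary = latest_averages(readings, now)
for row in summary:
    print(row)
```

With a 5-second schedule and a 1-minute window, consecutive results overlap heavily, so a dashboard sees a rolling average rather than disjoint buckets.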
-- Partitioned time-series table
CREATE TABLE sensor_data (
    ts TIMESTAMP WITH TIME ZONE,
    sensor_id TEXT,
    value DOUBLE PRECISION
) PARTITIONED BY (ts);

-- Metrics with JSON storage
CREATE TABLE metrics (
    ts TIMESTAMP WITH TIME ZONE,
    topic TEXT,
    data OBJECT(DYNAMIC)
) PARTITIONED BY (ts);