The POSTGRESQL route stores MQTT messages in PostgreSQL databases. It supports SSL connections, parameterized queries, and automatic reconnection for reliable data persistence.
PostgreSQL is ideal for structured data that requires complex queries, joins, and ACID compliance. Use it when you need relational integrity and powerful SQL capabilities.
DEFINE ROUTE SensorDB WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreSensorReading
        WITH SOURCE_TOPIC "sensors/+/reading"
        WITH QUERY "INSERT INTO readings (ts, sensor_id, value) VALUES (NOW(), '{sensor_id}', '{value.json}')"
DEFINE ROUTE TemperatureDB WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreTemperature
        WITH SOURCE_TOPIC "sensors/+/temperature"
        WITH DESTINATION_TOPIC "db/status/temperature"
        WITH QUERY "INSERT INTO temperature_readings (recorded_at, sensor_id, value, unit) VALUES (NOW(), '{sensor_id}', {value.json.value}, '{value.json.unit}')"
Input at sensors/temp001/temperature:
{"value": 23.5, "unit": "celsius"}
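To make the substitution concrete, here is a minimal Python sketch of how the temperature query might look once its placeholders are filled from that message. `render_query` is a hypothetical helper, not the route's implementation, and it assumes `{sensor_id}` resolves to the topic level matched by `+`, which the examples imply but do not spell out.

```python
import json
import re

TEMPLATE = (
    "INSERT INTO temperature_readings (recorded_at, sensor_id, value, unit) "
    "VALUES (NOW(), '{sensor_id}', {value.json.value}, '{value.json.unit}')"
)


def render_query(template, payload, topic_fields):
    """Hypothetical stand-in for the route's placeholder substitution."""
    data = json.loads(payload)

    def resolve(match):
        name = match.group(1)
        if name == "value.json":
            return payload  # whole payload as raw JSON text
        if name.startswith("value.json."):
            node = data
            for key in name.split(".")[2:]:  # walk the path after "value.json"
                node = node[key]
            return str(node)
        return str(topic_fields[name])  # assumed to come from the topic

    return re.sub(r"\{([A-Za-z_][\w.]*)\}", resolve, template)


rendered = render_query(
    TEMPLATE,
    '{"value": 23.5, "unit": "celsius"}',
    {"sensor_id": "temp001"},  # assumption: taken from the "+" topic level
)
print(rendered)
```

Under those assumptions the numeric field lands unquoted (`23.5`) while the text fields land inside the single quotes written in the query, which is why the template quotes `'{value.json.unit}'` but not `{value.json.value}`.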
Secure connection with SSL:
DEFINE ROUTE SecureDB WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.cloud.com"
        WITH PORT '5432'
        WITH DATABASE "production"
        WITH USERNAME "app_user"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
        WITH TRUST_SERVER_CERTIFICATE "false"
    ADD EVENT StoreEvent
        WITH SOURCE_TOPIC "events/#"
        WITH QUERY "INSERT INTO events (ts, topic, payload) VALUES (NOW(), '{source_topic}', '{value.json}')"
Different tables for different data types:
DEFINE ROUTE MultiTableDB WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreSensorData
        WITH SOURCE_TOPIC "sensors/+/data"
        WITH QUERY "INSERT INTO sensor_data (ts, sensor_id, value) VALUES (NOW(), '{sensor_id}', '{value.json}')"
    ADD EVENT StoreAlerts
        WITH SOURCE_TOPIC "alerts/+"
        WITH QUERY "INSERT INTO alerts (ts, alert_type, message) VALUES (NOW(), '{alert_type}', '{value.json}')"
    ADD EVENT StoreMetrics
        WITH SOURCE_TOPIC "metrics/#"
        WITH QUERY "INSERT INTO metrics (ts, topic, data) VALUES (NOW(), '{source_topic}', '{value.json}')"
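Each event's SOURCE_TOPIC is an MQTT topic filter, so the wildcards decide which table a message lands in. The Python sketch below applies the standard MQTT matching rules (`+` for exactly one level, `#` for all remaining levels) to the three filters in this route. `topic_matches` and `events_for` are illustrative names, and the sketch ignores the special handling of topics that begin with `$`.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a topic filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard: matches everything from here on
        if i >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # literal level differs
    # Without "#", filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


# Filters copied from the MultiTableDB route.
EVENT_FILTERS = {
    "StoreSensorData": "sensors/+/data",
    "StoreAlerts": "alerts/+",
    "StoreMetrics": "metrics/#",
}


def events_for(topic):
    """List the events whose SOURCE_TOPIC filter matches the topic."""
    return [name for name, flt in EVENT_FILTERS.items() if topic_matches(flt, topic)]


for sample in ("sensors/temp001/data", "alerts/overheat",
               "metrics/cpu/load", "alerts/overheat/zone1"):
    print(sample, "->", events_for(sample))
```

Note that `alerts/overheat/zone1` matches none of the filters, because `+` covers a single level only; a message on that topic would not be stored by this route.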
Store full payload in JSONB column:
DEFINE ROUTE JSONStorage WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreJSON
        WITH SOURCE_TOPIC "devices/#"
        WITH QUERY "INSERT INTO device_data (ts, topic, payload) VALUES (NOW(), '{source_topic}', '{value.json}'::jsonb)"
This allows powerful JSON queries in PostgreSQL:
SELECT * FROM device_data WHERE (payload->>'temperature')::numeric > 25;
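The `->>` operator returns text, so a comparison without a numeric cast is evaluated as a string comparison rather than by magnitude. The short Python sketch below imitates that difference with ordinary string and float comparisons. It is an analogy for illustration only, not PostgreSQL itself, and the sample readings are made up.

```python
# Temperature values as they would come out of payload->>'temperature' (text).
readings = ["9", "25.5", "100", "30"]

# String comparison: evaluated character by character.
as_text = [r for r in readings if r > "25"]

# Numeric comparison: what a cast such as ::numeric provides.
as_number = [r for r in readings if float(r) > 25]

print("text comparison:   ", as_text)
print("numeric comparison:", as_number)
```

The text comparison keeps `9` and drops `100`, which is the opposite of what a temperature threshold should do.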
Insert a record, or update it if a conflict on the unique key occurs. Useful for device registration or status tables where duplicate messages are possible:
DEFINE ROUTE DeviceRegistry WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT RegisterDevice
        WITH SOURCE_TOPIC "devices/+/registered"
        WITH QUERY "INSERT INTO devices (device_id, name, area, active, updated_at) VALUES ('{value.json.device_id}', '{value.json.name}', '{value.json.area}', TRUE, NOW()) ON CONFLICT (device_id) DO UPDATE SET name = EXCLUDED.name, area = EXCLUDED.area, updated_at = NOW()"
When the device already exists, ON CONFLICT (device_id) DO UPDATE SET overwrites only the columns listed after SET (name, area, updated_at) with the incoming values, so the row is never duplicated. This requires a unique constraint or primary key on device_id.
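The upsert behavior can be tried locally without a PostgreSQL server. The sketch below uses Python's built-in `sqlite3` module as a stand-in, since SQLite (3.24 or newer) accepts the same `ON CONFLICT ... DO UPDATE` form. Differences from the route are assumptions of the sketch: `CURRENT_TIMESTAMP` replaces `NOW()`, `?` parameters replace the `{value.json...}` placeholders, and the table layout and device values are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE devices ("
    " device_id TEXT PRIMARY KEY,"  # the conflict target needs a unique key
    " name TEXT, area TEXT, active INTEGER, updated_at TEXT)"
)

UPSERT = (
    "INSERT INTO devices (device_id, name, area, active, updated_at) "
    "VALUES (?, ?, ?, 1, CURRENT_TIMESTAMP) "
    "ON CONFLICT (device_id) DO UPDATE SET "
    "name = excluded.name, area = excluded.area, "
    "updated_at = CURRENT_TIMESTAMP"
)

# The same device registers twice, as a duplicate MQTT message would.
conn.execute(UPSERT, ("pump-01", "Pump", "hall-a"))
conn.execute(UPSERT, ("pump-01", "Pump (renamed)", "hall-b"))

rows = conn.execute("SELECT device_id, name, area FROM devices").fetchall()
print(rows)
```

After both statements the table holds a single row for `pump-01`, carrying the name and area from the second message.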
Modify existing records when a status change or update arrives:
DEFINE ROUTE DeviceRegistry WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT UpdateDeviceStatus
        WITH SOURCE_TOPIC "devices/+/status"
        WITH QUERY "UPDATE devices SET current_status = '{value.json.status}', last_seen = NOW() WHERE device_id = '{value.json.device_id}'"
Only rows matching the WHERE clause are modified. Unmatched messages produce no change — no error is raised.
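The "no match, no error" behavior is standard SQL and easy to confirm. This sketch again uses Python's `sqlite3` as a local stand-in for PostgreSQL, with `CURRENT_TIMESTAMP` in place of `NOW()` and `?` parameters in place of the placeholders; the table contents and device IDs are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE devices ("
    " device_id TEXT PRIMARY KEY, current_status TEXT, last_seen TEXT)"
)
conn.execute(
    "INSERT INTO devices (device_id, current_status) VALUES ('pump-01', 'idle')"
)

UPDATE = (
    "UPDATE devices SET current_status = ?, last_seen = CURRENT_TIMESTAMP "
    "WHERE device_id = ?"
)

# A status message for a known device changes one row.
matched = conn.execute(UPDATE, ("running", "pump-01")).rowcount

# A status message for an unknown device changes nothing and raises nothing.
unmatched = conn.execute(UPDATE, ("running", "ghost-99")).rowcount

print("known device rows changed:  ", matched)
print("unknown device rows changed:", unmatched)
```

Because an unmatched update is silent, a status message for a device that was never registered is simply dropped. If that case matters, an upsert may be the better fit.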
Remove records when a decommission or cleanup event arrives:
DEFINE ROUTE DeviceRegistry WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT DecommissionDevice
        WITH SOURCE_TOPIC "devices/+/decommissioned"
        WITH QUERY "DELETE FROM devices WHERE device_id = '{value.json.device_id}' AND active = FALSE"
The AND active = FALSE guard prevents accidental deletes if a message arrives before the device is properly deactivated.
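A quick way to see the guard at work is to run the same DELETE against one active and one inactive device. The sketch uses Python's `sqlite3` as a local stand-in for PostgreSQL, stores the boolean as `1`/`0`, and uses `?` parameters instead of the route's placeholders; the two device rows are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE devices (device_id TEXT PRIMARY KEY, active INTEGER)")
conn.executemany(
    "INSERT INTO devices (device_id, active) VALUES (?, ?)",
    [("pump-01", 1), ("pump-02", 0)],  # one active, one already deactivated
)

DELETE = "DELETE FROM devices WHERE device_id = ? AND active = 0"

# Decommission message for a device that is still active: guard blocks it.
deleted_active = conn.execute(DELETE, ("pump-01",)).rowcount

# Decommission message for a deactivated device: row is removed.
deleted_inactive = conn.execute(DELETE, ("pump-02",)).rowcount

remaining = [
    row[0]
    for row in conn.execute("SELECT device_id FROM devices ORDER BY device_id")
]
print(deleted_active, deleted_inactive, remaining)
```

Only the deactivated device is removed; the active one stays in the table until it has been deactivated.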
Alternative: STORE IN with Models — Instead of writing EVENT queries, you can bind a model directly to this route. Every PUBLISH MODEL call automatically inserts a row — no query needed:
DEFINE MODEL SensorReading
    ADD STRING "sensor_id"
    ADD DOUBLE "value"
    STORE IN "SensorDB"
        WITH TABLE "sensor_readings"
EVENTs also support SELECT queries to read data back from the database. Publish a message to the event’s SOURCE_TOPIC, and the query result is published to DESTINATION_TOPIC:
DEFINE ROUTE SensorDB WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT GetLatestReading
        WITH SOURCE_TOPIC "db/query/latest"
        WITH DESTINATION_TOPIC "db/result/latest"
        WITH QUERY "SELECT sensor_id, value FROM readings WHERE sensor_id = '{payload}' ORDER BY ts DESC LIMIT 1"
To trigger this query, publish the sensor ID to the source topic:
Topic: db/query/latest
Payload: temp001
The query result is published to db/result/latest, where your actions or external clients can consume it.
EVENTs can also run SELECT queries on a schedule instead of waiting for an MQTT trigger. Use WITH EVERY to publish fresh data at a fixed interval:
DEFINE ROUTE DeviceRegistry WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT PollActiveDevices
        WITH EVERY 10 SECONDS
        WITH DESTINATION_TOPIC "dashboard/devices/active"
        WITH QUERY "SELECT device_id, name, current_status, last_seen FROM devices WHERE active = TRUE ORDER BY name"
The query result is published to dashboard/devices/active every 10 seconds — no trigger message needed.