This example synchronizes sensor data and equipment status from a local edge device to a cloud broker. The "out" direction sends data from your local broker to the cloud, and TLS encrypts the connection:
DEFINE ROUTE EdgeToCloudSync WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER SELF
    ADD DESTINATION_CONFIG
        WITH BROKER_ADDRESS "iot.cloudprovider.com"
        WITH BROKER_PORT '8883'
        WITH CLIENT_ID "EdgeDevice-Factory1"
        WITH USERNAME "edge_device_001"
        WITH PASSWORD "secure_cloud_password"
        WITH USE_TLS true
    ADD MAPPING allSensors
        WITH SOURCE_TOPIC "local/sensors/#"
        WITH DESTINATION_TOPIC "factory1/edge/sensors/#"
        WITH DIRECTION "out"
    ADD MAPPING equipmentStatus
        WITH SOURCE_TOPIC "local/equipment/+/status"
        WITH DESTINATION_TOPIC "factory1/edge/equipment/+/status"
        WITH DIRECTION "out"
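The two mappings above rely on the standard MQTT wildcards: `+` matches exactly one topic level and `#` matches the remaining levels. The following Python sketch is only an illustration of those matching rules, not the broker's own code. The function name `topic_matches` is hypothetical, and the sketch ignores special cases such as topics starting with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT filter `topic_filter`.

    Illustrative helper (hypothetical name), covering only `+` and `#`.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; `+` accepts any single level.
            return False

    # Without `#`, filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("local/sensors/#", "local/sensors/temperature/zone1"))
print(topic_matches("local/equipment/+/status", "local/equipment/press1/status"))
print(topic_matches("local/equipment/+/status", "local/equipment/press1/line2/status"))
```

The first two calls print `True`; the third prints `False` because `+` cannot span two levels.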
Receive commands from cloud systems and distribute them to local devices. The "in" direction sends data from the remote broker to your local broker:
DEFINE ROUTE CloudCommands WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER SELF
    ADD DESTINATION_CONFIG
        WITH BROKER_ADDRESS "iot.cloudprovider.com"
        WITH BROKER_PORT '8883'
        WITH CLIENT_ID "EdgeDevice-Factory1-Commands"
        WITH USE_TLS true
    ADD MAPPING deviceCommands
        WITH SOURCE_TOPIC "local/commands/devices/+"
        WITH DESTINATION_TOPIC "factory1/commands/devices/+"
        WITH DIRECTION "in"
    ADD MAPPING configUpdates
        WITH SOURCE_TOPIC "local/config/#"
        WITH DESTINATION_TOPIC "factory1/config/#"
        WITH DIRECTION "in"
Connect multiple factory locations to a central broker. This pattern enables enterprise-wide visibility while keeping local operations independent:
DEFINE ROUTE FactoryNetwork WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER SELF
    ADD DESTINATION_CONFIG
        WITH BROKER_ADDRESS "central-broker.company.com"
        WITH BROKER_PORT '8883'
        WITH CLIENT_ID "Factory-Chicago"
        WITH USE_TLS true
    ADD MAPPING productionData
        WITH SOURCE_TOPIC "production/#"
        WITH DESTINATION_TOPIC "chicago/production/#"
        WITH DIRECTION "out"
    ADD MAPPING qualityData
        WITH SOURCE_TOPIC "quality/#"
        WITH DESTINATION_TOPIC "chicago/quality/#"
        WITH DIRECTION "out"
    ADD MAPPING centralCommands
        WITH SOURCE_TOPIC "local/commands/#"
        WITH DESTINATION_TOPIC "chicago/commands/#"
        WITH DIRECTION "in"
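To see how a site prefix such as `chicago/` keeps each factory's data separate on the central broker, the Python sketch below rewrites a concrete topic from a source pattern to a destination pattern. It rests on an assumption the text above does not spell out: that the part of the topic matched by a wildcard is carried over unchanged to the wildcard at the same position in the destination. The function name `remap_topic` is hypothetical and this is not the bridge's actual implementation.

```python
from typing import Optional


def remap_topic(source_filter: str, destination_filter: str, topic: str) -> Optional[str]:
    """Map `topic` from the source pattern to the destination pattern.

    Returns None when the topic does not match the source pattern.
    Assumption: wildcard-matched levels are copied over in order.
    """
    source_levels = source_filter.split("/")
    topic_levels = topic.split("/")
    captured = []  # one list of levels per wildcard, in order of appearance

    for i, part in enumerate(source_levels):
        if part == "#":
            # `#` captures every remaining level (possibly none).
            captured.append(topic_levels[i:])
            break
        if i >= len(topic_levels):
            return None
        if part == "+":
            captured.append([topic_levels[i]])
        elif part != topic_levels[i]:
            return None
    else:
        # No `#` in the pattern: the level counts must agree.
        if len(source_levels) != len(topic_levels):
            return None

    values = iter(captured)
    result = []
    for part in destination_filter.split("/"):
        if part in ("+", "#"):
            result.extend(next(values, []))
        else:
            result.append(part)
    return "/".join(result)


print(remap_topic("production/#", "chicago/production/#", "production/line1/count"))
print(remap_topic("quality/#", "chicago/quality/#", "production/line1/count"))
```

Under that assumption the first call yields `chicago/production/line1/count`, and the second yields `None` because the topic does not match `quality/#`.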
Store sensor readings in MongoDB using the CLEAN query format. This example captures all sensor readings and stores them with metadata including the original topic and timestamp:
DEFINE ROUTE SensorStorage WITH TYPE MONGODB
    ADD MONGODB_CONFIG
        WITH CONNECTION_STRING "mongodb://user:password@mongodb.example.com:27017/iot?authSource=admin"
        WITH DATABASE "iot"
    ADD EVENT StoreSensorReading
        WITH QUERY "CLEAN:{collection: sensor_readings, document: {sensor_id: {sensor_id}, value: {value.json}, topic: {source_topic}, timestamp: {timestamp}}}"
        WITH SOURCE_TOPIC "sensors/+/reading"
Send emails for critical system alerts. Template placeholders like {value.json.equipment_id} are replaced with values from the MQTT payload:
DEFINE ROUTE CriticalAlerts WITH TYPE EMAIL
    ADD MQTT_CONFIG
        WITH BROKER SELF
    ADD SMTP_CONFIG
        WITH HOST "smtp.gmail.com"
        WITH PORT '587'
        WITH USERNAME "alerts@mycompany.com"
        WITH PASSWORD "gmail-app-password"
        WITH USE_TLS true
    ADD MAPPING criticalEquipment
        WITH SOURCE_TOPIC "alerts/critical/equipment/+"
        WITH TEMPLATE_PATH "/templates/equipment_alert.html"
        WITH SUBJECT "CRITICAL: Equipment {value.json.equipment_id} - {value.json.alert_type}"
        WITH RECIPIENT "ops-team@mycompany.com"
    ADD MAPPING criticalSafety
        WITH SOURCE_TOPIC "alerts/critical/safety/+"
        WITH TEMPLATE_PATH "/templates/safety_alert.html"
        WITH SUBJECT "SAFETY ALERT: {value.json.location}"
        WITH RECIPIENT "safety-team@mycompany.com"
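As a rough illustration of the placeholder replacement described above, the Python sketch below fills `{value.json.<field>}` placeholders from a JSON payload. It is a minimal sketch under stated assumptions, not the route's real rendering logic: the function name `render_subject`, the sample payload, and the choice to leave unknown placeholders untouched are all hypothetical.

```python
import json
import re

# Matches placeholders such as {value.json.equipment_id} or {value.json.a.b}.
PLACEHOLDER = re.compile(r"\{value\.json\.([A-Za-z0-9_.]+)\}")


def render_subject(template: str, payload: str) -> str:
    """Replace {value.json.<path>} placeholders with fields from a JSON payload."""
    data = json.loads(payload)

    def lookup(match: re.Match) -> str:
        value = data
        for key in match.group(1).split("."):
            if not isinstance(value, dict) or key not in value:
                # Assumption: a placeholder with no matching field is left as-is.
                return match.group(0)
            value = value[key]
        return str(value)

    return PLACEHOLDER.sub(lookup, template)


# Hypothetical payload published to alerts/critical/equipment/<id>.
sample_payload = '{"equipment_id": "PUMP-7", "alert_type": "overheat"}'
subject = "CRITICAL: Equipment {value.json.equipment_id} - {value.json.alert_type}"
print(render_subject(subject, sample_payload))
```

With the sample payload, the rendered subject is `CRITICAL: Equipment PUMP-7 - overheat`.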
Send emails to a recipient taken from the MQTT payload by setting RECIPIENT to {value.json.email}:
DEFINE ROUTE UserNotifications WITH TYPE EMAIL
    ADD MQTT_CONFIG
        WITH BROKER SELF
    ADD SMTP_CONFIG
        WITH HOST "smtp.office365.com"
        WITH PORT '587'
        WITH USERNAME "noreply@company.com"
        WITH PASSWORD "office365-password"
        WITH USE_TLS true
    ADD MAPPING userAlert
        WITH SOURCE_TOPIC "users/+/notifications"
        WITH TEMPLATE_PATH "/templates/user_notification.html"
        WITH SUBJECT "Hello {value.json.username}, {value.json.notification_type}"
        WITH RECIPIENT "{value.json.email}"
Index all sensor data in OpenSearch for real-time analytics and historical queries:
DEFINE ROUTE SensorArchive WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.company.com:9200"
        WITH USERNAME "sensor_writer"
        WITH PASSWORD "opensearch_password"
        WITH USE_SSL true
    ADD EVENT IndexSensorReading
        WITH QUERY "CLEAN:{index: sensor-readings, body: {@timestamp: {timestamp}, sensor_id: {sensor_id}, data: {value.json}}}"
        WITH SOURCE_TOPIC "sensors/+/data"
Bridge data from a remote broker and simultaneously store it in OpenSearch using the STORE keyword, which here references the SensorArchive route by name:
DEFINE ROUTE ProcessAndArchive WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER_ADDRESS "factory-broker.local"
        WITH BROKER_PORT '1883'
        WITH CLIENT_ID "ArchiveBridge"
    ADD DESTINATION_CONFIG
        WITH BROKER SELF
    ADD MAPPING sensorData
        WITH SOURCE_TOPIC "factory/sensors/+/data"
        WITH DESTINATION_TOPIC "archived/sensors/+/data"
        WITH DIRECTION "in"
    STORE SensorArchive
        WITH INDEX_NAME "factory_sensors"
This example combines four route types (MongoDB storage, an MQTT bridge, a REST API client, and email alerts) into one IoT integration setup and shows how the routes work together:
// 1. MongoDB storage for historical data
DEFINE ROUTE DataWarehouse WITH TYPE MONGODB
    ADD MONGODB_CONFIG
        WITH CONNECTION_STRING "mongodb://user:pass@mongodb.company.com:27017/iot?ssl=true"
        WITH DATABASE "iot"
    ADD EVENT StoreProduction
        WITH QUERY "CLEAN:{collection: production_data, document: {timestamp: {timestamp}, data: {value.json}}}"
        WITH SOURCE_TOPIC "production/#"

// 2. Cloud synchronization for remote monitoring
DEFINE ROUTE CloudSync WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER SELF
    ADD DESTINATION_CONFIG
        WITH BROKER_ADDRESS "iot.cloud.com"
        WITH BROKER_PORT '8883'
        WITH CLIENT_ID "Factory-Main"
        WITH USE_TLS true
    ADD MAPPING allData
        WITH SOURCE_TOPIC "production/#"
        WITH DESTINATION_TOPIC "factory-main/production/#"
        WITH DIRECTION "out"

// 3. REST API for external integrations
DEFINE ROUTE ERPIntegration WITH TYPE REST_API
    ADD REST_API_CONFIG
        WITH ENABLE_CLIENT "true"
        WITH BASE_ADDRESS "https://erp.company.com/api"
    ADD MAPPING UpdateInventory
        WITH SOURCE_TOPIC "production/completed"
        WITH DESTINATION_TOPIC "erp/response"
        WITH QUERY '{"method": "POST", "endpoint": "/inventory/update", "content": "{payload.json}"}'

// 4. Email alerts for critical events
DEFINE ROUTE CriticalAlerts WITH TYPE EMAIL
    ADD MQTT_CONFIG
        WITH BROKER SELF
    ADD SMTP_CONFIG
        WITH HOST "smtp.company.com"
        WITH PORT '587'
        WITH USERNAME "alerts@company.com"
        WITH PASSWORD "email_password"
        WITH USE_TLS true
    ADD MAPPING critical
        WITH SOURCE_TOPIC "alerts/critical/#"
        WITH TEMPLATE_PATH "/templates/critical.html"
        WITH SUBJECT "CRITICAL: {value.json.type}"
        WITH RECIPIENT "ops@company.com"