Putting the patterns together, a full ingestion pipeline has four stages, each one a single LoT entity. This is the canonical recipe for “get data from a real device, normalize it, and store it for analytics.”
It works like a kitchen production line: the delivery comes in (sensor), the chef cleans and preps it (Action), the dish is plated to the same standard every time (Model), and the waiter takes it to the table (Database).
Reach for this recipe the first time you need persistent, queryable history of your data, usually somewhere between “it works on my laptop” and “we need to show this to the customer.” It is also the natural foundation for Live KPIs from a Database and any reporting or analytics layer you add later.
Stage 1 — Industrial Route Ingests from the Device
Pick the route type that matches your protocol: MODBUS for HVAC controllers, OPCUA for PLCs, S7 for Siemens, ALLEN_BRADLEY for Rockwell, BACNET for building automation. Map each register or tag to a UNS topic.
DEFINE ROUTE ChillerGateway WITH TYPE MODBUS
    ADD MODBUS_CONFIG
        WITH HOST GET ENV "CHILLER_HOST"
        WITH PORT '502'
        WITH UNIT_ID '1'
    ADD MAPPING coolingPower
        WITH REGISTER '40001'
        WITH DATA_TYPE "FLOAT32"
        WITH DESTINATION_TOPIC "acmecorp/headquarters/floor3/hvac/chiller-01/metrics/cooling_power"
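The mapping reads coolingPower as a FLOAT32 starting at register 40001. Modbus registers are 16 bits wide, so a 32-bit float normally spans two consecutive registers (40001 and 40002). The Python sketch below shows what that decoding involves. It assumes big-endian byte order inside each register and a vendor-dependent word order; the function names are hypothetical and are not part of LoT.

```python
import struct


def decode_float32(first_reg: int, second_reg: int, word_swap: bool = False) -> float:
    """Combine two 16-bit Modbus registers into one IEEE 754 float32.

    Assumes big-endian byte order inside each register and, by default,
    high word first. Set word_swap=True for devices that store the low
    word in the first register.
    """
    high, low = (second_reg, first_reg) if word_swap else (first_reg, second_reg)
    raw = struct.pack(">HH", high, low)  # 4 bytes, big-endian
    return struct.unpack(">f", raw)[0]


def encode_float32(value: float) -> tuple[int, int]:
    """Inverse of decode_float32 (high word first), useful for tests and simulators."""
    high, low = struct.unpack(">HH", struct.pack(">f", value))
    return high, low


if __name__ == "__main__":
    # 42.5 kW as a controller might store it in registers 40001-40002
    regs = encode_float32(42.5)
    print(regs, decode_float32(*regs))
```

If readings come out as nonsense values, word order is the usual suspect, which is why it is a parameter here rather than a constant.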
The Action type-casts the raw payload and then calls a Model to produce structured output; this is also where scaling and validation steps belong. Because the trigger uses + wildcards, the same Action handles every chiller in every building.
DEFINE ACTION NormalizeChillerReading
ON TOPIC "acmecorp/+/+/hvac/+/metrics/cooling_power" DO
    SET "building" WITH TOPIC POSITION 2
    SET "floor" WITH TOPIC POSITION 3
    SET "device_id" WITH TOPIC POSITION 5
    SET "raw_kw" WITH PAYLOAD AS DOUBLE
    PUBLISH MODEL ChillerReading TO "acmecorp/" + {building} + "/" + {floor} + "/hvac/" + {device_id} + "/processed" WITH
        device_id = {device_id}
        building = {building}
        cooling_power_kw = {raw_kw}
        timestamp = TIMESTAMP "UTC"
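To make the wildcard trigger and the TOPIC POSITION lookups concrete, here is a Python analogue of NormalizeChillerReading. It is a sketch, not LoT's implementation: it assumes TOPIC POSITION is 1-based (consistent with position 2 being the building in the topic above) and that TIMESTAMP "UTC" yields an ISO 8601 string. The function names are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional


def topic_matches(topic_filter: str, topic: str) -> bool:
    """MQTT-style matching: '+' matches exactly one level, '#' the remaining levels."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # '#' is always the last filter level
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def topic_position(topic: str, position: int) -> str:
    """Return a topic level by 1-based position (assumed to mirror TOPIC POSITION)."""
    return topic.split("/")[position - 1]


TRIGGER = "acmecorp/+/+/hvac/+/metrics/cooling_power"


def normalize_chiller_reading(topic: str, payload: str) -> Optional[tuple[str, dict]]:
    """Python analogue of the Action: returns (output_topic, record) or None."""
    if not topic_matches(TRIGGER, topic):
        return None
    building = topic_position(topic, 2)
    floor = topic_position(topic, 3)
    device_id = topic_position(topic, 5)
    out_topic = f"acmecorp/{building}/{floor}/hvac/{device_id}/processed"
    record = {
        "device_id": device_id,
        "building": building,
        "cooling_power_kw": float(payload),  # PAYLOAD AS DOUBLE
        "timestamp": datetime.now(timezone.utc).isoformat(),  # ISO 8601 assumed
    }
    return out_topic, record
```

Note that the output topic ends in /processed rather than /metrics/cooling_power, so the Action's own output can never re-trigger it.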
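The database route performs a single INSERT per published reading. The host and user come from GET ENV and the password from GET SECRET, so the same route definition is portable between environments with no credentials hard-coded.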
DEFINE ROUTE ChillerHistorian WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER GET ENV "DB_HOST"
        WITH USERNAME GET ENV "DB_USER"
        WITH PASSWORD GET SECRET "DB_PASSWORD"
        WITH DATABASE "building_data"
    ADD EVENT StoreChillerReading
        WITH SOURCE_TOPIC "acmecorp/+/+/hvac/+/processed"
        WITH QUERY "INSERT INTO chiller_readings (ts, device_id, building, cooling_power_kw) VALUES ('{value.json.timestamp}', '{value.json.device_id}', '{value.json.building}', {value.json.cooling_power_kw})"
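For comparison, the same one-row-per-message insert is sketched below in Python. It uses the standard library's sqlite3 as a stand-in for PostgreSQL so that it runs anywhere, and assumes the processed payload is JSON with the four fields the Model publishes; the function name is hypothetical. A PostgreSQL driver such as psycopg would use %s placeholders instead of ?.

```python
import json
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS chiller_readings (
    ts               TEXT NOT NULL,
    device_id        TEXT NOT NULL,
    building         TEXT NOT NULL,
    cooling_power_kw REAL NOT NULL
)
"""

# Values are bound as parameters rather than spliced into the SQL text.
INSERT_SQL = (
    "INSERT INTO chiller_readings (ts, device_id, building, cooling_power_kw) "
    "VALUES (?, ?, ?, ?)"
)


def store_chiller_reading(conn: sqlite3.Connection, payload: str) -> None:
    """Insert one processed reading (a JSON payload) as a single row."""
    value = json.loads(payload)
    conn.execute(
        INSERT_SQL,
        (value["timestamp"], value["device_id"], value["building"],
         float(value["cooling_power_kw"])),
    )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    store_chiller_reading(conn, json.dumps({
        "timestamp": "2024-01-01T00:00:00+00:00",
        "device_id": "chiller-01",
        "building": "headquarters",
        "cooling_power_kw": 42.5,
    }))
    print(conn.execute("SELECT * FROM chiller_readings").fetchall())
```

Binding parameters means a device ID or building name containing a quote is stored verbatim instead of breaking, or injecting into, the statement.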
Four entities, one pipeline. Every chiller in every building flows through the same path. Adding a new chiller means adding a mapping to the gateway route; nothing else changes. Adding a new field to the schema means adding it to ChillerReading and updating the SQL; all the Actions stay untouched.
The same four-stage recipe fits any vertical: a solar inverter via Modbus, a traffic light controller via REST, a delivery van via cellular MQTT. Different devices, different protocols, same shape.
In a typical production setup, the edge broker runs on-site and handles all the high-frequency, device-facing work — every reading, every command, every alarm. The cloud broker sits centrally and only sees what the business needs to see — minute aggregates, alerts, cross-site rollups. An MQTT_BRIDGE route is what selectively forwards data between the two.
Think of a local newspaper sending a daily summary to head office: the full story stays in the newsroom, and only the headlines and totals make the trip to corporate. Cheaper, safer, faster.
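Raw sensor data adds up quickly: 100 devices reporting at 1 Hz produce 6,000 messages per minute. Aggregating each device to one summary per minute cuts that to 100 messages, a reduction of roughly 98% in uplink message count, and your cloud bill shrinks with it.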
Edge processing speed
Local alarms react in milliseconds without waiting for a cloud round-trip. Even if the WAN goes down, the edge keeps running.
Resilience
If the cloud connection drops, the edge buffers and reconnects automatically. Operations never stop because of a flaky uplink.
Data and security boundary
Sensitive raw data never leaves the site. Only the summarized, business-relevant information crosses the boundary.
Multi-site rollups
The cloud sees every site through the same shape (lisbon-park/..., madrid-park/...), enabling cross-site analytics and dashboards without changing edge logic.
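The Resilience point relies on store-and-forward: while the uplink is down, outgoing messages are queued locally and then flushed in order on reconnect. The Python sketch below illustrates that general idea only; it is not the LoT broker's internal implementation, and the class and method names are hypothetical. The buffer is bounded, so during a very long outage the oldest messages are dropped first.

```python
from collections import deque
from typing import Callable, Deque, Tuple

Message = Tuple[str, str]  # (topic, payload)


class StoreAndForward:
    """Hypothetical uplink buffer: queue while offline, flush in order on reconnect."""

    def __init__(self, send: Callable[[str, str], None], max_buffered: int = 10_000):
        self._send = send
        # Bounded: once full, the oldest message is dropped to make room.
        self._buffer: Deque[Message] = deque(maxlen=max_buffered)
        self.connected = False

    def publish(self, topic: str, payload: str) -> None:
        if self.connected:
            self._send(topic, payload)
        else:
            self._buffer.append((topic, payload))

    def on_disconnect(self) -> None:
        self.connected = False

    def on_reconnect(self) -> None:
        while self._buffer:
            topic, payload = self._buffer[0]
            self._send(topic, payload)  # if this raises, the message stays queued
            self._buffer.popleft()
        self.connected = True  # only after the backlog is drained, preserving order

    @property
    def pending(self) -> int:
        return len(self._buffer)
```

Marking the link as connected only after the backlog drains keeps new messages from overtaking buffered ones.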
This is the right pattern when you have multiple sites, strict bandwidth or cost limits, regulatory or security requirements to keep raw data local, or devices that must keep operating during cloud outages. For a single-site project with a fast and cheap connection, a single broker is simpler.
Imagine an edge broker at a solar park with multiple inverters. Each inverter publishes voltage, current, and power readings every second. Here’s what each broker actually contains:
| Topic on edge broker (at the park) | What it is | Goes to cloud? |
| --- | --- | --- |
| inverters/inv-01/voltage | Raw reading every second | ❌ Stays local |
| inverters/inv-01/current | Raw reading every second | ❌ Stays local |
| inverters/inv-01/power | Raw reading every second | ❌ Stays local |
| inverters/inv-02/power | Raw reading from another inverter | ❌ Stays local |
| state/inv-01/last_seen | Internal state via KEEP TOPIC | ❌ Stays local |
| aggregates/inv-01/1min | Computed every 60 seconds by an Action | ✅ Forwarded |
| aggregates/inv-02/1min | Same, for the second inverter | ✅ Forwarded |
| alerts/inv-01/low_output | Threshold alarm | ✅ Forwarded |
| Topic on cloud broker (after bridge) | Source |
| --- | --- |
| lisbon-park/aggregates/inv-01/1min | Forwarded from the Lisbon edge |
| lisbon-park/aggregates/inv-02/1min | Forwarded from the Lisbon edge |
| lisbon-park/alerts/inv-01/low_output | Forwarded from the Lisbon edge |
| madrid-park/aggregates/inv-A/1min | Forwarded from a different site |
| … | (one prefix per park) |
The cloud sees a clean, aggregated, multi-site view. The edge keeps all the raw data, locally and privately.
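An Action on the edge computes a one-minute summary and publishes it; this is what gets sent up. The example hard-codes inv-01, so each additional inverter needs the same logic with its own inverter_id.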
DEFINE ACTION PublishMinuteAggregate
ON EVERY 60 SECONDS DO
    SET "inverter_id" WITH "inv-01"
    SET "avg_power" WITH (GET TOPIC "cache/" + {inverter_id} + "/power_avg" AS DOUBLE)
    SET "peak_power" WITH (GET TOPIC "cache/" + {inverter_id} + "/power_peak" AS DOUBLE)
    PUBLISH MODEL InverterAggregate TO "aggregates/" + {inverter_id} + "/1min" WITH
        inverter_id = {inverter_id}
        avg_power_kw = {avg_power}
        peak_power_kw = {peak_power}
        timestamp = TIMESTAMP "UTC"
(Other Actions, not shown, consume the raw inverters/ stream and fill the cache/ topics, which stay internal to the edge via KEEP TOPIC.)
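The cache-filling Actions are not shown, but the arithmetic they would perform is simple: keep a running count, sum, and peak per inverter, then emit and reset once a minute. The Python sketch below shows that under stated assumptions: the class name is hypothetical, readings are in kW, and the payload keys mirror the InverterAggregate fields in the PUBLISH statement. It is a stand-in for the logic, not a LoT Model definition.

```python
from datetime import datetime, timezone


class MinuteWindow:
    """Hypothetical running aggregate for one inverter's power readings (kW)."""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        self.count = 0
        self.total = 0.0
        self.peak = float("-inf")

    def add(self, power_kw: float) -> None:
        """Called once per raw inverters/<id>/power reading."""
        self.count += 1
        self.total += power_kw
        self.peak = max(self.peak, power_kw)

    def flush(self, inverter_id: str) -> tuple[str, dict]:
        """Called every 60 s: returns (topic, payload) and starts a new window."""
        if self.count == 0:
            raise ValueError(f"no readings for {inverter_id} in this window")
        payload = {
            "inverter_id": inverter_id,
            "avg_power_kw": self.total / self.count,
            "peak_power_kw": self.peak,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        self.reset()
        return f"aggregates/{inverter_id}/1min", payload
```

Keeping only count, sum, and peak means memory stays constant no matter how many raw readings arrive within the minute.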
The bridge route needs just three topic mappings: aggregates and alerts go up, commands come down, and everything else stays local.
DEFINE ROUTE EdgeToCloud WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER SELF
    ADD DESTINATION_CONFIG
        WITH BROKER_ADDRESS GET ENV "CLOUD_BROKER_HOST"
        WITH BROKER_PORT '8883'
        WITH CLIENT_ID GET ENV "EDGE_SITE_ID"
        WITH USERNAME GET ENV "CLOUD_BROKER_USER"
        WITH PASSWORD GET SECRET "CLOUD_BROKER_PASSWORD"
        WITH USE_TLS "true"
    ADD MAPPING aggregateUplink
        WITH SOURCE_TOPIC "aggregates/#"
        WITH DESTINATION_TOPIC "lisbon-park/aggregates/#"
        WITH DIRECTION "out"
    ADD MAPPING alertUplink
        WITH SOURCE_TOPIC "alerts/#"
        WITH DESTINATION_TOPIC "lisbon-park/alerts/#"
        WITH DIRECTION "out"
    ADD MAPPING commandsDownlink
        WITH SOURCE_TOPIC "lisbon-park/commands/#"
        WITH DESTINATION_TOPIC "commands/#"
        WITH DIRECTION "in"
The bridge prepends the site prefix (lisbon-park) in each destination topic, so messages arrive on the cloud broker already namespaced by site. Multiple parks (or buildings, cities, or fleet regions) can therefore coexist on the same cloud broker without topic collisions.
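The effect of the three mappings can be checked against the solar-park topic tables with a small Python model of the bridge. It assumes the '#' in DESTINATION_TOPIC is replaced by whatever the '#' in SOURCE_TOPIC matched, and it only handles filters that end in /#. The names Mapping, EDGE_TO_CLOUD, and remap are hypothetical and are not LoT APIs.

```python
from typing import NamedTuple, Optional, Sequence


class Mapping(NamedTuple):
    source: str       # source filter ending in "/#"
    destination: str  # destination pattern ending in "/#"
    direction: str    # "out" = edge to cloud, "in" = cloud to edge


EDGE_TO_CLOUD = [
    Mapping("aggregates/#", "lisbon-park/aggregates/#", "out"),
    Mapping("alerts/#", "lisbon-park/alerts/#", "out"),
    Mapping("lisbon-park/commands/#", "commands/#", "in"),
]


def remap(topic: str, direction: str,
          mappings: Sequence[Mapping] = EDGE_TO_CLOUD) -> Optional[str]:
    """Return the topic on the far side of the bridge, or None if nothing forwards it."""
    for m in mappings:
        if m.direction != direction:
            continue
        src_prefix = m.source[:-1]  # "aggregates/#" -> "aggregates/"
        if topic.startswith(src_prefix):
            # Assumed semantics: the destination '#' takes whatever the source '#' matched.
            return m.destination[:-1] + topic[len(src_prefix):]
    return None
```

For example, remap("aggregates/inv-01/1min", "out") gives "lisbon-park/aggregates/inv-01/1min", while remap("inverters/inv-01/voltage", "out") gives None, matching the "Stays local" rows.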
Once the cloud is receiving aggregates and alerts as MQTT topics, you can compose more patterns on top of it:
Cloud database route for central analytics. Add a PostgreSQL or CrateDB route on the cloud broker that subscribes to +/aggregates/# and inserts into a multi-site table. Now you have one historian with data from every site, queryable for trends, capacity planning, and cross-park benchmarks. The exact same EVENT ... QUERY pattern from the Live KPIs from a Database section works here.
Cloud dashboards. External visualization tools (or MQTT clients) subscribed to +/aggregates/# and +/alerts/# on the cloud broker give corporate or operations teams a unified view of every site.
Cross-site KPIs. A scheduled EVENT on the cloud’s database can publish company-wide KPIs (SELECT AVG(...) GROUP BY site_id) as MQTT topics, just like a single-site KPI — but rolled up across the whole portfolio.
AI / MCP routes on the cloud. Plug an AGENT route into the cloud broker so an AI assistant can answer questions about the entire fleet without ever needing to touch the edge.
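As an illustration of the cross-site KPI idea, the Python sketch below computes a per-site average over messages matching +/aggregates/#, the in-memory equivalent of SELECT site_id, AVG(avg_power_kw) ... GROUP BY site_id. The document describes doing this in the cloud database; this stand-in only shows the grouping logic. It assumes the first topic level is the site prefix added by the bridge and that payloads carry avg_power_kw as in InverterAggregate; the function name is hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, Iterable, List, Tuple


def site_rollup(messages: Iterable[Tuple[str, dict]]) -> Dict[str, float]:
    """Average avg_power_kw per site for topics matching +/aggregates/#."""
    per_site: Dict[str, List[float]] = defaultdict(list)
    for topic, payload in messages:
        levels = topic.split("/")
        if len(levels) >= 2 and levels[1] == "aggregates":
            per_site[levels[0]].append(float(payload["avg_power_kw"]))
    return {site: mean(values) for site, values in per_site.items()}
```

Because every site uses the same topic shape below its prefix, adding a new park adds a new key to the result without changing this logic.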
The bridge is the boundary: everything above it is business, everything below it is operations, and each side can evolve independently.