Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.coreflux.org/llms.txt

Use this file to discover all available pages before exploring further.

From Device to Database, End to End

Putting the patterns together, a full ingestion pipeline has four stages, each one a single LoT entity. This is the canonical recipe for “get data from a real device, normalize it, and store it for analytics.”
Like a kitchen production line. The delivery comes in (sensor), the chef cleans and preps (Action), the dish is plated to the same standard every time (Model), and the waiter takes it to the table (Database).

When to Reach for This

The first time you need persistent, queryable history of your data — usually somewhere between “it works on my laptop” and “we need to show this to the customer.” It’s also the natural foundation for Live KPIs from a Database and any reporting or analytics layer you’ll add later.

The Worked Example

A smart-building HVAC chiller, communicating via Modbus, normalized into a unified schema, and stored in PostgreSQL.

Stage 1 — Industrial Route Ingests from the Device

Pick the route type that matches your protocol: MODBUS for HVAC controllers, OPCUA for PLCs, S7 for Siemens, ALLEN_BRADLEY for Rockwell, BACNET for building automation. Map each register or tag to a UNS topic.
DEFINE ROUTE ChillerGateway WITH TYPE MODBUS
    ADD MODBUS_CONFIG
        WITH HOST GET ENV "CHILLER_HOST"
        WITH PORT '502'
        WITH UNIT_ID '1'
    ADD MAPPING coolingPower
        WITH REGISTER '40001'
        WITH DATA_TYPE "FLOAT32"
        WITH DESTINATION_TOPIC "acmecorp/headquarters/floor3/hvac/chiller-01/metrics/cooling_power"

Stage 2 — Action Processes and Normalizes

Type-cast, scale, validate, then call a Model to produce structured output. A wildcard trigger means the same Action handles every chiller in every building.
DEFINE ACTION NormalizeChillerReading
ON TOPIC "acmecorp/+/+/hvac/+/metrics/cooling_power" DO
    SET "building" WITH TOPIC POSITION 2
    SET "floor" WITH TOPIC POSITION 3
    SET "device_id" WITH TOPIC POSITION 5
    SET "raw_kw" WITH PAYLOAD AS DOUBLE

    PUBLISH MODEL ChillerReading TO "acmecorp/" + {building} + "/" + {floor} + "/hvac/" + {device_id} + "/processed" WITH
        device_id = {device_id}
        building = {building}
        cooling_power_kw = {raw_kw}
        timestamp = TIMESTAMP "UTC"

Stage 3 — Model Standardizes the Schema

One JSON shape for every chiller in every building. Add fields here once, every consumer downstream sees them.
DEFINE MODEL ChillerReading COLLAPSED
    ADD STRING "device_id"
    ADD STRING "building"
    ADD DOUBLE "cooling_power_kw"
    ADD STRING "timestamp"

Stage 4 — Database Route Persists

A single insert per published reading. Credentials come from ENV and SECRET so the route is portable between environments.
DEFINE ROUTE ChillerHistorian WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER GET ENV "DB_HOST"
        WITH USERNAME GET ENV "DB_USER"
        WITH PASSWORD GET SECRET "DB_PASSWORD"
        WITH DATABASE "building_data"
    ADD EVENT StoreChillerReading
        WITH SOURCE_TOPIC "acmecorp/+/+/hvac/+/processed"
        WITH QUERY "INSERT INTO chiller_readings (ts, device_id, building, cooling_power_kw) VALUES ('{value.json.timestamp}', '{value.json.device_id}', '{value.json.building}', {value.json.cooling_power_kw})"
Four entities, one pipeline. Every chiller in every building flows the same way. Adding a new chiller means adding a Mapping in the gateway route — nothing else changes. Adding a new field to the schema means adding it to ChillerReading and updating the SQL — all the Actions stay untouched. The same four-stage recipe fits any vertical: a solar inverter via Modbus, a traffic light controller via REST, a delivery van via cellular MQTT. Different devices, different protocols, same shape.

Splitting Edge from Cloud with a Bridge

In a typical production setup, the edge broker runs on-site and handles all the high-frequency, device-facing work — every reading, every command, every alarm. The cloud broker sits centrally and only sees what the business needs to see — minute aggregates, alerts, cross-site rollups. An MQTT_BRIDGE route is what selectively forwards data between the two.
Like a local newspaper sending a daily summary to head office. The full story stays in the newsroom; only the headlines and totals make the trip to corporate. Cheaper, safer, faster.

When to Reach for This

ReasonWhat it gives you
Bandwidth and costRaw sensor data at 1 Hz from 100 devices is a lot. Aggregating to one summary per minute can shrink uplink traffic by 99% — and your cloud bill with it.
Edge processing speedLocal alarms react in milliseconds without waiting for a cloud round-trip. Even if the WAN goes down, the edge keeps running.
ResilienceIf the cloud connection drops, the edge buffers and reconnects automatically. Operations never stop because of a flaky uplink.
Data and security boundarySensitive raw data never leaves the site. Only the summarized, business-relevant information crosses the boundary.
Multi-site rollupsThe cloud sees every site through the same shape (lisbon-park/..., madrid-park/...), enabling cross-site analytics and dashboards without changing edge logic.
This is the right pattern when you have multiple sites, strict bandwidth or cost limits, regulatory or security requirements to keep raw data local, or devices that must keep operating during cloud outages. For a single-site project with a fast and cheap connection, a single broker is simpler.

What’s Where: A Concrete Example

Imagine an edge broker at a solar park with multiple inverters. Each inverter publishes voltage, current, and power readings several times a second. Here’s what each broker actually contains:
Topic on edge broker (at the park)What it isGoes to cloud?
inverters/inv-01/voltageRaw reading every second❌ Stays local
inverters/inv-01/currentRaw reading every second❌ Stays local
inverters/inv-01/powerRaw reading every second❌ Stays local
inverters/inv-02/powerRaw reading from another inverter❌ Stays local
state/inv-01/last_seenInternal state via KEEP TOPIC❌ Stays local
aggregates/inv-01/1minComputed every 60 seconds by an Action✅ Forwarded
aggregates/inv-02/1minSame, for the second inverter✅ Forwarded
alerts/inv-01/low_outputThreshold alarm✅ Forwarded
Topic on cloud broker (after bridge)Source
lisbon-park/aggregates/inv-01/1minForwarded from the Lisbon edge
lisbon-park/aggregates/inv-02/1minForwarded from the Lisbon edge
lisbon-park/alerts/inv-01/low_outputForwarded from the Lisbon edge
madrid-park/aggregates/inv-A/1minForwarded from a different site
(one prefix per park)
The cloud sees a clean, aggregated, multi-site view. The edge keeps everything raw, locally and privately.

Step 1: Create the Aggregate at the Edge

An Action on the edge computes a summary every minute, per inverter. This is what gets sent up.
DEFINE ACTION PublishMinuteAggregate
ON EVERY 60 SECONDS DO
    SET "inverter_id" WITH "inv-01"
    SET "avg_power" WITH (GET TOPIC "cache/" + {inverter_id} + "/power_avg" AS DOUBLE)
    SET "peak_power" WITH (GET TOPIC "cache/" + {inverter_id} + "/power_peak" AS DOUBLE)

    PUBLISH MODEL InverterAggregate TO "aggregates/" + {inverter_id} + "/1min" WITH
        inverter_id = {inverter_id}
        avg_power_kw = {avg_power}
        peak_power_kw = {peak_power}
        timestamp = TIMESTAMP "UTC"
(The cache/ topics get filled by other Actions that consume the raw inverters/ stream — kept internal to the edge using KEEP TOPIC.)

Step 2: Define the Bridge

Three topic mappings. Aggregates and alerts go up; commands come down. Everything else stays local.
DEFINE ROUTE EdgeToCloud WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER SELF
    ADD DESTINATION_CONFIG
        WITH BROKER_ADDRESS GET ENV "CLOUD_BROKER_HOST"
        WITH BROKER_PORT '8883'
        WITH CLIENT_ID GET ENV "EDGE_SITE_ID"
        WITH USERNAME GET ENV "CLOUD_BROKER_USER"
        WITH PASSWORD GET SECRET "CLOUD_BROKER_PASSWORD"
        WITH USE_TLS "true"
    ADD MAPPING aggregateUplink
        WITH SOURCE_TOPIC "aggregates/#"
        WITH DESTINATION_TOPIC "lisbon-park/aggregates/#"
        WITH DIRECTION "out"
    ADD MAPPING alertUplink
        WITH SOURCE_TOPIC "alerts/#"
        WITH DESTINATION_TOPIC "lisbon-park/alerts/#"
        WITH DIRECTION "out"
    ADD MAPPING commandsDownlink
        WITH SOURCE_TOPIC "lisbon-park/commands/#"
        WITH DESTINATION_TOPIC "commands/#"
        WITH DIRECTION "in"
The site prefix (lisbon-park) is added on the cloud side, so multiple parks — or buildings, or cities, or fleet regions — can coexist on the same cloud broker without topic collisions.

Complementary Patterns on the Cloud

Once the cloud is receiving aggregates and alerts as MQTT topics, you can compose more patterns on top of it:
  • Cloud database route for central analytics. Add a PostgreSQL or CrateDB route on the cloud broker that subscribes to +/aggregates/# and inserts into a multi-site table. Now you have one historian with data from every site, queryable for trends, capacity planning, and cross-park benchmarks. The exact same EVENT ... QUERY pattern from the Live KPIs from a Database section works here.
  • Cloud dashboards. Coreflux HUB Dashboards on the cloud broker subscribe to +/aggregates/# and +/alerts/# to give corporate or operations teams a unified view of every site.
  • Cross-site KPIs. A scheduled EVENT on the cloud’s database can publish company-wide KPIs (SELECT AVG(...) GROUP BY site_id) as MQTT topics, just like a single-site KPI — but rolled up across the whole portfolio.
  • AI / MCP routes on the cloud. Plug an AGENT route into the cloud broker so an AI assistant can answer questions about the entire fleet without ever needing to touch the edge.
The bridge is the boundary; everything above it is business; everything below it is operations. Each side can evolve independently.

Next Steps

Data Storage Routes

Historians, inserts, and scheduled queries across database types.

Connecting to the outside world

REST, KPIs, and credentials from this guide.
Last modified on May 20, 2026