Models transform scattered MQTT data into structured, predictable JSON. Instead of parsing raw payloads in your application, you define once how data should look—and Coreflux handles the rest.
Like a form that fills itself out. You define what fields the form should have (temperature, humidity, timestamp), and the model automatically grabs the latest values from different sources whenever it needs to publish.
DEFINE MODEL <ModelName> WITH TOPIC "<output/topic>" ADD <FIELD_TYPE> "<field_name>" WITH <data_source> [AS TRIGGER] ADD <FIELD_TYPE> "<field_name>" WITH <data_source> ...
The AS TRIGGER modifier controls when your model publishes. Without it, you’d flood your broker with updates every time any source topic changes.
Like a doorbell that takes a photo. The model waits quietly until the trigger topic “rings,” then takes a snapshot of all fields and publishes the complete picture. No ring, no photo.
Copy
Ask AI
DEFINE MODEL TriggeredModel WITH TOPIC "output/triggered" ADD STRING "equipment_id" WITH TOPIC "equipment/id" ADD STRING "status" WITH TOPIC "equipment/status" AS TRIGGER ADD INT "runtime" WITH TOPIC "equipment/runtime" ADD STRING "timestamp" WITH TIMESTAMP "UTC"
Behavior:
Model publishes only when equipment/status changes
Changes to equipment/id or equipment/runtime alone do not trigger publication
You can pull data from many topics but trigger on just one:
Copy
Ask AI
DEFINE MODEL MultiSourceModel WITH TOPIC "output/multi" ADD STRING "sensor_a" WITH TOPIC "sensors/a/value" ADD STRING "sensor_b" WITH TOPIC "sensors/b/value" ADD STRING "sensor_c" WITH TOPIC "sensors/c/value" AS TRIGGER ADD STRING "timestamp" WITH TIMESTAMP "UTC"
Only publishes when sensor C updates, but includes current values from sensors A and B.
ADD DOUBLE "temperature" WITH 23.5ADD DOUBLE "efficiency" WITH TOPIC "metrics/efficiency"ADD DOUBLE "percentage" WITH (GET TOPIC "actual" AS DOUBLE / GET TOPIC "target" AS DOUBLE * 100)
Fixed values that never change—useful for metadata, units, or default settings:
Copy
Ask AI
DEFINE MODEL StaticExample WITH TOPIC "output/static" ADD STRING "unit" WITH "celsius" ADD INT "default_threshold" WITH 100 ADD DOUBLE "conversion_factor" WITH 1.8 ADD BOOL "enabled" WITH TRUE ADD STRING "location" WITH "Factory Floor A"
Pull live values from other MQTT topics. The value is fetched at publish time, not when the model is defined.
Copy
Ask AI
DEFINE MODEL TopicExample WITH TOPIC "output/dynamic" ADD DOUBLE "temperature" WITH TOPIC "sensors/raw/temperature" ADD STRING "status" WITH TOPIC "equipment/current/status" AS TRIGGER ADD INT "count" WITH TOPIC "production/count"
Example: If sensors/raw/temperature currently holds 23.5, the model’s temperature field will be 23.5 when it publishes. If the value changes to 24.0 before the next publish, the model will output 24.0.
Built-in timestamp generation—no external time source needed:
Copy
Ask AI
ADD STRING "timestamp" WITH TIMESTAMP "UTC" // "2025-10-25T14:30:15Z"ADD STRING "iso_time" WITH TIMESTAMP "ISO" // ISO 8601 formatADD INT "unix_time" WITH TIMESTAMP "UNIX" // 1729864215ADD INT "unix_ms" WITH TIMESTAMP "UNIX-MS" // Milliseconds since epoch
DEFINE MODEL CalculatedExample WITH TOPIC "output/calculated" ADD DOUBLE "efficiency" WITH (GET TOPIC "produced" AS DOUBLE / GET TOPIC "target" AS DOUBLE * 100) ADD INT "total" WITH (GET TOPIC "passed" + GET TOPIC "failed") ADD DOUBLE "fahrenheit" WITH (GET TOPIC "celsius" AS DOUBLE * 9 / 5 + 32) ADD DOUBLE "average" WITH ((GET TOPIC "val1" + GET TOPIC "val2" + GET TOPIC "val3") / 3)
Always use AS DOUBLE or AS INT when performing mathematical operations to ensure correct type handling.
DEFINE MODEL ConditionalExample WITH TOPIC "output/conditional" ADD STRING "status" WITH IF (GET TOPIC "value" > 80) THEN "HIGH" ELSE "NORMAL" ADD STRING "quality" WITH IF (GET TOPIC "defects" = 0) THEN "PASS" ELSE "FAIL" ADD INT "priority" WITH IF (GET TOPIC "severity" EQUALS "CRITICAL") THEN 1 ELSE IF (GET TOPIC "severity" EQUALS "HIGH") THEN 2 ELSE 3
This model aggregates equipment data from multiple topics into a single, structured output. It triggers whenever the status changes.
Copy
Ask AI
DEFINE MODEL EquipmentStatus WITH TOPIC "equipment/status/formatted" ADD STRING "equipment_id" WITH TOPIC "equipment/current/id" ADD STRING "status" WITH TOPIC "equipment/current/status" AS TRIGGER ADD INT "runtime_hours" WITH TOPIC "equipment/current/runtime" ADD DOUBLE "efficiency" WITH TOPIC "equipment/current/efficiency" ADD BOOL "maintenance_required" WITH TOPIC "equipment/current/maintenance_flag" ADD STRING "last_update" WITH TIMESTAMP "UTC" ADD STRING "location" WITH "Factory Floor A"
This model calculates efficiency metrics inline. It triggers when the quantity updates, capturing a snapshot of the production run.
Copy
Ask AI
DEFINE MODEL ProductionRecord WITH TOPIC "production/records/completed" ADD STRING "batch_id" WITH TOPIC "production/current/batch_id" ADD STRING "product_code" WITH TOPIC "production/current/product_code" ADD INT "quantity_produced" WITH TOPIC "production/current/quantity" AS TRIGGER ADD INT "quantity_target" WITH TOPIC "production/current/target" ADD DOUBLE "efficiency_percent" WITH (GET TOPIC "production/current/quantity" AS DOUBLE / GET TOPIC "production/current/target" AS DOUBLE * 100) ADD STRING "operator" WITH TOPIC "production/current/operator" ADD STRING "completion_time" WITH TIMESTAMP "UTC"
Explicit type casting prevents errors and ensures predictable results:
Copy
Ask AI
// GoodADD DOUBLE "efficiency" WITH (GET TOPIC "produced" AS DOUBLE / GET TOPIC "target" AS DOUBLE * 100)// Risky - implicit type handlingADD DOUBLE "efficiency" WITH (GET TOPIC "produced" / GET TOPIC "target" * 100)
Include Essential Context
Always include fields that help downstream consumers understand the data:
Identification fields (IDs, names)
Timestamps
Units of measurement
Status or quality indicators
Choose Triggers Carefully
Trigger on the primary data field, not metadata:
Copy
Ask AI
// Good - trigger on the main measurementADD DOUBLE "value" WITH TOPIC "sensors/temperature" AS TRIGGER// Bad - triggers too frequently or unpredictablyADD STRING "timestamp" WITH TIMESTAMP "UTC" AS TRIGGER