Why Define a Schema?

Models transform scattered MQTT data into structured, predictable JSON. Instead of parsing raw payloads in your application, you define once how data should look—and Coreflux handles the rest.
Think of a model as a form that fills itself out: you define which fields the form has (temperature, humidity, timestamp), and the model fills them in with the latest values from its sources each time it publishes.

When to Use This Reference

Use this page when you need to:
  • Define field types for your model (STRING, INT, DOUBLE, etc.)
  • Pull values from multiple MQTT topics into a single output
  • Calculate or transform values inline
  • Control when your model publishes using triggers

In This Page

  • Basic Syntax — The structure every model follows
  • Trigger Mechanism — How to control when your model publishes
  • Field Types — STRING, INT, DOUBLE, BOOL, OBJECT, ARRAY
  • Data Sources — Static values, topic data, timestamps, calculations
  • Wildcard Topics — Multi-instance models
  • Complete Examples — Real-world patterns

Basic Model Syntax

Every model follows this structure:
DEFINE MODEL <ModelName> WITH TOPIC "<output/topic>"
    ADD <FIELD_TYPE> "<field_name>" WITH <data_source> [AS TRIGGER]
    ADD <FIELD_TYPE> "<field_name>" WITH <data_source>
    ...

Components

  • ModelName (string, required): Unique identifier for the model. Use descriptive names such as SensorReading, EquipmentStatus, or ProductionRecord.
  • WITH TOPIC (string, required): The MQTT topic where the model’s JSON output is published. Supports wildcards for multi-instance models.
  • FIELD_TYPE (type, required): One of STRING, INT, DOUBLE, BOOL, OBJECT, ARRAY.
  • AS TRIGGER (modifier, optional): Marks this field as the trigger. The model publishes only when this field’s source topic updates.

Trigger Mechanism

The AS TRIGGER modifier controls when your model publishes. The model publishes only when the trigger field’s source topic updates, so updates to the other source topics never produce extra messages on your broker.
Like a doorbell that takes a photo. The model waits quietly until the trigger topic “rings,” then takes a snapshot of all fields and publishes the complete picture. No ring, no photo.
DEFINE MODEL TriggeredModel WITH TOPIC "output/triggered"
    ADD STRING "equipment_id" WITH TOPIC "equipment/id"
    ADD STRING "status" WITH TOPIC "equipment/status" AS TRIGGER
    ADD INT "runtime" WITH TOPIC "equipment/runtime"
    ADD STRING "timestamp" WITH TIMESTAMP "UTC"
Behavior:
  • Model publishes only when equipment/status changes
  • Changes to equipment/id or equipment/runtime alone do not trigger publication
  • All field values are fetched at trigger time
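The behavior listed above can be modeled outside the broker to make the timing concrete. The following Python sketch is an illustration only, not Coreflux's implementation: the class name TriggeredModelSketch, its on_message method, and the in-memory published list are all hypothetical. It caches the latest payload per source topic and emits a snapshot only when the trigger topic receives a message. The timestamp field and type conversion are left out for brevity.

```python
import json


class TriggeredModelSketch:
    """Toy stand-in for a triggered model (hypothetical, not a Coreflux API).

    Caches the latest payload per source topic and records a snapshot
    only when the trigger topic updates.
    """

    def __init__(self, output_topic, fields, trigger_topic):
        self.output_topic = output_topic
        self.fields = fields                # field name -> source topic
        self.trigger_topic = trigger_topic
        self.latest = {}                    # source topic -> last payload seen
        self.published = []                 # (output topic, JSON string) pairs

    def on_message(self, topic, payload):
        # Every message refreshes the cache, but only the trigger publishes.
        self.latest[topic] = payload
        if topic == self.trigger_topic:
            snapshot = {
                name: self.latest.get(source)
                for name, source in self.fields.items()
            }
            self.published.append((self.output_topic, json.dumps(snapshot)))


model = TriggeredModelSketch(
    output_topic="output/triggered",
    fields={
        "equipment_id": "equipment/id",
        "status": "equipment/status",
        "runtime": "equipment/runtime",
    },
    trigger_topic="equipment/status",
)

model.on_message("equipment/id", "PUMP001")      # cached, nothing published
model.on_message("equipment/runtime", "1250")    # cached, nothing published
model.on_message("equipment/status", "RUNNING")  # trigger: one snapshot published
model.on_message("equipment/runtime", "1251")    # cached, nothing published

print(len(model.published))
```

Only the third message produces output, and that output carries the values of all three fields as they stood at that moment.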

No Trigger Specified

If no field has AS TRIGGER, the model acts as a template. You must publish it explicitly using PUBLISH MODEL from an Action.
Learn how to publish models on demand in Publishing Models.

Multiple Data Sources, Single Trigger

You can pull data from many topics but trigger on just one:
DEFINE MODEL MultiSourceModel WITH TOPIC "output/multi"
    ADD STRING "sensor_a" WITH TOPIC "sensors/a/value"
    ADD STRING "sensor_b" WITH TOPIC "sensors/b/value"
    ADD STRING "sensor_c" WITH TOPIC "sensors/c/value" AS TRIGGER
    ADD STRING "timestamp" WITH TIMESTAMP "UTC"
This model publishes only when sensors/c/value updates, and each output includes the current values from sensors A and B.

Field Types

Each field in your model has a type that determines how the value is serialized to JSON.

STRING

Text data of any length:
ADD STRING "sensor_id" WITH "TEMP001"
ADD STRING "status" WITH TOPIC "equipment/status"
ADD STRING "timestamp" WITH TIMESTAMP "UTC"

INT

Whole numbers (integers):
ADD INT "count" WITH 0
ADD INT "runtime_hours" WITH TOPIC "equipment/runtime"
ADD INT "total" WITH (GET TOPIC "passed" AS INT + GET TOPIC "failed" AS INT)

DOUBLE

Decimal numbers (floating-point):
ADD DOUBLE "temperature" WITH 23.5
ADD DOUBLE "efficiency" WITH TOPIC "metrics/efficiency"
ADD DOUBLE "percentage" WITH (GET TOPIC "actual" AS DOUBLE / GET TOPIC "target" AS DOUBLE * 100)

BOOL

Boolean values (true/false):
ADD BOOL "enabled" WITH TRUE
ADD BOOL "maintenance_required" WITH TOPIC "equipment/needs_maintenance"
ADD BOOL "is_valid" WITH (GET TOPIC "value" > 0)

OBJECT

Nested JSON structures:
ADD OBJECT "metadata" WITH {
    "location": TOPIC "equipment/location",
    "installation_date": "2025-01-01",
    "firmware_version": "v2.1"
}

ARRAY

JSON arrays:
ADD ARRAY "readings" WITH TOPIC "sensors/batch/readings"
ADD ARRAY "tags" WITH ["production", "line1", "active"]
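To show what "the type determines how the value is serialized" means for a consumer, here is a rough Python sketch. It assumes payloads arrive as text and that each field type maps onto the obvious JSON type. The CONVERTERS table, the to_bool rule, and the serialize function are hypothetical names for illustration and say nothing about how Coreflux performs the conversion internally.

```python
import json


def to_bool(raw):
    # Assumption: only the text "true" or "false" (any case) is a boolean.
    text = raw.strip().lower()
    if text not in ("true", "false"):
        raise ValueError(f"not a boolean payload: {raw!r}")
    return text == "true"


# Hypothetical converters, one per field type named on this page.
CONVERTERS = {
    "STRING": str,
    "INT": int,
    "DOUBLE": float,
    "BOOL": to_bool,
    "OBJECT": json.loads,
    "ARRAY": json.loads,
}


def serialize(fields):
    """Build a JSON string from (field_type, name, raw_text_payload) tuples."""
    record = {name: CONVERTERS[ftype](raw) for ftype, name, raw in fields}
    return json.dumps(record)


print(serialize([
    ("STRING", "sensor_id", "TEMP001"),
    ("INT", "count", "42"),
    ("DOUBLE", "temperature", "23.5"),
    ("BOOL", "enabled", "true"),
    ("ARRAY", "tags", '["production", "line1"]'),
]))
```

The same raw text "42" would be emitted as the JSON string "42" under STRING and as the JSON number 42 under INT, which is the difference a downstream parser sees.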

Data Sources

Fields can pull values from several sources. Mix and match as needed.

Static Values

Fixed values that never change—useful for metadata, units, or default settings:
DEFINE MODEL StaticExample WITH TOPIC "output/static"
    ADD STRING "unit" WITH "celsius"
    ADD INT "default_threshold" WITH 100
    ADD DOUBLE "conversion_factor" WITH 1.8
    ADD BOOL "enabled" WITH TRUE
    ADD STRING "location" WITH "Factory Floor A"

Topic Data

Pull live values from other MQTT topics. The value is fetched at publish time, not when the model is defined.
DEFINE MODEL TopicExample WITH TOPIC "output/dynamic"
    ADD DOUBLE "temperature" WITH TOPIC "sensors/raw/temperature"
    ADD STRING "status" WITH TOPIC "equipment/current/status" AS TRIGGER
    ADD INT "count" WITH TOPIC "production/count"
Example: If sensors/raw/temperature currently holds 23.5, the model’s temperature field will be 23.5 when it publishes. If the value changes to 24.0 before the next publish, the model will output 24.0.

Timestamps

Built-in timestamp generation—no external time source needed:
ADD STRING "timestamp" WITH TIMESTAMP "UTC"      // "2025-10-25T14:30:15Z"
ADD STRING "iso_time" WITH TIMESTAMP "ISO"       // ISO 8601 format
ADD INT "unix_time" WITH TIMESTAMP "UNIX"        // 1761402615 (seconds since epoch)
ADD INT "unix_ms" WITH TIMESTAMP "UNIX-MS"       // Milliseconds since epoch
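For consumers that need to compare or convert these formats, the relationship between a UTC text timestamp, Unix seconds, and Unix milliseconds can be checked with Python's standard library. This is a sketch using one fixed instant as an example. It shows the arithmetic between the representations and does not claim to reproduce Coreflux's exact output for each option.

```python
from datetime import datetime, timezone

# One fixed instant, so the three representations can be compared directly.
moment = datetime(2025, 10, 25, 14, 30, 15, tzinfo=timezone.utc)

utc_text = moment.strftime("%Y-%m-%dT%H:%M:%SZ")  # ISO 8601 text, "Z" = UTC
unix_seconds = int(moment.timestamp())             # whole seconds since epoch
unix_millis = unix_seconds * 1000                  # milliseconds since epoch

print(utc_text)      # 2025-10-25T14:30:15Z
print(unix_seconds)  # 1761402615
print(unix_millis)   # 1761402615000
```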

Calculated Values

Perform inline math using topic data:
DEFINE MODEL CalculatedExample WITH TOPIC "output/calculated"
    ADD DOUBLE "efficiency" WITH (GET TOPIC "produced" AS DOUBLE / GET TOPIC "target" AS DOUBLE * 100)
    ADD INT "total" WITH (GET TOPIC "passed" AS INT + GET TOPIC "failed" AS INT)
    ADD DOUBLE "fahrenheit" WITH (GET TOPIC "celsius" AS DOUBLE * 9 / 5 + 32)
    ADD DOUBLE "average" WITH ((GET TOPIC "val1" AS DOUBLE + GET TOPIC "val2" AS DOUBLE + GET TOPIC "val3" AS DOUBLE) / 3)
Always use AS DOUBLE or AS INT when performing mathematical operations to ensure correct type handling.
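The reason explicit casts matter is easiest to see in a general-purpose language. MQTT payloads travel as text, and arithmetic on text rarely does what you expect. The Python sketch below illustrates the general hazard with made-up values. How Coreflux treats uncast operands is not stated on this page, so take it as motivation for the advice rather than a description of the engine.

```python
# MQTT payloads travel as text, so these stand in for raw topic values.
produced, target = "45", "60"
passed, failed = "7", "3"

# Without conversion, "+" joins the two strings instead of adding numbers.
assert passed + failed == "73"

# With explicit conversion the arithmetic behaves as intended.
total = int(passed) + int(failed)                    # 10
efficiency = float(produced) / float(target) * 100   # 75.0

# Integer division drops the fraction, another reason to choose the type.
truncated = int(produced) // int(target) * 100       # 0

print(total, efficiency, truncated)
```

The same inputs give 10, 75.0, or 0 depending on the type chosen, which is why the cast belongs in the expression rather than being left to defaults.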

Conditional Values

Dynamic values based on conditions:
DEFINE MODEL ConditionalExample WITH TOPIC "output/conditional"
    ADD STRING "status" WITH IF (GET TOPIC "value" > 80) THEN "HIGH" ELSE "NORMAL"
    ADD STRING "quality" WITH IF (GET TOPIC "defects" = 0) THEN "PASS" ELSE "FAIL"
    ADD INT "priority" WITH IF (GET TOPIC "severity" EQUALS "CRITICAL") THEN 1 
                            ELSE IF (GET TOPIC "severity" EQUALS "HIGH") THEN 2 
                            ELSE 3
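Read as ordinary code, these expressions are plain if/else chains evaluated from top to bottom. The Python functions below mirror the "status" and "priority" fields above. The function names are hypothetical, and inputs are assumed to be already converted to a number or a string.

```python
def status_for(value):
    # IF (value > 80) THEN "HIGH" ELSE "NORMAL"
    return "HIGH" if value > 80 else "NORMAL"


def priority_for(severity):
    # IF ... THEN 1 ELSE IF ... THEN 2 ELSE 3
    if severity == "CRITICAL":
        return 1
    elif severity == "HIGH":
        return 2
    return 3


print(status_for(81), status_for(80))
print(priority_for("CRITICAL"), priority_for("HIGH"), priority_for("LOW"))
```

Note the boundary: a value of exactly 80 is "NORMAL" because the comparison is strictly greater-than, and any severity other than the two named ones falls through to priority 3.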

Wildcard Topics

Multi-Instance Models

Use + wildcards to create models that handle multiple instances automatically:
DEFINE MODEL MultiInstanceModel WITH TOPIC "sensors/+/formatted"
Matches:
  • sensors/temp001/formatted
  • sensors/pressure002/formatted
  • sensors/humidity003/formatted
Each unique topic creates a separate model instance.
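In MQTT, the + wildcard stands for exactly one topic level. A small Python sketch of that matching rule follows. The function name matches_single_level is hypothetical, and the multi-level # wildcard is deliberately left out.

```python
def matches_single_level(pattern, topic):
    """Return True if topic matches pattern, where "+" is exactly one level.

    The multi-level "#" wildcard is not handled in this sketch.
    """
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    if len(pattern_levels) != len(topic_levels):
        return False
    return all(p == "+" or p == t for p, t in zip(pattern_levels, topic_levels))


for candidate in (
    "sensors/temp001/formatted",
    "sensors/line1/temp001/formatted",
    "sensors/temp001/raw",
):
    print(candidate, matches_single_level("sensors/+/formatted", candidate))
```

Only the first candidate matches: the second has an extra level, and the third differs in a literal level.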

JSON Reception with Wildcards

When JSON arrives at a wildcard topic, the broker “explodes” it into individual topics:
DEFINE MODEL MaterialData WITH TOPIC "Process/+/+/Material/Content"
When JSON arrives at Process/Machine/33/Material/Content:
{"MaterialID": 132323, "Name": "Peppers", "Quantity": 21}
Explodes to:
  • Process/Machine/33/Material/Content/MaterialID → 132323
  • Process/Machine/33/Material/Content/Name → Peppers
  • Process/Machine/33/Material/Content/Quantity → 21
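The key-to-sub-topic mapping described above can be sketched in a few lines of Python. The explode function is a hypothetical helper, and it assumes that only top-level keys become sub-topics. How the broker treats nested objects or arrays is not covered on this page.

```python
import json


def explode(base_topic, payload):
    """Map each top-level JSON key to its own sub-topic under base_topic."""
    data = json.loads(payload)
    return {f"{base_topic}/{key}": value for key, value in data.items()}


exploded = explode(
    "Process/Machine/33/Material/Content",
    '{"MaterialID": 132323, "Name": "Peppers", "Quantity": 21}',
)
for topic, value in exploded.items():
    print(topic, "->", value)
```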

Complete Examples

This model aggregates equipment data from multiple topics into a single, structured output. It triggers whenever the status changes.
DEFINE MODEL EquipmentStatus WITH TOPIC "equipment/status/formatted"
    ADD STRING "equipment_id" WITH TOPIC "equipment/current/id"
    ADD STRING "status" WITH TOPIC "equipment/current/status" AS TRIGGER
    ADD INT "runtime_hours" WITH TOPIC "equipment/current/runtime"
    ADD DOUBLE "efficiency" WITH TOPIC "equipment/current/efficiency"
    ADD BOOL "maintenance_required" WITH TOPIC "equipment/current/maintenance_flag"
    ADD STRING "last_update" WITH TIMESTAMP "UTC"
    ADD STRING "location" WITH "Factory Floor A"
Output:
{
  "equipment_id": "PUMP001",
  "status": "RUNNING",
  "runtime_hours": 1250,
  "efficiency": 87.5,
  "maintenance_required": false,
  "last_update": "2025-10-25T14:30:15Z",
  "location": "Factory Floor A"
}
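On the consuming side, a fixed schema means the payload can be decoded straight into a typed record. The Python sketch below parses the output shown above. The EquipmentStatusRecord dataclass and the parse_equipment_status function are hypothetical names, and receiving the message from the broker is left out.

```python
import json
from dataclasses import dataclass


@dataclass
class EquipmentStatusRecord:
    equipment_id: str
    status: str
    runtime_hours: int
    efficiency: float
    maintenance_required: bool
    last_update: str
    location: str


def parse_equipment_status(payload):
    """Decode one message; raises TypeError if a field is missing or extra."""
    return EquipmentStatusRecord(**json.loads(payload))


sample = """
{
  "equipment_id": "PUMP001",
  "status": "RUNNING",
  "runtime_hours": 1250,
  "efficiency": 87.5,
  "maintenance_required": false,
  "last_update": "2025-10-25T14:30:15Z",
  "location": "Factory Floor A"
}
"""

record = parse_equipment_status(sample)
print(record.equipment_id, record.status, record.efficiency)
```

Because the field set is fixed, unpacking into the dataclass doubles as a cheap schema check: a renamed or missing field fails immediately instead of surfacing later as a missing key.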

Best Practices

Establish naming conventions and follow them across all models:
// Good - consistent across models
ADD STRING "timestamp"
ADD STRING "sensor_id"
ADD STRING "equipment_id"

// Avoid - inconsistent
ADD STRING "time_stamp"
ADD STRING "sensorID"
ADD STRING "equip_ID"
Explicit type casting prevents errors and ensures predictable results:
// Good
ADD DOUBLE "efficiency" WITH (GET TOPIC "produced" AS DOUBLE / GET TOPIC "target" AS DOUBLE * 100)

// Risky - implicit type handling
ADD DOUBLE "efficiency" WITH (GET TOPIC "produced" / GET TOPIC "target" * 100)
Always include fields that help downstream consumers understand the data:
  • Identification fields (IDs, names)
  • Timestamps
  • Units of measurement
  • Status or quality indicators
Trigger on the primary data field, not metadata:
// Good - trigger on the main measurement
ADD DOUBLE "value" WITH TOPIC "sensors/temperature" AS TRIGGER

// Bad - triggers too frequently or unpredictably
ADD STRING "timestamp" WITH TIMESTAMP "UTC" AS TRIGGER