Reacting to Threshold Crossings

Most alarm logic boils down to “if a value crosses a threshold, alert someone.” LoT does this in a handful of lines: read the current value, read the threshold from config, compare, publish if exceeded.
Think of a kitchen oven timer. It does one thing: it watches one number and beeps when the number’s wrong. The rest of the time it stays quiet.

When to Reach for This

Any time you need to detect that something has gone outside an acceptable range — temperature too high, tank too full, vibration too strong, fill level too low, pollution above a limit. This is one of the first patterns most projects need.

The Simple Version

A CO2 monitor in a conference room:
DEFINE ACTION HighCO2Alarm
ON TOPIC "sensors/co2" DO
    SET "current" WITH PAYLOAD AS DOUBLE
    SET "max" WITH (GET TOPIC "config/co2_max" AS DOUBLE)

    IF {current} > {max} THEN
        PUBLISH TOPIC "alerts/co2_high"
            WITH ("CO2 too high: " + {current})
That’s it. Three things to notice:
  1. The threshold lives in config/, not in the Action. A facilities manager can change it at runtime without redeploying anything. If the limit shifts from 1000 ppm to 1200 ppm, just publish the new value to config/co2_max and the next reading uses the new limit.
  2. The alert topic is separate from the data topic. Anyone subscribing to alerts/# gets every alert in the system, regardless of source. Dashboards, notifiers, and on-call systems all use the same channel.
  3. No state tracking in this version. The Action fires every time a reading exceeds the threshold, so you’ll get repeated alerts while CO2 stays high. If you have downstream notification infrastructure that handles deduplication, that’s fine. If you don’t, see the state-tracking variant below.

Optional: Standardize the Alarm Format with a Model

Plain text alerts work, but as your system grows you’ll want consistent structure — every alarm in the same JSON shape so dashboards and databases can parse them uniformly. Define a COLLAPSED Model and use PUBLISH MODEL instead of PUBLISH TOPIC. (COLLAPSED means the Model doesn’t publish on its own — it only fires when something explicitly calls PUBLISH MODEL. See the Models reference for the full options.)
DEFINE MODEL Alarm COLLAPSED
    ADD STRING "type"
    ADD STRING "severity"
    ADD DOUBLE "value"
    ADD DOUBLE "threshold"
    ADD STRING "timestamp"

DEFINE ACTION HighCO2Alarm
ON TOPIC "sensors/co2" DO
    SET "current" WITH PAYLOAD AS DOUBLE
    SET "max" WITH (GET TOPIC "config/co2_max" AS DOUBLE)

    IF {current} > {max} THEN
        PUBLISH MODEL Alarm TO "alerts/co2_high" WITH
            type = "CO2_HIGH"
            severity = "WARNING"
            value = {current}
            threshold = {max}
            timestamp = TIMESTAMP "UTC"
Now every alarm — whether it’s high CO2, low water level, or door left open — publishes the same JSON shape. A single database table, dashboard panel, or notification rule can handle all of them. This pairs especially well with model inheritance.
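Because every alarm shares one shape, a consumer needs only one code path. The Python sketch below illustrates that idea. It assumes the Model serializes to a flat JSON object keyed by the field names declared above, which this page does not show explicitly. The `format_alarm` helper, the `TANK_LOW` alarm, and the sample payloads are made up for the example.

```python
import json

# Field names come from the Alarm Model above; the flat JSON layout is an assumption.
REQUIRED_FIELDS = ("type", "severity", "value", "threshold", "timestamp")

def format_alarm(topic, payload):
    """Turn any Alarm-shaped payload into a one-line summary (hypothetical helper)."""
    alarm = json.loads(payload)
    missing = [f for f in REQUIRED_FIELDS if f not in alarm]
    if missing:
        raise ValueError(f"{topic}: payload is missing {missing}")
    return (f"[{alarm['severity']}] {alarm['type']} on {topic}: "
            f"value={alarm['value']} threshold={alarm['threshold']} at {alarm['timestamp']}")

# Hypothetical payloads from two different alarms, handled by the same function.
co2 = json.dumps({"type": "CO2_HIGH", "severity": "WARNING", "value": 1250.0,
                  "threshold": 1000.0, "timestamp": "2026-05-20T10:15:00Z"})
tank = json.dumps({"type": "TANK_LOW", "severity": "CRITICAL", "value": 4.5,
                   "threshold": 10.0, "timestamp": "2026-05-20T10:16:00Z"})

print(format_alarm("alerts/co2_high", co2))
print(format_alarm("alerts/tank_low", tank))
```

If the real payload nests or renames fields, only `REQUIRED_FIELDS` and the format string would need to change.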

Optional: Deduplicate with State Tracking

If you don’t have a downstream notifier that deduplicates — or you want a clean audit trail of “alarm started” and “alarm cleared” events — track the previous state in state/ and only publish on transitions.
DEFINE ACTION HighCO2Alarm
ON TOPIC "sensors/co2" DO
    SET "current" WITH PAYLOAD AS DOUBLE
    SET "max" WITH (GET TOPIC "config/co2_max" AS DOUBLE)
    SET "previous" WITH (GET TOPIC "state/co2_alarm")

    IF {current} > {max} THEN
        IF {previous} != "active" THEN
            PUBLISH TOPIC "alerts/co2_high"
                WITH ("CO2 too high: " + {current})
            KEEP TOPIC "state/co2_alarm" WITH "active"
    ELSE
        IF {previous} EQUALS "active" THEN
            PUBLISH TOPIC "alerts/co2_normal"
                WITH ("CO2 back to normal: " + {current})
            KEEP TOPIC "state/co2_alarm" WITH "normal"
What’s happening:
  • The action reads three things: current value, threshold, and the previous alarm state stored in state/co2_alarm.
  • If CO2 is over the threshold and the alarm wasn’t already active, it fires a “high” alert and marks the state active. Subsequent over-threshold readings while still active produce no extra alerts.
  • If CO2 is back at or below the threshold and the alarm was active, it fires a “normal” alert and marks the state normal. Readings at or below the threshold while the state is already normal produce nothing.
The state/ topic uses KEEP TOPIC (not PUBLISH TOPIC) so it’s internal to the alarm logic — retained on the broker, but not noise on the wire.
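To see which readings actually produce alerts, the transition logic can be traced outside the broker. The Python sketch below mirrors the nested IF structure of the Action. It is a simulation, not how the broker runs the Action: the `step` function, the sample readings, and the threshold of 1000 are illustrative, and it assumes an empty state/co2_alarm topic reads as something other than "active".

```python
def step(previous_state, current, maximum):
    """Return (new_state, alert) for one reading; alert is None when nothing is published."""
    if current > maximum:
        if previous_state != "active":
            return "active", f"CO2 too high: {current}"
        return previous_state, None  # still high: stay silent
    if previous_state == "active":
        return "normal", f"CO2 back to normal: {current}"
    return previous_state, None      # still normal: stay silent

state = None  # stands in for a state/co2_alarm topic with nothing retained yet
alerts = []
for reading in [900, 1100, 1150, 1200, 950, 900, 1300]:
    state, alert = step(state, reading, maximum=1000)
    if alert is not None:
        alerts.append(alert)

for line in alerts:
    print(line)
```

Seven readings yield three alerts: one when CO2 first exceeds the limit, one when it recovers, and one when it exceeds the limit again.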

Optional: Send the Alarm by Email

For alarms that need to reach humans directly, add an EMAIL route that subscribes to your alert topics and sends a templated message via SMTP. The route does all the work — your Action just publishes to the alert topic as before. (Full reference: Data Pipeline Routes.)
DEFINE ROUTE AlertEmail WITH TYPE EMAIL
    ADD SMTP_CONFIG
        WITH HOST GET ENV "SMTP_HOST"
        WITH PORT '587'
        WITH USE_TLS "true"
        WITH USERNAME GET ENV "SMTP_USER"
        WITH PASSWORD GET SECRET "SMTP_PASSWORD"
    ADD EVENT NotifyOnHighCO2
        WITH SOURCE_TOPIC "alerts/co2_high"
        WITH SUBJECT "CO2 alarm: {value.json.value} ppm"
        WITH RECIPIENT "facilities@acmecorp.com"
        WITH TEMPLATE_PATH "/templates/alarm_email.html"
How it fits together:
  1. The threshold Action publishes a structured alarm to alerts/co2_high (using the Model variant from above).
  2. The email route is subscribed to that topic and fires automatically whenever a new alarm appears.
  3. Placeholders like {value.json.value} pull fields out of the JSON payload to build the email subject and body.
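Credentials come from ENV and SECRET (see Keeping Credentials Out of Your Code). Pair this route with the state tracking variant if you want one email per alarm transition rather than one per reading. That variant publishes plain text, so switch its alert to PUBLISH MODEL if the subject should keep using the {value.json.value} placeholder.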

Knowing When a Device or Route Goes Offline

Two different things can go silent in a Coreflux system, and you want to know about both.
  • Devices are the sensors, machines, and gateways out in the world. They go offline when their network drops, their battery dies, or they crash. The signal that something’s wrong is the absence of incoming messages.
  • Routes are the broker’s own connections to databases, cloud brokers, REST APIs, and other external systems. They go offline when credentials change, the remote system is unreachable, or the network between them breaks. The signal here is the absence of an outbound connection, which the broker tracks itself using SYS topics (the built-in SYS hierarchy).
Both need monitoring. The data sources are different, so the patterns are different.
Like a roll call at the start of class. Everyone present says “here” — students through the door (devices), staff through the office intercom (broker SYS topics). After a beat of silence on a name, the teacher knows who’s missing.

When to Reach for This

Whenever uptime matters: critical infrastructure, paid telemetry, anything you’d want to dispatch a technician for. Skip it for opportunistic devices where occasional silence is normal (mobile phones, ad-hoc sensors). For routes, this is almost always worth doing — a silent integration is a silent data loss.

Approach 1: Heartbeats for Devices

Each device publishes a timestamp to its own status topic at a regular interval. Then a scheduled Action checks for any device whose last timestamp is too old. If your devices already send data regularly, you can stamp the heartbeat directly from the Action that processes their readings:
DEFINE ACTION RecordHeartbeat
ON TOPIC "sensors/+/+" DO
    SET "device_id" WITH TOPIC POSITION 2
    KEEP TOPIC "state/" + {device_id} + "/last_seen" WITH TIMESTAMP "UTC"
Every time any sensor sends any data, this Action stamps a last_seen timestamp for that device. The wildcard means it covers every sensor automatically, and KEEP TOPIC retains the value internally without broadcasting it.
A scheduled Action then wakes up periodically, compares each timestamp to “now,” and publishes an alert for anything stale. Time arithmetic is awkward in LoT, so this is a natural place to call a small Python helper (see Running Python from a LoT Action):
DEFINE ACTION CheckDeviceLiveness
ON EVERY 60 SECONDS DO
    CALL PYTHON "Liveness.check_all"
        WITH (300)
        RETURN AS {result}

    SET "offline_count" WITH (GET JSON "count" IN {result} AS INT)

    IF {offline_count} > 0 THEN
        PUBLISH MODEL Alarm TO "alerts/devices/offline" WITH
            type = "DEVICES_OFFLINE"
            severity = "WARNING"
            value = {offline_count}
            threshold = 0
            timestamp = TIMESTAMP "UTC"
The Python function reads every state/+/last_seen topic, compares to the current time, and returns the count and list of devices whose last_seen is older than 300 seconds.
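The staleness comparison at the heart of that helper might look like the Python sketch below. It covers only the time arithmetic. How the helper collects the retained state/+/last_seen values from the broker is not shown on this page, so the sketch takes them as a plain dictionary. The `find_stale` and `parse_utc` names, the ISO 8601 timestamp format, and the comma-separated "list" field are assumptions for illustration.

```python
from datetime import datetime, timezone

def parse_utc(stamp):
    """Parse an ISO 8601 timestamp, treating a trailing 'Z' or a missing offset as UTC."""
    parsed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed

def find_stale(last_seen, max_age_seconds, now=None):
    """Return {"count", "list"} for devices whose last_seen is older than max_age_seconds.

    last_seen maps a device id to its timestamp string (assumed ISO 8601).
    """
    now = now or datetime.now(timezone.utc)
    stale = []
    for device_id, stamp in sorted(last_seen.items()):
        try:
            age = (now - parse_utc(stamp)).total_seconds()
        except (ValueError, AttributeError):
            # An unreadable timestamp is reported as stale rather than silently ignored.
            stale.append(device_id)
            continue
        if age > max_age_seconds:
            stale.append(device_id)
    return {"count": len(stale), "list": ",".join(stale)}

# Example with a fixed "now" so the result is reproducible.
now = datetime(2026, 5, 20, 12, 0, 0, tzinfo=timezone.utc)
snapshot = {
    "boiler": "2026-05-20T11:59:30Z",   # 30 seconds old
    "chiller": "2026-05-20T11:50:00Z",  # 600 seconds old
    "pump": "not-a-timestamp",          # unreadable
}
result = find_stale(snapshot, 300, now=now)
print(result)
```

Treating an unreadable timestamp as stale is a deliberate choice in this sketch: a device with a corrupt last_seen value is surfaced to an operator instead of being skipped.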

Approach 2: SYS topics for Routes

The broker exposes a live view of itself through SYS topics — a built-in dashboard in MQTT form. For monitoring route health, read the JSON array published on the topic below. It lists every client currently connected to the broker, with timestamps and connection details. Routes that maintain MQTT connections (bridges, REST endpoints in client mode, anything with a connected client identity) appear in this list. When one drops, it disappears from the array.
$SYS/Coreflux/Comms/Clients
The pattern: keep an “expected routes” list in a config topic, read the clients topic on a schedule, and have an Action flag any expected route that isn’t in the live connection list.
DEFINE ACTION CheckRoutesOnline
ON EVERY 60 SECONDS DO
    SET "expected" WITH (GET TOPIC "config/expected_routes")
    SET "clients" WITH (GET TOPIC "$SYS/Coreflux/Comms/Clients")

    CALL PYTHON "RouteMonitor.find_missing"
        WITH ({expected}, {clients})
        RETURN AS {result}

    SET "missing_count" WITH (GET JSON "count" IN {result} AS INT)

    IF {missing_count} > 0 THEN
        SET "missing_list" WITH (GET JSON "list" IN {result} AS STRING)
        PUBLISH MODEL Alarm TO "alerts/routes/offline" WITH
            type = "ROUTES_OFFLINE"
            severity = "CRITICAL"
            value = {missing_count}
            threshold = 0
            timestamp = TIMESTAMP "UTC"
        PUBLISH TOPIC "system/offline_routes" WITH {missing_list}
The Python helper does the actual list comparison — straightforward in Python, fiddly in LoT:
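# /scripts/RouteMonitor.py
import json

def find_missing(expected_csv, clients_json):
    """Return the expected route names that are absent from the live client list."""
    # Expected names arrive as a comma-separated string; drop blanks and whitespace.
    expected = [name.strip() for name in (expected_csv or "").split(",") if name.strip()]
    try:
        clients = json.loads(clients_json)
        # Each entry is expected to be an object carrying an "Id" field.
        connected_ids = {c.get("Id", "") for c in clients}
    except (json.JSONDecodeError, TypeError, AttributeError):
        # Unreadable or unexpectedly shaped payload: treat every route as missing.
        connected_ids = set()

    missing = [name for name in expected if name not in connected_ids]
    return {"count": len(missing), "list": ",".join(missing)}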
What’s happening:
  1. The expected route list lives in a config topic — for example, you’d publish "DatabaseRoute,CloudBridge,WeatherAPI" to config/expected_routes. Operators can update it without redeploying anything.
  2. The Action reads the live client list with GET TOPIC on the clients topic shown above and hands it, together with the expected list, to Python.
  3. Python parses the connections, computes the set difference, and returns the count plus a comma-separated list of missing routes.
  4. If anything is missing, the Action publishes a critical alarm and a list of which routes are down.
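A quick way to check the helper before wiring it into the Action is to call it with sample data. In the Python sketch below the function is repeated in compact form so the snippet runs on its own. The clients payload is hypothetical: only the "Id" field is taken from the helper, and the example assumes each route connects with a client Id equal to its route name.

```python
import json

def find_missing(expected_csv, clients_json):
    # Compact copy of the RouteMonitor helper so this snippet is self-contained.
    expected = [name.strip() for name in expected_csv.split(",") if name.strip()]
    try:
        connected_ids = {c.get("Id", "") for c in json.loads(clients_json)}
    except (json.JSONDecodeError, TypeError):
        connected_ids = set()
    missing = [name for name in expected if name not in connected_ids]
    return {"count": len(missing), "list": ",".join(missing)}

# Hypothetical clients payload: two expected routes plus an unrelated client.
clients = json.dumps([{"Id": "DatabaseRoute"}, {"Id": "WeatherAPI"}, {"Id": "dashboard-42"}])

result = find_missing("DatabaseRoute, CloudBridge, WeatherAPI", clients)
print(result)

# An unreadable payload counts every expected route as missing.
fallback = find_missing("DatabaseRoute,CloudBridge", "not json")
print(fallback)
```

The fallback behavior is worth noting: if the clients topic is empty or malformed, every expected route is reported as missing, which errs on the side of raising an alarm.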

Combining the Two Approaches

A complete monitoring layer runs both Actions side by side: heartbeats catch devices that have stopped reporting even though their network looks fine, and the SYS clients-topic check catches routes that have lost their integration to the outside world. Both publish into the same alerts/ branch, so a single subscriber — a dashboard, a notification rule, an email route — gives you a unified “what’s not working right now” view. Pair either Action with the state tracking variant to avoid alert spam: one alert when something disappears, one when it comes back, silence in between.

Next Steps

Data Pipeline Routes

Email, MQTT bridge, and other event-driven pipeline routes.

SYS Topics

Built-in broker topics for monitoring clients and comms.
Last modified on May 20, 2026