Skip to main content

Data Pipeline Overview

Data pipeline routes enable data flow between your Coreflux broker and external systems. They connect your broker to other MQTT brokers and email services.
Like a postal forwarding service. Mail (messages) sent to your old address (local broker) automatically gets forwarded to your new address (remote broker). You set up the forwarding rules once, and it just works.
The pipeline route types are:
  • MQTT Bridge - Broker-to-broker communication for edge-to-cloud sync and multi-site connectivity
  • Email - Send notifications and alerts triggered by MQTT messages

MQTT Bridge

The MQTT_BRIDGE route enables seamless data transfer between two MQTT brokers. Use it for edge-to-cloud synchronization, multi-site connectivity, or backup systems.

Basic Syntax

DEFINE ROUTE CloudBridge WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER SELF
    ADD DESTINATION_CONFIG
        WITH BROKER_ADDRESS "cloud.example.com"
        WITH BROKER_PORT '8883'
        WITH CLIENT_ID "EdgeDevice001"
        WITH USE_TLS "true"
    ADD MAPPING sensorSync
        WITH SOURCE_TOPIC "sensors/+/data"
        WITH DESTINATION_TOPIC "edge/sensors/+/data"
        WITH DIRECTION "out"

Connection Configuration

BROKER_ADDRESS
string
required
IP address or hostname of the broker. Use SELF for the local broker.
BROKER_PORT
integer
required
MQTT port (typically 1883 for unencrypted, 8883 for TLS).
CLIENT_ID
string
required
Unique client identifier for the bridge connection.
USERNAME
string
Authentication username.
PASSWORD
string
Authentication password.
USE_TLS
boolean
Enable TLS encryption. Default: false.
ALLOW_UNTRUSTED_CERTS
boolean
Allow untrusted certificates. Default: false.
SERVER_CA_CERT_PATH
string
Path to CA certificate file.
CLIENT_CERT_PATH
string
Path to client certificate for mTLS.
CLIENT_CERT_PASS
string
Client certificate password.
RECONNECTION_RETRIES
integer
Number of reconnection attempts. Default: 5.

Mapping Configuration

SOURCE_TOPIC
string
required
Topic pattern to subscribe to on the source broker. Supports wildcards.
DESTINATION_TOPIC
string
required
Topic to publish to on the destination broker.
DIRECTION
string
required
Data flow direction: out (source→dest), in (dest→source), or both.

Bridge Examples

Send local sensor data to a cloud broker:
DEFINE ROUTE EdgeToCloud WITH TYPE MQTT_BRIDGE
    ADD SOURCE_CONFIG
        WITH BROKER SELF
    ADD DESTINATION_CONFIG
        WITH BROKER_ADDRESS "iot.cloud-provider.com"
        WITH BROKER_PORT '8883'
        WITH CLIENT_ID "EdgeDevice001"
        WITH USERNAME "edge_user"
        WITH PASSWORD "secure_password"
        WITH USE_TLS "true"
    ADD MAPPING sensorData
        WITH SOURCE_TOPIC "sensors/+/data"
        WITH DESTINATION_TOPIC "edge-001/sensors/+/data"
        WITH DIRECTION "out"

Email Route

The EMAIL route sends templated emails based on MQTT messages. Ideal for alerts, notifications, and reports.

Basic Syntax

DEFINE ROUTE AlertEmail WITH TYPE EMAIL
    ADD SMTP_CONFIG
        WITH HOST "smtp.gmail.com"
        WITH PORT '587'
        WITH USERNAME "[email protected]"
        WITH PASSWORD "app-password-here"
        WITH USE_TLS "true"
    ADD EVENT criticalAlert
        WITH SOURCE_TOPIC "alerts/critical/+"
        WITH SUBJECT "CRITICAL: {value.json.alert_type}"
        WITH RECIPIENT "[email protected]"

SMTP Configuration

HOST
string
required
SMTP server address (e.g., smtp.gmail.com, smtp.office365.com).
PORT
integer
required
SMTP port (typically 587 for TLS, 465 for SSL).
USE_TLS
boolean
Enable TLS encryption. Recommended: true.
USERNAME
string
required
Email account to send from.
PASSWORD
string
required
Email password or app-specific password.

Event Configuration

SOURCE_TOPIC
string
MQTT topic to trigger email sending.
DESTINATION_TOPIC
string
MQTT topic to publish send status.
TEMPLATE_PATH
string
Path to HTML email template file.
SUBJECT
string
Email subject line (supports placeholders).
RECIPIENT
string
Email recipient address (supports placeholders).

Template Placeholders

PlaceholderDescription
{value}Full JSON payload as string
{value.json.field}Single field from JSON
{value.json.nested.field}Nested field access

Email Examples

Send immediate alerts for critical events:
DEFINE ROUTE CriticalAlerts WITH TYPE EMAIL
    ADD SMTP_CONFIG
        WITH HOST "smtp.gmail.com"
        WITH PORT '587'
        WITH USERNAME "[email protected]"
        WITH PASSWORD "app-password-here"
        WITH USE_TLS "true"
    ADD EVENT criticalNotify
        WITH SOURCE_TOPIC "alerts/critical/+"
        WITH TEMPLATE_PATH "/templates/critical_alert.html"
        WITH SUBJECT "CRITICAL: {value.json.alert_type}"
        WITH RECIPIENT "[email protected]"
For Gmail, use an App Password instead of your regular password. Enable 2FA on your Google account first.

Troubleshooting

  • Verify IP address and port are correct
  • Check firewall allows outbound connections on the configured port
  • Ensure remote broker is running and accessible
  • Try increasing RECONNECTION_RETRIES
  • Verify certificate paths are correct
  • Ensure certificates are valid and not expired
  • Check certificate chain is complete
  • For testing, use ALLOW_UNTRUSTED_CERTS "true" (not recommended for production)
  • Verify SOURCE_TOPIC and DESTINATION_TOPIC are correct
  • Check DIRECTION setting matches your expected data flow
  • Ensure CLIENT_ID is unique across all connections
  • Check broker logs for subscription errors
  • Verify SMTP credentials are correct
  • For Gmail, ensure you’re using an App Password
  • Check firewall allows outbound SMTP connections
  • Verify recipient email address is valid
  • Double-check USERNAME and PASSWORD
  • Ensure 2FA is enabled if using app passwords
  • Some providers require enabling “less secure apps” access
  • Check for account lockouts due to failed attempts

Best Practices

Always enable TLS when connecting to external brokers:
WITH USE_TLS "true"
Each bridge connection needs a unique CLIENT_ID to avoid conflicts:
WITH CLIENT_ID "EdgeDevice001-SensorBridge"
Use WITH BROKER SELF during development:
ADD SOURCE_CONFIG
    WITH BROKER SELF
Store passwords securely and use app-specific passwords where available.
Configure appropriate retry settings:
WITH RECONNECTION_RETRIES '10'

Next Steps