Skip to content

OpenSearch Route

Feature Since Version Notes
OpenSearch Route >v1.6.3 Indexes MQTT data into OpenSearch.

1. Overview

The OpenSearch Route implementation in the Coreflux MQTT broker allows for seamless indexing of MQTT message payloads into an OpenSearch cluster.

For practical examples of LOT Routes, check out the Routes examples in the LOT Samples Repository. This route is designed for high-throughput data logging, searching, and analysis use cases, making it ideal for storing time-series data, application logs, and other JSON-formatted information.

2. Key Features

  • Direct Indexing: Stores MQTT message payloads directly as documents in an OpenSearch index.
  • Secure Communication: Supports TLS/SSL to secure connections to the OpenSearch cluster.
  • Authentication: Supports basic authentication with a username and password.
  • Data Caching: Temporarily caches up to 100 messages during connection interruptions to prevent data loss.
  • Flexible Endpoint Configuration: Easily configure the base URL for your OpenSearch domain.
  • Development-Friendly: Includes an option to bypass SSL certificate validation for testing environments.

3. Route Configuration

A route is configured using the addRoute command. The configuration is provided within an OPENSEARCH_CONFIG block.

Example: OpenSearch Route Definition

DEFINE ROUTE OpenSearchLogger WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://my-opensearch-cluster:9200"
        WITH USERNAME "myuser"
        WITH PASSWORD "mypassword"
        WITH USE_SSL TRUE
        WITH IGNORE_CERT_ERRORS FALSE

Adding the Route via MQTT Command

To deploy this route to the broker, use the -addRoute command by publishing to the $SYS/Coreflux/Command topic:

-addRoute DEFINE ROUTE OpenSearchLogger WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://my-opensearch-cluster:9200"
        WITH USERNAME "myuser"
        WITH PASSWORD "mypassword"
        WITH USE_SSL TRUE
        WITH IGNORE_CERT_ERRORS FALSE

For more information about broker commands, see the MQTT Broker Commands documentation.

Deploying Routes with LOT Notebooks

Routes can also be created and deployed interactively using LOT Notebooks in Visual Studio Code. This provides a convenient way to develop, test, and document your routes.

To get started with LOT Notebooks:

  1. Install the LOT Notebooks extension in VS Code

  2. Create a new .lotnb file

  3. Add code cells with your route definitions

  4. Execute the cells to deploy directly to your broker

For detailed instructions, see the LOT Notebooks Getting Started Guide.

4. Data Handling and Reliability

Connection Testing

On startup and restart, the broker validates the connection to the OpenSearch cluster. This ensures that the route is ready to receive data before it is fully activated.

Data Caching

If the connection to the OpenSearch cluster is lost, the route will automatically cache incoming data in memory. - It stores up to 100 messages. - Once the connection is restored, the cached data is automatically flushed to OpenSearch. - If the cache exceeds its limit, the oldest messages are discarded. This feature is intended to handle brief network interruptions, not for long-term offline buffering.

5. Using Models for Structured Indexing

For robust data pipelines, it is highly recommended to use LOT Models with your OpenSearch route. A Model serves as a schema and transformation layer, converting raw MQTT data into a structured JSON document that is ideal for indexing. The STORE IN command within the model then directs this structured document to the OpenSearch route.

Example: Storing Structured Log Data

1. The Model Definition

This model creates a structured log entry from raw sensor data, adding a timestamp for proper time-series analysis.

DEFINE MODEL MyOpenSearchModel COLLAPSED WITH TOPIC "MyOpenSearchModel"
    ADD "event_time" WITH TIMESTAMP "ISO"
    ADD "sensor_id" WITH TOPIC "devices/+/id"
    ADD "reading" WITH TOPIC "devices/+/payload" AS TRIGGER
    STORE IN "OpenSearchLogger"
        WITH TABLE "device-logs"
How it works: - ... AS TRIGGER: The model executes when a message arrives on devices/+/payload. - STORE IN "OpenSearchLogger": This sends the resulting JSON object to our defined route. - WITH TABLE "device-logs": This specifies the OpenSearch index name where the document will be stored. The index name will be converted to lowercase.

2. The Resulting OpenSearch Document

When triggered, the model sends a structured JSON document to the route, which indexes it in OpenSearch. Each document is given a unique ID.

{
  "_index": "device-logs",
  "_id": "0a1b2c3d-4e5f-6a7b-8c9d-0e1f2a3b4c5d",
  "_source": {
    "event_time": "2023-10-27T10:00:00Z",
    "sensor_id": "temp-sensor-04",
    "reading": { "temperature": 35.5, "humidity": 60 }
  }
}

6. Dashboard Integration

To visualize your data in a dashboard (like OpenSearch Dashboards), ensure that your models are creating a consistent data structure and that you are targeting the correct index name (e.g., device-logs). A well-defined index pattern in your dashboarding tool is key to effective visualization and analysis.