OpenSearch Route
| Feature | Since Version | Notes |
|---|---|---|
| OpenSearch Route | >v1.6.3 | Indexes MQTT data into OpenSearch. |
1. Overview
The OpenSearch Route implementation in the Coreflux MQTT broker allows for seamless indexing of MQTT message payloads into an OpenSearch cluster.
This route is designed for high-throughput data logging, search, and analysis, which makes it well suited to storing time-series data, application logs, and other JSON-formatted information. For practical examples of LOT Routes, see the Routes examples in the LOT Samples Repository.
2. Key Features
- Direct Indexing: Stores MQTT message payloads directly as documents in an OpenSearch index.
- Secure Communication: Supports TLS/SSL to secure connections to the OpenSearch cluster.
- Authentication: Supports basic authentication with a username and password.
- Data Caching: Temporarily caches up to 100 messages in memory during connection interruptions to reduce data loss.
- Flexible Endpoint Configuration: Easily configure the base URL for your OpenSearch domain.
- Development-Friendly: Includes an option to bypass SSL certificate validation for testing environments.
3. Route Configuration
A route is configured using the addRoute command. The configuration is provided within an OPENSEARCH_CONFIG block.
Example: OpenSearch Route Definition
DEFINE ROUTE OpenSearchLogger WITH TYPE OPENSEARCH
ADD OPENSEARCH_CONFIG
WITH BASE_URL "https://my-opensearch-cluster:9200"
WITH USERNAME "myuser"
WITH PASSWORD "mypassword"
WITH USE_SSL TRUE
WITH IGNORE_CERT_ERRORS FALSE
Adding the Route via MQTT Command
To deploy this route to the broker, use the -addRoute command by publishing to the $SYS/Coreflux/Command topic:
-addRoute DEFINE ROUTE OpenSearchLogger WITH TYPE OPENSEARCH
ADD OPENSEARCH_CONFIG
WITH BASE_URL "https://my-opensearch-cluster:9200"
WITH USERNAME "myuser"
WITH PASSWORD "mypassword"
WITH USE_SSL TRUE
WITH IGNORE_CERT_ERRORS FALSE
For more information about broker commands, see the MQTT Broker Commands documentation.
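When routes are deployed from a script, the command payload can be assembled from parameters before it is published. The following is a minimal Python sketch, not part of Coreflux: the function name `build_add_route_command` is hypothetical, and joining the statements with newlines is an assumption about what the broker accepts. Publishing the payload to the command topic is left to whichever MQTT client you use.

```python
# Hypothetical helper: builds the -addRoute payload shown above as one string.
# Assumption: the broker accepts the statements separated by newlines.
COMMAND_TOPIC = "$SYS/Coreflux/Command"


def build_add_route_command(name, base_url, username, password,
                            use_ssl=True, ignore_cert_errors=False):
    """Return the -addRoute payload for an OpenSearch route definition."""

    def lot_bool(value):
        # The route definition writes booleans as TRUE / FALSE.
        return "TRUE" if value else "FALSE"

    lines = [
        f"-addRoute DEFINE ROUTE {name} WITH TYPE OPENSEARCH",
        "ADD OPENSEARCH_CONFIG",
        f'WITH BASE_URL "{base_url}"',
        f'WITH USERNAME "{username}"',
        f'WITH PASSWORD "{password}"',
        f"WITH USE_SSL {lot_bool(use_ssl)}",
        f"WITH IGNORE_CERT_ERRORS {lot_bool(ignore_cert_errors)}",
    ]
    return "\n".join(lines)


payload = build_add_route_command(
    name="OpenSearchLogger",
    base_url="https://my-opensearch-cluster:9200",
    username="myuser",
    password="mypassword",
)
print(f"Publish to {COMMAND_TOPIC}:")
print(payload)
```

Keeping the payload construction separate from the publish step makes it easy to review the exact command text before it reaches the broker.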
Deploying Routes with LOT Notebooks
Routes can also be created and deployed interactively using LOT Notebooks in Visual Studio Code. This provides a convenient way to develop, test, and document your routes.
To get started with LOT Notebooks:
- Install the LOT Notebooks extension in VS Code
- Create a new .lotnb file
- Add code cells with your route definitions
- Execute the cells to deploy directly to your broker
For detailed instructions, see the LOT Notebooks Getting Started Guide.
4. Data Handling and Reliability
Connection Testing
On startup and restart, the broker validates the connection to the OpenSearch cluster. This ensures that the route is ready to receive data before it is fully activated.
Data Caching
If the connection to the OpenSearch cluster is lost, the route will automatically cache incoming data in memory.
- It stores up to 100 messages.
- Once the connection is restored, the cached data is automatically flushed to OpenSearch.
- If the cache exceeds its limit, the oldest messages are discarded.
This feature is intended to handle brief network interruptions, not for long-term offline buffering.
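The caching behavior described above (a fixed limit, oldest messages dropped first, flush on reconnect) can be illustrated with a small Python sketch. This is only a model of the documented behavior, not the broker's implementation; the class name `RouteCache` and the `send` callback are hypothetical.

```python
from collections import deque


class RouteCache:
    """Illustrative bounded in-memory cache; not the broker's actual code."""

    def __init__(self, limit=100):
        # A deque with maxlen silently drops the oldest item when full.
        self._pending = deque(maxlen=limit)

    def add(self, message):
        """Cache a message while the connection is down."""
        self._pending.append(message)

    def flush(self, send):
        """Send cached messages oldest-first; return how many were sent.

        A message is removed only after send() returns, so if send()
        raises, that message and the rest remain cached.
        """
        sent = 0
        while self._pending:
            send(self._pending[0])
            self._pending.popleft()
            sent += 1
        return sent

    def __len__(self):
        return len(self._pending)


# Usage: 105 messages arrive while offline, so the 5 oldest are discarded.
cache = RouteCache(limit=100)
for i in range(105):
    cache.add({"seq": i})

delivered = []
cache.flush(delivered.append)
print(len(delivered), delivered[0], delivered[-1])
```

Because anything beyond the limit is lost, a model like this only covers short outages, which matches the intent stated above.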
5. Using Models for Structured Indexing
For robust data pipelines, it is highly recommended to use LOT Models with your OpenSearch route. A Model serves as a schema and transformation layer, converting raw MQTT data into a structured JSON document that is ideal for indexing. The STORE IN command within the model then directs this structured document to the OpenSearch route.
Example: Storing Structured Log Data
1. The Model Definition
This model creates a structured log entry from raw sensor data, adding a timestamp for proper time-series analysis.
DEFINE MODEL MyOpenSearchModel COLLAPSED WITH TOPIC "MyOpenSearchModel"
ADD "event_time" WITH TIMESTAMP "ISO"
ADD "sensor_id" WITH TOPIC "devices/+/id"
ADD "reading" WITH TOPIC "devices/+/payload" AS TRIGGER
STORE IN "OpenSearchLogger"
WITH TABLE "device-logs"
- ... AS TRIGGER: The model executes when a message arrives on devices/+/payload.
- STORE IN "OpenSearchLogger": This sends the resulting JSON object to our defined route.
- WITH TABLE "device-logs": This specifies the OpenSearch index name where the document will be stored. The index name will be converted to lowercase.
2. The Resulting OpenSearch Document
When triggered, the model sends a structured JSON document to the route, which indexes it in OpenSearch. Each document is given a unique ID.
{
"_index": "device-logs",
"_id": "0a1b2c3d-4e5f-6a7b-8c9d-0e1f2a3b4c5d",
"_source": {
"event_time": "2023-10-27T10:00:00Z",
"sensor_id": "temp-sensor-04",
"reading": { "temperature": 35.5, "humidity": 60 }
}
}
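To make the mapping from model fields to the stored document concrete, here is a minimal Python sketch that produces a document of the same shape. It is an illustration under assumptions, not the route's code: the function name `build_document` is hypothetical, and using a random UUID for `_id` and a UTC timestamp ending in `Z` are guesses based on the sample above.

```python
import uuid
from datetime import datetime, timezone


def build_document(table, sensor_id, reading):
    """Return a dict shaped like the sample document (illustrative only)."""
    return {
        # The index name is converted to lowercase, as described above.
        "_index": table.lower(),
        # Assumption: a random UUID stands in for the unique document ID.
        "_id": str(uuid.uuid4()),
        "_source": {
            # Assumption: ISO 8601 timestamp in UTC with a trailing "Z".
            "event_time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            "sensor_id": sensor_id,
            "reading": reading,
        },
    }


doc = build_document(
    table="Device-Logs",
    sensor_id="temp-sensor-04",
    reading={"temperature": 35.5, "humidity": 60},
)
print(doc["_index"])  # device-logs
```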
6. Dashboard Integration
To visualize your data in a dashboard (like OpenSearch Dashboards), ensure that your models are creating a consistent data structure and that you are targeting the correct index name (e.g., device-logs). A well-defined index pattern in your dashboarding tool is key to effective visualization and analysis.
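One way to check for a consistent data structure before building visualizations is to compare the fields of a sample of documents. The Python sketch below assumes the documents have already been retrieved as dictionaries with a `_source` key, as in the sample document; the function name `inconsistent_fields` is hypothetical.

```python
def inconsistent_fields(documents):
    """Return the field names that are missing from at least one document."""
    field_sets = [set(doc["_source"]) for doc in documents]
    if not field_sets:
        return set()
    # Fields seen anywhere, minus fields present everywhere.
    return set.union(*field_sets) - set.intersection(*field_sets)


sample = [
    {"_source": {"event_time": "2023-10-27T10:00:00Z", "sensor_id": "a", "reading": 1}},
    {"_source": {"event_time": "2023-10-27T10:01:00Z", "sensor_id": "b"}},
]
print(inconsistent_fields(sample))  # {'reading'}
```

An empty result suggests the sampled documents share the same top-level fields; it does not check field types or nested structure.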