Skip to main content

OpenSearch Overview

The OPENSEARCH route indexes MQTT messages in OpenSearch for full-text search, real-time analytics, and dashboard visualization. It uses the CLEAN query format for simplified document indexing.
OpenSearch is ideal for log aggregation, metrics visualization, and full-text search across IoT data. Connect it to OpenSearch Dashboards for powerful visualizations.

Basic Syntax

DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.example.com:9200"
        WITH USERNAME "admin"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
    ADD EVENT IndexSensorReading
        WITH SOURCE_TOPIC "sensors/+/data"
        WITH QUERY "CLEAN:{index: sensor-data, body: {@timestamp: {timestamp}, sensor: {sensor_id}, reading: {value.json}}}"

Connection Configuration

OPENSEARCH_CONFIG Parameters

BASE_URL
string
required
OpenSearch endpoint URL including port (e.g., https://opensearch.example.com:9200).
USERNAME
string
required
OpenSearch username.
PASSWORD
string
required
OpenSearch password.
USE_SSL
boolean
Enable HTTPS. Default: false.
IGNORE_CERT_ERRORS
boolean
Ignore certificate validation errors. Default: false.
Never hardcode credentials in production. Use environment variables and encrypted secrets to keep sensitive values out of your route definitions:
WITH BASE_URL GET ENV "OPENSEARCH_URL"
WITH USERNAME GET ENV "OPENSEARCH_USER"
WITH PASSWORD GET SECRET "OPENSEARCH_PASSWORD"
See Environment Variables & Secrets for setup and usage.

CLEAN Query Format

OpenSearch routes use the CLEAN format for document indexing:
WITH QUERY "CLEAN:{index: <index_name>, body: {<field>: <value>, ...}}"

CLEAN Placeholders

PlaceholderDescription
{value.json}Full JSON payload as embedded object
{value.json.field}Specific field from JSON payload
{timestamp}Message timestamp (use @timestamp for Kibana)
{source_topic}Original MQTT topic
{field}Field extracted from topic path

Writing Data

Index sensor data for search:
DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.example.com:9200"
        WITH USERNAME "admin"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
    ADD EVENT IndexReading
        WITH SOURCE_TOPIC "sensors/+/data"
        WITH DESTINATION_TOPIC "search/status"
        WITH QUERY "CLEAN:{index: sensor-data, body: {@timestamp: {timestamp}, sensor_id: {sensor_id}, data: {value.json}}}"
Alternative: STORE IN with Models — Instead of writing EVENT queries, you can bind a model directly to this route. Every PUBLISH MODEL call automatically inserts a row — no query needed:
DEFINE MODEL SensorReading
    ADD STRING "sensor_id"
    ADD DOUBLE "value"
    STORE IN "SensorIndex"
        WITH TABLE "sensor-data"
See Data Storage Overview for the full STORE IN workflow.

Reading Data

EVENTs also support search queries to retrieve data from the index. Publish a message to the event’s SOURCE_TOPIC, and the query result is published to DESTINATION_TOPIC. OpenSearch read queries use the CLEAN format with query, sort, and size instead of SQL:
DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.example.com:9200"
        WITH USERNAME "admin"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
    ADD EVENT GetLatestReading
        WITH SOURCE_TOPIC "search/query/latest"
        WITH DESTINATION_TOPIC "search/result/latest"
        WITH QUERY "CLEAN:{index: sensor-data, query: {\"match\": {\"sensor_id\": \"{payload}\"}}, sort: [{\"@timestamp\": {\"order\": \"desc\"}}], size: 1}"
To trigger this query, publish the sensor ID to the source topic:
Topic:   search/query/latest
Payload: temp001
The matching document is published to search/result/latest, where your actions or external clients can consume it.

CLEAN Read Operators

SQL EquivalentOpenSearch CLEAN Syntax
SELECT *query: {"match_all": {}}
WHERE a = 1query: {"match": {"a": 1}}
WHERE a > 10query: {"range": {"a": {"gt": 10}}}
ORDER BY a DESCsort: [{"a": {"order": "desc"}}]
LIMIT 10size: 10
OFFSET 5from: 5

Index Mapping Example

Create an index with explicit mappings for better search and aggregation:
PUT /sensor-data
{
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "sensor_id": { "type": "keyword" },
      "data": {
        "properties": {
          "temperature": { "type": "float" },
          "humidity": { "type": "float" }
        }
      }
    }
  }
}

Troubleshooting

  • Verify BASE_URL is correct including port
  • Check OpenSearch is running and accessible
  • Verify firewall allows connections on port 9200
  • Verify USERNAME and PASSWORD are correct
  • Check user has permissions to index documents
  • Ensure USE_SSL matches the endpoint (http vs https)
  • For self-signed certs, set IGNORE_CERT_ERRORS “true”
  • Verify certificate chain is complete
  • Verify index name is valid (lowercase, no special chars)
  • Check field mappings don’t conflict
  • Review OpenSearch logs for detailed errors

Next Steps

Data Storage Routes Overview

Compare all storage options.

CrateDB Route

Configure CrateDB for time-series data.