# OpenSearch Route

> Index MQTT data in OpenSearch for real-time search, analytics, and visualization

## OpenSearch Overview

The `OPENSEARCH` route indexes MQTT messages in OpenSearch for full-text search, real-time analytics, and dashboard visualization. It uses the CLEAN query format for simplified document indexing.

<Note>
  OpenSearch is ideal for log aggregation, metrics visualization, and full-text search across IoT data. Connect it to OpenSearch Dashboards for powerful visualizations.
</Note>

## Basic Syntax

```lot theme={null}
DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.example.com:9200"
        WITH USERNAME "admin"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
    ADD EVENT IndexSensorReading
        WITH SOURCE_TOPIC "sensors/+/data"
        WITH QUERY "CLEAN:{index: sensor-data, body: {@timestamp: {timestamp}, sensor: {sensor_id}, reading: {value.json}}}"
```

***

## Connection Configuration

### OPENSEARCH\_CONFIG Parameters

<ParamField path="BASE_URL" type="string" required>
  OpenSearch endpoint URL including port (e.g., `https://opensearch.example.com:9200`).
</ParamField>

<ParamField path="USERNAME" type="string" required>
  OpenSearch username.
</ParamField>

<ParamField path="PASSWORD" type="string" required>
  OpenSearch password.
</ParamField>

<AccordionGroup>
  <Accordion title="SSL/Security">
    <ParamField path="USE_SSL" type="boolean">
      Enable HTTPS. Default: false.
    </ParamField>

    <ParamField path="IGNORE_CERT_ERRORS" type="boolean">
      Ignore certificate validation errors. Default: false.
    </ParamField>
  </Accordion>
</AccordionGroup>

<Warning>
  **Never hardcode credentials in production.** Use environment variables and encrypted secrets to keep sensitive values out of your route definitions:

  ```lot theme={null}
  WITH BASE_URL GET ENV "OPENSEARCH_URL"
  WITH USERNAME GET ENV "OPENSEARCH_USER"
  WITH PASSWORD GET SECRET "OPENSEARCH_PASSWORD"
  ```

  See [Environment Variables & Secrets](/mqtt-broker/secrets-and-env) for setup and usage.
</Warning>
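
As a sketch, the fragments in the warning above could be combined into a full route as follows. It assumes the `OPENSEARCH_URL` and `OPENSEARCH_USER` environment variables and the `OPENSEARCH_PASSWORD` secret have already been set up as described on the linked page. The route name `SecureSensorIndex` is illustrative.

```lot theme={null}
DEFINE ROUTE SecureSensorIndex WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL GET ENV "OPENSEARCH_URL"
        WITH USERNAME GET ENV "OPENSEARCH_USER"
        WITH PASSWORD GET SECRET "OPENSEARCH_PASSWORD"
        WITH USE_SSL "true"
    ADD EVENT IndexSensorReading
        WITH SOURCE_TOPIC "sensors/+/data"
        WITH QUERY "CLEAN:{index: sensor-data, body: {@timestamp: {timestamp}, sensor: {sensor_id}, reading: {value.json}}}"
```

Only the three connection lines differ from the Basic Syntax example, so the rest of the definition can stay the same across environments.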

***

## CLEAN Query Format

OpenSearch routes use the CLEAN format for document indexing:

```lot theme={null}
WITH QUERY "CLEAN:{index: <index_name>, body: {<field>: <value>, ...}}"
```

### CLEAN Placeholders

| Placeholder          | Description                                                          |
| -------------------- | -------------------------------------------------------------------- |
| `{value.json}`       | Full JSON payload as an embedded object                              |
| `{value.json.field}` | A specific field from the JSON payload                               |
| `{timestamp}`        | Message timestamp (map it to `@timestamp` for OpenSearch Dashboards) |
| `{source_topic}`     | Original MQTT topic                                                  |
| `{payload}`          | Raw message payload (used in read queries)                           |
| `{field}`            | Field extracted from the topic path, such as `{sensor_id}`           |
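
To illustrate how the placeholders combine, here is a sketch that picks individual fields out of a JSON payload. It assumes each message on `sensors/+/data` carries `temperature` and `humidity` fields. Those field names, the `climate-data` index, and the route and event names are hypothetical.

```lot theme={null}
DEFINE ROUTE ClimateIndex WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.example.com:9200"
        WITH USERNAME "admin"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
    ADD EVENT IndexClimate
        WITH SOURCE_TOPIC "sensors/+/data"
        WITH QUERY "CLEAN:{index: climate-data, body: {@timestamp: {timestamp}, topic: {source_topic}, temperature: {value.json.temperature}, humidity: {value.json.humidity}}}"
```

Using `{value.json.field}` rather than `{value.json}` keeps the indexed document flat and limits it to the fields you name.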

***

## Writing Data

<Tabs>
  <Tab title="Basic Indexing">
    Index sensor data for search:

    ```lot theme={null}
    DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH
        ADD OPENSEARCH_CONFIG
            WITH BASE_URL "https://opensearch.example.com:9200"
            WITH USERNAME "admin"
            WITH PASSWORD "secure_password"
            WITH USE_SSL "true"
        ADD EVENT IndexReading
            WITH SOURCE_TOPIC "sensors/+/data"
            WITH DESTINATION_TOPIC "search/status"
            WITH QUERY "CLEAN:{index: sensor-data, body: {@timestamp: {timestamp}, sensor_id: {sensor_id}, data: {value.json}}}"
    ```
  </Tab>

  <Tab title="Log Aggregation">
    Aggregate application logs:

    ```lot theme={null}
    DEFINE ROUTE LogIndex WITH TYPE OPENSEARCH
        ADD OPENSEARCH_CONFIG
            WITH BASE_URL "https://opensearch.example.com:9200"
            WITH USERNAME "admin"
            WITH PASSWORD "admin_password"
            WITH USE_SSL "true"
        ADD EVENT IndexLogs
            WITH SOURCE_TOPIC "logs/+/+"
            WITH QUERY "CLEAN:{index: application-logs, body: {@timestamp: {timestamp}, source: {source_topic}, level: {value.json.level}, message: {value.json.message}}}"
    ```
  </Tab>

  <Tab title="Time-Series Index">
    Index time-series data. This example writes every message to a single `metrics` index name:

    ```lot theme={null}
    DEFINE ROUTE TimeSeriesIndex WITH TYPE OPENSEARCH
        ADD OPENSEARCH_CONFIG
            WITH BASE_URL "https://opensearch.example.com:9200"
            WITH USERNAME "admin"
            WITH PASSWORD "secure_password"
            WITH USE_SSL "true"
        ADD EVENT IndexMetrics
            WITH SOURCE_TOPIC "metrics/#"
            WITH QUERY "CLEAN:{index: metrics, body: {@timestamp: {timestamp}, topic: {source_topic}, metrics: {value.json}}}"
    ```

    Configure an Index State Management (ISM) policy in OpenSearch for automatic rollover.
  </Tab>

  <Tab title="Self-Signed Certs">
    Connect with certificate validation disabled:

    ```lot theme={null}
    DEFINE ROUTE DevIndex WITH TYPE OPENSEARCH
        ADD OPENSEARCH_CONFIG
            WITH BASE_URL "https://localhost:9200"
            WITH USERNAME "admin"
            WITH PASSWORD "admin"
            WITH USE_SSL "true"
            WITH IGNORE_CERT_ERRORS "true"
        ADD EVENT IndexData
            WITH SOURCE_TOPIC "data/#"
            WITH QUERY "CLEAN:{index: dev-data, body: {@timestamp: {timestamp}, topic: {source_topic}, payload: {value.json}}}"
    ```

    <Warning>
      Only use IGNORE\_CERT\_ERRORS in development environments.
    </Warning>
  </Tab>
</Tabs>

<Info>
  **Alternative: STORE IN with Models.** Instead of writing EVENT queries, you can bind a [model](/lot-language/models/overview) directly to this route. Every `PUBLISH MODEL` call then indexes a document automatically, with no query needed. Here `WITH TABLE` is set to the target index name:

  ```lot theme={null}
  DEFINE MODEL SensorReading
      ADD STRING "sensor_id"
      ADD DOUBLE "value"
      STORE IN "SensorIndex"
          WITH TABLE "sensor-data"
  ```

  See [Data Storage Overview](./overview#with-models-store-in) for the full STORE IN workflow.
</Info>

***

## Reading Data

EVENTs also support search queries to retrieve data from the index. Publish a message to the event's `SOURCE_TOPIC`, and the query result is published to `DESTINATION_TOPIC`. OpenSearch read queries use the CLEAN format with `query`, `sort`, and `size` instead of SQL:

```lot theme={null}
DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.example.com:9200"
        WITH USERNAME "admin"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
    ADD EVENT GetLatestReading
        WITH SOURCE_TOPIC "search/query/latest"
        WITH DESTINATION_TOPIC "search/result/latest"
        WITH QUERY "CLEAN:{index: sensor-data, query: {\"match\": {\"sensor_id\": \"{payload}\"}}, sort: [{\"@timestamp\": {\"order\": \"desc\"}}], size: 1}"
```

To trigger this query, publish the sensor ID to the source topic. The payload replaces `{payload}` in the query:

```
Topic:   search/query/latest
Payload: temp001
```

The matching document is published to `search/result/latest`, where your actions or external clients can consume it.

### CLEAN Read Operators

| SQL Equivalent    | OpenSearch CLEAN Syntax               |
| ----------------- | ------------------------------------- |
| `SELECT *`        | `query: {"match_all": {}}`            |
| `WHERE a = 1`     | `query: {"match": {"a": 1}}`          |
| `WHERE a > 10`    | `query: {"range": {"a": {"gt": 10}}}` |
| `ORDER BY a DESC` | `sort: [{"a": {"order": "desc"}}]`    |
| `LIMIT 10`        | `size: 10`                            |
| `OFFSET 5`        | `from: 5`                             |
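
The operators in the table can be combined in a single query. The sketch below pairs a `range` filter with `sort` and `size`, following the quote escaping used in the read example above. It assumes documents were indexed with a numeric `data.temperature` field, as the Basic Indexing example would produce for a payload containing `temperature`. The topic names, event name, and threshold of 30 are illustrative.

```lot theme={null}
DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.example.com:9200"
        WITH USERNAME "admin"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
    ADD EVENT GetHotReadings
        WITH SOURCE_TOPIC "search/query/hot"
        WITH DESTINATION_TOPIC "search/result/hot"
        WITH QUERY "CLEAN:{index: sensor-data, query: {\"range\": {\"data.temperature\": {\"gt\": 30}}}, sort: [{\"@timestamp\": {\"order\": \"desc\"}}], size: 10}"
```

In SQL terms this corresponds roughly to `WHERE data.temperature > 30 ORDER BY @timestamp DESC LIMIT 10`. Because the query does not reference `{payload}`, any message published to `search/query/hot` should be enough to trigger it.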

***

## Index Mapping Example

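Create the index with explicit mappings before you start indexing, so that each field gets the intended type (for example, `keyword` for exact-match filtering and aggregations on `sensor_id`). The top-level field names below match the Basic Indexing example: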

```json theme={null}
PUT /sensor-data
{
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "sensor_id": { "type": "keyword" },
      "data": {
        "properties": {
          "temperature": { "type": "float" },
          "humidity": { "type": "float" }
        }
      }
    }
  }
}
```

***

## Troubleshooting

<AccordionGroup>
  <Accordion title="Connection Failed">
    * Verify that BASE\_URL is correct, including the port
    * Check that OpenSearch is running and reachable from the broker host
    * Verify that the firewall allows connections on the OpenSearch port (9200 by default)
  </Accordion>

  <Accordion title="Authentication Failed">
    * Verify USERNAME and PASSWORD are correct
    * Check user has permissions to index documents
  </Accordion>

  <Accordion title="SSL/Certificate Errors">
    * Ensure USE\_SSL matches the endpoint (http vs https)
    * For self-signed certs in development, set IGNORE\_CERT\_ERRORS "true"
    * Verify certificate chain is complete
  </Accordion>

  <Accordion title="Index Errors">
    * Verify the index name is valid: lowercase only, not starting with `_` or `-`, and free of spaces, commas, and characters such as `*`, `?`, `#`, and `:`
    * Check field mappings don't conflict
    * Review OpenSearch logs for detailed errors
  </Accordion>
</AccordionGroup>
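
When narrowing down which of the issues above applies, a minimal read event can serve as an end-to-end check. This is only a sketch: the route name, event name, and `diagnostics/...` topics are hypothetical, and it assumes the `sensor-data` index already exists.

```lot theme={null}
DEFINE ROUTE OpenSearchCheck WITH TYPE OPENSEARCH
    ADD OPENSEARCH_CONFIG
        WITH BASE_URL "https://opensearch.example.com:9200"
        WITH USERNAME "admin"
        WITH PASSWORD "secure_password"
        WITH USE_SSL "true"
    ADD EVENT PingIndex
        WITH SOURCE_TOPIC "diagnostics/opensearch/ping"
        WITH DESTINATION_TOPIC "diagnostics/opensearch/result"
        WITH QUERY "CLEAN:{index: sensor-data, query: {\"match_all\": {}}, size: 1}"
```

Publish any message to `diagnostics/opensearch/ping`. If a result arrives on `diagnostics/opensearch/result`, the connection, credentials, and index name are likely fine. If nothing arrives, work through the accordions above.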

***

## Next Steps

<CardGroup cols={2}>
  <Card title="Data Storage Routes Overview" icon="database" href="./overview">
    Compare all storage options.
  </Card>

  <Card title="CrateDB Route" icon="clock" href="./cratedb">
    Configure CrateDB for time-series data.
  </Card>
</CardGroup>
