The OPENSEARCH route indexes MQTT messages in OpenSearch for full-text search, real-time analytics, and dashboard visualization. It uses the CLEAN query format for simplified document indexing.
OpenSearch is ideal for log aggregation, metrics visualization, and full-text search across IoT data. Connect it to OpenSearch Dashboards for powerful visualizations.
DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH ADD OPENSEARCH_CONFIG WITH BASE_URL "https://opensearch.example.com:9200" WITH USERNAME "admin" WITH PASSWORD "secure_password" WITH USE_SSL "true" ADD EVENT IndexSensorReading WITH SOURCE_TOPIC "sensors/+/data" WITH QUERY "CLEAN:{index: sensor-data, body: {@timestamp: {timestamp}, sensor: {sensor_id}, reading: {value.json}}}"
DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH ADD OPENSEARCH_CONFIG WITH BASE_URL "https://opensearch.example.com:9200" WITH USERNAME "admin" WITH PASSWORD "secure_password" WITH USE_SSL "true" ADD EVENT IndexReading WITH SOURCE_TOPIC "sensors/+/data" WITH DESTINATION_TOPIC "search/status" WITH QUERY "CLEAN:{index: sensor-data, body: {@timestamp: {timestamp}, sensor_id: {sensor_id}, data: {value.json}}}"
Aggregate application logs:
DEFINE ROUTE LogIndex WITH TYPE OPENSEARCH ADD OPENSEARCH_CONFIG WITH BASE_URL "https://opensearch.example.com:9200" WITH USERNAME "admin" WITH PASSWORD "admin_password" WITH USE_SSL "true" ADD EVENT IndexLogs WITH SOURCE_TOPIC "logs/+/+" WITH QUERY "CLEAN:{index: application-logs, body: {@timestamp: {timestamp}, source: {source_topic}, level: {value.json.level}, message: {value.json.message}}}"
Use date-based index names for time-series:
DEFINE ROUTE TimeSeriesIndex WITH TYPE OPENSEARCH ADD OPENSEARCH_CONFIG WITH BASE_URL "https://opensearch.example.com:9200" WITH USERNAME "admin" WITH PASSWORD "secure_password" WITH USE_SSL "true" ADD EVENT IndexMetrics WITH SOURCE_TOPIC "metrics/#" WITH QUERY "CLEAN:{index: metrics, body: {@timestamp: {timestamp}, topic: {source_topic}, metrics: {value.json}}}"
Configure Index Lifecycle Management (ILM) in OpenSearch for automatic rollover.
Connect with certificate validation disabled:
DEFINE ROUTE DevIndex WITH TYPE OPENSEARCH ADD OPENSEARCH_CONFIG WITH BASE_URL "https://localhost:9200" WITH USERNAME "admin" WITH PASSWORD "admin" WITH USE_SSL "true" WITH IGNORE_CERT_ERRORS "true" ADD EVENT IndexData WITH SOURCE_TOPIC "data/#" WITH QUERY "CLEAN:{index: dev-data, body: {@timestamp: {timestamp}, topic: {source_topic}, payload: {value.json}}}"
Only use IGNORE_CERT_ERRORS in development environments.
Alternative: STORE IN with Models — Instead of writing EVENT queries, you can bind a model directly to this route. Every PUBLISH MODEL call automatically inserts a row — no query needed:
DEFINE MODEL SensorReading ADD STRING "sensor_id" ADD DOUBLE "value" STORE IN "SensorIndex" WITH TABLE "sensor-data"
EVENTs also support search queries to retrieve data from the index. Publish a message to the event’s SOURCE_TOPIC, and the query result is published to DESTINATION_TOPIC. OpenSearch read queries use the CLEAN format with query, sort, and size instead of SQL:
DEFINE ROUTE SensorIndex WITH TYPE OPENSEARCH ADD OPENSEARCH_CONFIG WITH BASE_URL "https://opensearch.example.com:9200" WITH USERNAME "admin" WITH PASSWORD "secure_password" WITH USE_SSL "true" ADD EVENT GetLatestReading WITH SOURCE_TOPIC "search/query/latest" WITH DESTINATION_TOPIC "search/result/latest" WITH QUERY "CLEAN:{index: sensor-data, query: {\"match\": {\"sensor_id\": \"{payload}\"}}, sort: [{\"@timestamp\": {\"order\": \"desc\"}}], size: 1}"
To trigger this query, publish the sensor ID to the source topic:
Topic: search/query/latestPayload: temp001
The matching document is published to search/result/latest, where your actions or external clients can consume it.