The SNOWFLAKE route stores MQTT messages in Snowflake, a cloud-native data warehouse. It supports high-volume data ingestion with automatic scaling and powerful SQL analytics.
Snowflake is ideal for large-scale analytics, data lakes, and enterprise reporting. Use it when you need to analyze IoT data alongside other business data in the cloud.
Basic example:

DEFINE ROUTE SensorWarehouse WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=myuser;password=mypassword"
        WITH WAREHOUSE "COMPUTE_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "SENSORS"
    ADD EVENT StoreSensorReading
        WITH SOURCE_TOPIC "sensors/+/reading"
        WITH QUERY "INSERT INTO SENSOR_READINGS (TIMESTAMP, SENSOR_ID, VALUE) VALUES (CURRENT_TIMESTAMP(), '{sensor_id}', PARSE_JSON('{value.json}'))"
A fuller configuration with an explicit ROLE and a DESTINATION_TOPIC for delivery status:

DEFINE ROUTE SensorWarehouse WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
        WITH WAREHOUSE "IOT_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "SENSORS"
        WITH ROLE "IOT_WRITER"
    ADD EVENT StoreReading
        WITH SOURCE_TOPIC "sensors/+/reading"
        WITH DESTINATION_TOPIC "snowflake/status"
        WITH QUERY "INSERT INTO SENSOR_READINGS (TIMESTAMP, SENSOR_ID, VALUE) VALUES (CURRENT_TIMESTAMP(), '{sensor_id}', PARSE_JSON('{value.json}'))"
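The INSERT examples above assume a SENSOR_READINGS table already exists in the IOT_DATA.SENSORS schema. A minimal sketch of that DDL is shown below; the column types are assumptions inferred from the INSERT statement (VALUE is VARIANT because the route inserts via PARSE_JSON), so adjust them to your data:

```sql
-- Assumed target table for the INSERT examples above (types are an assumption).
CREATE TABLE IF NOT EXISTS IOT_DATA.SENSORS.SENSOR_READINGS (
    TIMESTAMP  TIMESTAMP_NTZ,  -- filled by CURRENT_TIMESTAMP() in the route query
    SENSOR_ID  STRING,
    VALUE      VARIANT         -- PARSE_JSON() produces a VARIANT
);
```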
Optimized for high throughput:
DEFINE ROUTE HighVolumeWarehouse WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
        WITH WAREHOUSE "IOT_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "EVENTS"
        WITH QUERY_TIMEOUT '60'
        WITH POOL_SIZE '20'
    ADD EVENT StoreEvents
        WITH SOURCE_TOPIC "events/#"
        WITH QUERY "INSERT INTO EVENT_LOG (TIMESTAMP, TOPIC, PAYLOAD) VALUES (CURRENT_TIMESTAMP(), '{source_topic}', PARSE_JSON('{value.json}'))"
Route to different tables:
DEFINE ROUTE MultiTableWarehouse WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
        WITH WAREHOUSE "IOT_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "PUBLIC"
    ADD EVENT StoreSensors
        WITH SOURCE_TOPIC "sensors/#"
        WITH QUERY "INSERT INTO SENSORS (TS, TOPIC, DATA) VALUES (CURRENT_TIMESTAMP(), '{source_topic}', PARSE_JSON('{value.json}'))"
    ADD EVENT StoreAlerts
        WITH SOURCE_TOPIC "alerts/#"
        WITH QUERY "INSERT INTO ALERTS (TS, TOPIC, DATA) VALUES (CURRENT_TIMESTAMP(), '{source_topic}', PARSE_JSON('{value.json}'))"
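The high-volume and multi-table examples likewise assume their target tables exist. A sketch of the assumed DDL follows; table locations come from the routes' DATABASE and SCHEMA settings, while the column types are assumptions inferred from the INSERT statements:

```sql
-- Assumed tables for the HighVolumeWarehouse and MultiTableWarehouse routes.
CREATE TABLE IF NOT EXISTS IOT_DATA.EVENTS.EVENT_LOG (
    TIMESTAMP  TIMESTAMP_NTZ,
    TOPIC      STRING,   -- populated from '{source_topic}'
    PAYLOAD    VARIANT
);

CREATE TABLE IF NOT EXISTS IOT_DATA.PUBLIC.SENSORS (
    TS     TIMESTAMP_NTZ,
    TOPIC  STRING,
    DATA   VARIANT
);

CREATE TABLE IF NOT EXISTS IOT_DATA.PUBLIC.ALERTS (
    TS     TIMESTAMP_NTZ,
    TOPIC  STRING,
    DATA   VARIANT
);
```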
Snowflake's MERGE INTO performs an upsert: it inserts a new row or updates the existing one, depending on whether a matching row exists:
DEFINE ROUTE DeviceRegistry WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
        WITH WAREHOUSE "IOT_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "DEVICES"
    ADD EVENT RegisterDevice
        WITH SOURCE_TOPIC "devices/+/registered"
        WITH QUERY "MERGE INTO DEVICES AS target USING (SELECT '{value.json.device_id}' AS device_id, '{value.json.name}' AS name, '{value.json.area}' AS area) AS source ON target.DEVICE_ID = source.device_id WHEN MATCHED THEN UPDATE SET NAME = source.name, AREA = source.area, UPDATED_AT = CURRENT_TIMESTAMP() WHEN NOT MATCHED THEN INSERT (DEVICE_ID, NAME, AREA, ACTIVE, UPDATED_AT) VALUES (source.device_id, source.name, source.area, TRUE, CURRENT_TIMESTAMP())"
Modify existing records when a status change arrives:
DEFINE ROUTE DeviceRegistry WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
        WITH WAREHOUSE "IOT_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "DEVICES"
    ADD EVENT UpdateDeviceStatus
        WITH SOURCE_TOPIC "devices/+/status"
        WITH QUERY "UPDATE DEVICES SET CURRENT_STATUS = '{value.json.status}', LAST_SEEN = CURRENT_TIMESTAMP() WHERE DEVICE_ID = '{value.json.device_id}'"
Remove records on a decommission or cleanup event:
DEFINE ROUTE DeviceRegistry WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
        WITH WAREHOUSE "IOT_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "DEVICES"
    ADD EVENT DecommissionDevice
        WITH SOURCE_TOPIC "devices/+/decommissioned"
        WITH QUERY "DELETE FROM DEVICES WHERE DEVICE_ID = '{value.json.device_id}' AND ACTIVE = FALSE"
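The registry events above (MERGE, UPDATE, DELETE) all touch a single DEVICES table. Its assumed shape, with every column those three queries reference, is sketched below; types are assumptions, so adapt them to your registry:

```sql
-- Assumed DEVICES table for the registry examples (types are an assumption).
CREATE TABLE IF NOT EXISTS IOT_DATA.DEVICES.DEVICES (
    DEVICE_ID       STRING,         -- match key for MERGE, UPDATE, and DELETE
    NAME            STRING,
    AREA            STRING,
    ACTIVE          BOOLEAN,        -- set TRUE on registration; DELETE only removes inactive rows
    CURRENT_STATUS  STRING,         -- written by the UpdateDeviceStatus event
    LAST_SEEN       TIMESTAMP_NTZ,
    UPDATED_AT      TIMESTAMP_NTZ
);
```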
Alternative: STORE IN with models. Instead of writing EVENT queries, you can bind a model directly to this route. Every PUBLISH MODEL call then inserts a row automatically, with no query needed:
DEFINE MODEL SensorReading
    ADD STRING "sensor_id"
    ADD DOUBLE "value"
    STORE IN "SensorWarehouse" WITH TABLE "SENSOR_READINGS"
EVENTs also support SELECT queries to read data back from the database. Publish a message to the event’s SOURCE_TOPIC, and the query result is published to DESTINATION_TOPIC:
DEFINE ROUTE SensorWarehouse WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=myuser;password=mypassword"
        WITH WAREHOUSE "COMPUTE_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "SENSORS"
    ADD EVENT GetLatestReading
        WITH SOURCE_TOPIC "db/query/latest"
        WITH DESTINATION_TOPIC "db/result/latest"
        WITH QUERY "SELECT SENSOR_ID, VALUE FROM SENSOR_READINGS WHERE SENSOR_ID = '{payload}' ORDER BY TIMESTAMP DESC LIMIT 1"
To trigger this query, publish the sensor ID to the source topic:
Topic: db/query/latest
Payload: temp001
The query result is published to db/result/latest, where your actions or external clients can consume it.
EVENTs can also run SELECT queries on a fixed schedule using WITH EVERY:
DEFINE ROUTE DeviceRegistry WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
        WITH WAREHOUSE "IOT_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "DEVICES"
    ADD EVENT PollActiveDevices
        WITH EVERY 30 SECONDS
        WITH DESTINATION_TOPIC "dashboard/devices/active"
        WITH QUERY "SELECT DEVICE_ID, NAME, CURRENT_STATUS, LAST_SEEN FROM DEVICES WHERE ACTIVE = TRUE ORDER BY NAME"