# Snowflake Route

> Store MQTT data in the Snowflake cloud data warehouse for large-scale analytics

## Snowflake Overview

The `SNOWFLAKE` route stores MQTT messages in Snowflake, a cloud-native data warehouse. It supports high-volume data ingestion with automatic scaling and powerful SQL analytics.

<Note>
  Snowflake is ideal for large-scale analytics, data lakes, and enterprise reporting. Use it when you need to analyze IoT data alongside other business data in the cloud.
</Note>
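
As a rough illustration of the analysis this enables, the query below averages a numeric field from the stored JSON per sensor and hour. It is only a sketch: it assumes the `SENSOR_READINGS` table from the schema examples on this page, and the `temperature` key in the payload is hypothetical.

```sql
-- Hourly average per sensor over the last 24 hours.
-- Assumes VALUE is a VARIANT holding JSON such as {"temperature": 21.5};
-- the "temperature" key is a placeholder for whatever your devices publish.
SELECT
    r.SENSOR_ID,
    DATE_TRUNC('HOUR', r.TIMESTAMP)   AS HOUR_BUCKET,
    AVG(r.VALUE:temperature::FLOAT)   AS AVG_TEMPERATURE,
    COUNT(*)                          AS READING_COUNT
FROM IOT_DATA.SENSORS.SENSOR_READINGS AS r
WHERE r.TIMESTAMP >= DATEADD('HOUR', -24, CURRENT_TIMESTAMP())
GROUP BY r.SENSOR_ID, HOUR_BUCKET
ORDER BY r.SENSOR_ID, HOUR_BUCKET;
```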

## Basic Syntax

```lot theme={null}
DEFINE ROUTE SensorWarehouse WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=myuser;password=mypassword"
        WITH WAREHOUSE "COMPUTE_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "SENSORS"
    ADD EVENT StoreSensorReading
        WITH SOURCE_TOPIC "sensors/+/reading"
        WITH QUERY "INSERT INTO SENSOR_READINGS (TIMESTAMP, SENSOR_ID, VALUE) SELECT CURRENT_TIMESTAMP(), '{sensor_id}', PARSE_JSON('{value.json}')"
```

***

## Connection Configuration

### SNOWFLAKE\_CONFIG Parameters

<ParamField path="CONNECTION_STRING" type="string" required>
  Semicolon-separated Snowflake connection string containing the `account`, `user`, and `password` keys, in the form described under Connection String Format.
</ParamField>

<AccordionGroup>
  <Accordion title="Connection Options">
    <ParamField path="WAREHOUSE" type="string">
      Snowflake compute warehouse name.
    </ParamField>

    <ParamField path="DATABASE" type="string">
      Target database name.
    </ParamField>

    <ParamField path="SCHEMA" type="string">
      Target schema name. Default: PUBLIC.
    </ParamField>
  </Accordion>

  <Accordion title="Authentication">
    <ParamField path="ROLE" type="string">
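      Snowflake role the route's user operates under. The role needs privileges on the warehouse, database, schema, and target tables.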
    </ParamField>
  </Accordion>

  <Accordion title="Performance">
    <ParamField path="QUERY_TIMEOUT" type="integer">
      Query timeout in seconds. Default: 60.
    </ParamField>

    <ParamField path="POOL_SIZE" type="integer">
      Connection pool size. Default: 10.
    </ParamField>
  </Accordion>
</AccordionGroup>

<Warning>
  **Never hardcode credentials in production.** Use environment variables and encrypted secrets to keep sensitive values out of your route definitions:

  ```lot theme={null}
  WITH CONNECTION_STRING GET SECRET "SNOWFLAKE_CONNECTION_STRING"
  ```

  See [Environment Variables & Secrets](/mqtt-broker/secrets-and-env) for setup and usage.
</Warning>

***

## Writing Data

<Tabs>
  <Tab title="Basic Storage">
    Store sensor data in Snowflake. The query uses `INSERT ... SELECT` rather than `VALUES` because Snowflake rejects `PARSE_JSON` inside a `VALUES` clause:

    ```lot theme={null}
    DEFINE ROUTE SensorWarehouse WITH TYPE SNOWFLAKE
        ADD SNOWFLAKE_CONFIG
            WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
            WITH WAREHOUSE "IOT_WH"
            WITH DATABASE "IOT_DATA"
            WITH SCHEMA "SENSORS"
            WITH ROLE "IOT_WRITER"
        ADD EVENT StoreReading
            WITH SOURCE_TOPIC "sensors/+/reading"
            WITH DESTINATION_TOPIC "snowflake/status"
            WITH QUERY "INSERT INTO SENSOR_READINGS (TIMESTAMP, SENSOR_ID, VALUE) SELECT CURRENT_TIMESTAMP(), '{sensor_id}', PARSE_JSON('{value.json}')"
    ```
  </Tab>

  <Tab title="With Connection Pool">
    Set the pool size and query timeout explicitly for high-throughput ingestion:

    ```lot theme={null}
    DEFINE ROUTE HighVolumeWarehouse WITH TYPE SNOWFLAKE
        ADD SNOWFLAKE_CONFIG
            WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
            WITH WAREHOUSE "IOT_WH"
            WITH DATABASE "IOT_DATA"
            WITH SCHEMA "EVENTS"
            WITH QUERY_TIMEOUT '60'
            WITH POOL_SIZE '20'
        ADD EVENT StoreEvents
            WITH SOURCE_TOPIC "events/#"
            WITH QUERY "INSERT INTO EVENT_LOG (TIMESTAMP, TOPIC, PAYLOAD) SELECT CURRENT_TIMESTAMP(), '{source_topic}', PARSE_JSON('{value.json}')"
    ```
  </Tab>

  <Tab title="Multiple Tables">
    Route different topics to different tables by adding one EVENT per table:

    ```lot theme={null}
    DEFINE ROUTE MultiTableWarehouse WITH TYPE SNOWFLAKE
        ADD SNOWFLAKE_CONFIG
            WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
            WITH WAREHOUSE "IOT_WH"
            WITH DATABASE "IOT_DATA"
            WITH SCHEMA "PUBLIC"
        
        ADD EVENT StoreSensors
            WITH SOURCE_TOPIC "sensors/#"
            WITH QUERY "INSERT INTO SENSORS (TS, TOPIC, DATA) SELECT CURRENT_TIMESTAMP(), '{source_topic}', PARSE_JSON('{value.json}')"
        
        ADD EVENT StoreAlerts
            WITH SOURCE_TOPIC "alerts/#"
            WITH QUERY "INSERT INTO ALERTS (TS, TOPIC, DATA) SELECT CURRENT_TIMESTAMP(), '{source_topic}', PARSE_JSON('{value.json}')"
    ```
  </Tab>

  <Tab title="UPSERT">
    Snowflake uses `MERGE INTO` to insert or update depending on whether a matching row exists:

    ```lot theme={null}
    DEFINE ROUTE DeviceRegistry WITH TYPE SNOWFLAKE
        ADD SNOWFLAKE_CONFIG
            WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
            WITH WAREHOUSE "IOT_WH"
            WITH DATABASE "IOT_DATA"
            WITH SCHEMA "DEVICES"
        ADD EVENT RegisterDevice
            WITH SOURCE_TOPIC "devices/+/registered"
            WITH QUERY "MERGE INTO DEVICES AS target USING (SELECT '{value.json.device_id}' AS device_id, '{value.json.name}' AS name, '{value.json.area}' AS area) AS source ON target.DEVICE_ID = source.device_id WHEN MATCHED THEN UPDATE SET NAME = source.name, AREA = source.area, UPDATED_AT = CURRENT_TIMESTAMP() WHEN NOT MATCHED THEN INSERT (DEVICE_ID, NAME, AREA, ACTIVE, UPDATED_AT) VALUES (source.device_id, source.name, source.area, TRUE, CURRENT_TIMESTAMP())"
    ```
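
    The single-line `MERGE` is easier to follow when expanded. The sketch below shows roughly what Snowflake would receive after placeholder substitution, assuming placeholders are replaced verbatim and the payload is `{"device_id": "pump-01", "name": "Pump 1", "area": "north"}` (hypothetical values):

    ```sql
    MERGE INTO DEVICES AS target
    USING (
        -- One-row source built from the message payload (sample values)
        SELECT 'pump-01' AS device_id, 'Pump 1' AS name, 'north' AS area
    ) AS source
    ON target.DEVICE_ID = source.device_id
    -- Existing device: refresh its descriptive fields
    WHEN MATCHED THEN
        UPDATE SET NAME = source.name, AREA = source.area, UPDATED_AT = CURRENT_TIMESTAMP()
    -- New device: create the row and mark it active
    WHEN NOT MATCHED THEN
        INSERT (DEVICE_ID, NAME, AREA, ACTIVE, UPDATED_AT)
        VALUES (source.device_id, source.name, source.area, TRUE, CURRENT_TIMESTAMP());
    ```

    Running the expanded statement by hand in a Snowflake worksheet is a quick way to check the table and column names before wiring it into the route.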
  </Tab>

  <Tab title="UPDATE">
    Modify existing records when a status change arrives:

    ```lot theme={null}
    DEFINE ROUTE DeviceRegistry WITH TYPE SNOWFLAKE
        ADD SNOWFLAKE_CONFIG
            WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
            WITH WAREHOUSE "IOT_WH"
            WITH DATABASE "IOT_DATA"
            WITH SCHEMA "DEVICES"
        ADD EVENT UpdateDeviceStatus
            WITH SOURCE_TOPIC "devices/+/status"
            WITH QUERY "UPDATE DEVICES SET CURRENT_STATUS = '{value.json.status}', LAST_SEEN = CURRENT_TIMESTAMP() WHERE DEVICE_ID = '{value.json.device_id}'"
    ```
  </Tab>

  <Tab title="DELETE">
    Remove records on a decommission or cleanup event:

    ```lot theme={null}
    DEFINE ROUTE DeviceRegistry WITH TYPE SNOWFLAKE
        ADD SNOWFLAKE_CONFIG
            WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
            WITH WAREHOUSE "IOT_WH"
            WITH DATABASE "IOT_DATA"
            WITH SCHEMA "DEVICES"
        ADD EVENT DecommissionDevice
            WITH SOURCE_TOPIC "devices/+/decommissioned"
            WITH QUERY "DELETE FROM DEVICES WHERE DEVICE_ID = '{value.json.device_id}' AND ACTIVE = FALSE"
    ```
  </Tab>
</Tabs>

<Info>
  **Alternative: STORE IN with Models** — Instead of writing EVENT queries, you can bind a [model](/lot-language/models/overview) directly to this route. Every `PUBLISH MODEL` call automatically inserts a row — no query needed:

  ```lot theme={null}
  DEFINE MODEL SensorReading
      ADD STRING "sensor_id"
      ADD DOUBLE "value"
      STORE IN "SensorWarehouse"
          WITH TABLE "SENSOR_READINGS"
  ```

  See [Data Storage Overview](./overview#with-models-store-in) for the full STORE IN workflow.
</Info>

***

## Reading Data

EVENTs also support SELECT queries to read data back from the database. Publish a message to the event's `SOURCE_TOPIC`, and the query result is published to `DESTINATION_TOPIC`:

```lot theme={null}
DEFINE ROUTE SensorWarehouse WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=myuser;password=mypassword"
        WITH WAREHOUSE "COMPUTE_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "SENSORS"
    ADD EVENT GetLatestReading
        WITH SOURCE_TOPIC "db/query/latest"
        WITH DESTINATION_TOPIC "db/result/latest"
        WITH QUERY "SELECT SENSOR_ID, VALUE FROM SENSOR_READINGS WHERE SENSOR_ID = '{payload}' ORDER BY TIMESTAMP DESC LIMIT 1"
```

To trigger this query, publish the sensor ID to the source topic:

```
Topic:   db/query/latest
Payload: temp001
```

The query result is published to `db/result/latest`, where your actions or external clients can consume it.
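
For the trigger message above, and assuming the `{payload}` placeholder is substituted verbatim, the statement sent to Snowflake would look roughly like this:

```sql
-- Query as it would run for the payload "temp001" (assumes verbatim substitution)
SELECT SENSOR_ID, VALUE
FROM SENSOR_READINGS
WHERE SENSOR_ID = 'temp001'
ORDER BY TIMESTAMP DESC
LIMIT 1;
```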

### Timed Polling

EVENTs can also run SELECT queries on a fixed schedule using `WITH EVERY`:

```lot theme={null}
DEFINE ROUTE DeviceRegistry WITH TYPE SNOWFLAKE
    ADD SNOWFLAKE_CONFIG
        WITH CONNECTION_STRING "account=myaccount;user=iot_user;password=secure_password"
        WITH WAREHOUSE "IOT_WH"
        WITH DATABASE "IOT_DATA"
        WITH SCHEMA "DEVICES"
    ADD EVENT PollActiveDevices
        WITH EVERY 30 SECONDS
        WITH DESTINATION_TOPIC "dashboard/devices/active"
        WITH QUERY "SELECT DEVICE_ID, NAME, CURRENT_STATUS, LAST_SEEN FROM DEVICES WHERE ACTIVE = TRUE ORDER BY NAME"
```

***

## Table Schema Examples

```sql theme={null}
-- Sensor readings with VARIANT for JSON
CREATE TABLE SENSOR_READINGS (
    ID NUMBER AUTOINCREMENT,
    TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    SENSOR_ID VARCHAR(50),
    VALUE VARIANT
);

-- Event log
CREATE TABLE EVENT_LOG (
    ID NUMBER AUTOINCREMENT,
    TIMESTAMP TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    TOPIC VARCHAR(255),
    PAYLOAD VARIANT
);
```
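
The UPSERT, UPDATE, DELETE, polling, and multi-table examples on this page reference `DEVICES`, `SENSORS`, and `ALERTS` tables that are not defined above. One possible set of definitions is sketched below. The column names come from those example queries, but every type, length, and default is an assumption to adapt to your data:

```sql
-- Device registry used by the UPSERT, UPDATE, DELETE, and polling examples.
-- Types and lengths are assumptions, not requirements of the route.
CREATE TABLE IF NOT EXISTS DEVICES (
    DEVICE_ID      VARCHAR(50) NOT NULL,
    NAME           VARCHAR(255),
    AREA           VARCHAR(100),
    CURRENT_STATUS VARCHAR(50),
    ACTIVE         BOOLEAN DEFAULT TRUE,
    LAST_SEEN      TIMESTAMP_NTZ,
    UPDATED_AT     TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Topic-keyed tables used by the "Multiple Tables" example.
CREATE TABLE IF NOT EXISTS SENSORS (
    TS    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    TOPIC VARCHAR(255),
    DATA  VARIANT
);

CREATE TABLE IF NOT EXISTS ALERTS (
    TS    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    TOPIC VARCHAR(255),
    DATA  VARIANT
);
```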

***

## Connection String Format

```
account=<account_identifier>;user=<username>;password=<password>
```

| Component  | Description                                              |
| ---------- | -------------------------------------------------------- |
| `account`  | Snowflake account identifier (e.g., `xy12345.us-east-1`) |
| `user`     | Snowflake username                                       |
| `password` | User password                                            |
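
If you are unsure of the account identifier, one way to look it up is from an existing Snowflake session, for example a worksheet. The query below returns both the organization-based identifier and the legacy account locator with its region. Which form the route expects is not stated here, so treat the output as a starting point:

```sql
-- Run in an existing Snowflake session to see the values that make up the identifier
SELECT
    CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME() AS ACCOUNT_IDENTIFIER,
    CURRENT_ACCOUNT()                                            AS ACCOUNT_LOCATOR,
    CURRENT_REGION()                                             AS REGION;
```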

***

## Troubleshooting

<AccordionGroup>
  <Accordion title="Connection Failed">
    * Verify that the account identifier in the connection string is correct
    * Check the username and password
    * Ensure the warehouse exists and is running
    * Verify that your network allows outbound connections to Snowflake
  </Accordion>

  <Accordion title="Permission Errors">
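    * Verify that the configured `ROLE` has the `INSERT` privilege on the target table, plus `SELECT`, `UPDATE`, or `DELETE` for events that run those statements
    * Check that the role has `USAGE` on the configured `WAREHOUSE`
    * Ensure the role has `USAGE` on the configured `DATABASE` and `SCHEMA`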
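
    The checks above map onto a handful of Snowflake grants. A minimal sketch, reusing the role, user, and object names from the examples on this page (replace them with your own, and run it as a role that is allowed to grant these privileges):

    ```sql
    -- Names mirror the examples on this page; adjust to your account.
    CREATE ROLE IF NOT EXISTS IOT_WRITER;

    -- Compute and namespace access
    GRANT USAGE ON WAREHOUSE IOT_WH TO ROLE IOT_WRITER;
    GRANT USAGE ON DATABASE IOT_DATA TO ROLE IOT_WRITER;
    GRANT USAGE ON SCHEMA IOT_DATA.SENSORS TO ROLE IOT_WRITER;

    -- Table privileges: INSERT for write events, SELECT for read events
    GRANT INSERT, SELECT ON TABLE IOT_DATA.SENSORS.SENSOR_READINGS TO ROLE IOT_WRITER;

    -- Let the route's user assume the role
    GRANT ROLE IOT_WRITER TO USER iot_user;

    -- Review what the role can currently do
    SHOW GRANTS TO ROLE IOT_WRITER;
    ```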
  </Accordion>

  <Accordion title="Query Timeout">
    * Increase the `QUERY_TIMEOUT` value (default: 60 seconds)
    * Check that the warehouse size is adequate for the query load
    * Verify that the warehouse is not suspended
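
    The warehouse checks can be done from any Snowflake session. A short sketch, assuming the `IOT_WH` warehouse name from the examples and a role with sufficient privileges on it:

    ```sql
    -- Inspect the warehouse; see the "state", "size", and "auto_resume" columns
    SHOW WAREHOUSES LIKE 'IOT_WH';

    -- Resume the warehouse if it is currently suspended
    ALTER WAREHOUSE IOT_WH RESUME IF SUSPENDED;

    -- Let Snowflake resume it automatically when a query arrives
    ALTER WAREHOUSE IOT_WH SET AUTO_RESUME = TRUE;

    -- Scale up if queries are compute-bound; a larger size consumes more credits
    ALTER WAREHOUSE IOT_WH SET WAREHOUSE_SIZE = 'MEDIUM';
    ```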
  </Accordion>
</AccordionGroup>

***

## Next Steps

<CardGroup cols={2}>
  <Card title="Data Storage Routes Overview" icon="database" href="./overview">
    Compare all storage options.
  </Card>

  <Card title="File Storage Route" icon="file" href="./file-storage">
    Configure local file storage.
  </Card>
</CardGroup>
