> ## Documentation Index
> Fetch the complete documentation index at: https://docs.coreflux.org/llms.txt
> Use this file to discover all available pages before exploring further.

# PostgreSQL Route

> Store MQTT data in PostgreSQL with automatic query execution and SSL support

## PostgreSQL Overview

The `POSTGRESQL` route stores MQTT messages in PostgreSQL databases. It supports SSL connections, parameterized queries, and automatic reconnection for reliable data persistence.

<Note>
  PostgreSQL is ideal for structured data that requires complex queries, joins, and ACID compliance. Use it when you need relational integrity and powerful SQL capabilities.
</Note>

## Basic Syntax

```lot theme={null}
DEFINE ROUTE SensorDB WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT StoreSensorReading
        WITH SOURCE_TOPIC "sensors/+/reading"
        WITH QUERY "INSERT INTO readings (ts, sensor_id, value) VALUES (NOW(), '{sensor_id}', '{value.json}')"
```

***

## Connection Configuration

### SQL\_CONFIG Parameters

<ParamField path="SERVER" type="string" required>
  PostgreSQL server hostname or IP address.
</ParamField>

<ParamField path="DATABASE" type="string" required>
  Target database name.
</ParamField>

<ParamField path="USERNAME" type="string" required>
  Database username.
</ParamField>

<ParamField path="PASSWORD" type="string" required>
  Database password.
</ParamField>

<AccordionGroup>
  <Accordion title="Optional Settings">
    <ParamField path="PORT" type="integer">
      PostgreSQL port. Default: 5432.
    </ParamField>
  </Accordion>

  <Accordion title="SSL/Security">
    <ParamField path="USE_SSL" type="boolean">
      Enable SSL connection. Default: false.
    </ParamField>

    <ParamField path="TRUST_SERVER_CERTIFICATE" type="boolean">
      Trust server certificate without validation. Default: false.
    </ParamField>
  </Accordion>

  <Accordion title="Timeouts">
    <ParamField path="CONNECTION_TIMEOUT" type="integer">
      Connection timeout in seconds. Default: 30.
    </ParamField>

    <ParamField path="COMMAND_TIMEOUT" type="integer">
      Command timeout in seconds. Default: 30.
    </ParamField>
  </Accordion>
</AccordionGroup>

<Warning>
  **Never hardcode credentials in production.** Use environment variables and encrypted secrets to keep sensitive values out of your route definitions:

  ```lot theme={null}
  WITH SERVER GET ENV "DB_HOST"
  WITH USERNAME GET ENV "DB_USER"
  WITH PASSWORD GET SECRET "DB_PASSWORD"
  ```

  See [Environment Variables & Secrets](/mqtt-broker/secrets-and-env) for setup and usage.
</Warning>

***

## Event Configuration

### EVENT Parameters

<ParamField path="SOURCE_TOPIC" type="string" required>
  MQTT topic pattern that triggers the INSERT. Supports wildcards.
</ParamField>

<ParamField path="QUERY" type="string" required>
  SQL INSERT statement with placeholders for dynamic values.
</ParamField>

<AccordionGroup>
  <Accordion title="Optional Settings">
    <ParamField path="DESTINATION_TOPIC" type="string">
      MQTT topic to publish query result status.
    </ParamField>
  </Accordion>
</AccordionGroup>

### Query Placeholders

| Placeholder          | Description                     |
| -------------------- | ------------------------------- |
| `{value.json}`       | Full JSON payload               |
| `{value.json.field}` | Specific field from JSON        |
| `{timestamp}`        | Message timestamp               |
| `{source_topic}`     | Original MQTT topic             |
| `{field}`            | Field extracted from topic path |

***

## Writing Data

<Tabs>
  <Tab title="Basic Sensor Storage">
    Store temperature readings with timestamps:

    ```lot theme={null}
    DEFINE ROUTE TemperatureDB WITH TYPE POSTGRESQL
        ADD SQL_CONFIG
            WITH SERVER "postgres.example.com"
            WITH PORT '5432'
            WITH DATABASE "iot_data"
            WITH USERNAME "iot_user"
            WITH PASSWORD "secure_password"
        ADD EVENT StoreTemperature
            WITH SOURCE_TOPIC "sensors/+/temperature"
            WITH DESTINATION_TOPIC "db/status/temperature"
            WITH QUERY "INSERT INTO temperature_readings (recorded_at, sensor_id, value, unit) VALUES (NOW(), '{sensor_id}', {value.json.value}, '{value.json.unit}')"
    ```

    **Input at** `sensors/temp001/temperature`:

    ```json theme={null}
    {"value": 23.5, "unit": "celsius"}
    ```
  </Tab>

  <Tab title="With SSL">
    Secure connection with SSL:

    ```lot theme={null}
    DEFINE ROUTE SecureDB WITH TYPE POSTGRESQL
        ADD SQL_CONFIG
            WITH SERVER "postgres.cloud.com"
            WITH PORT '5432'
            WITH DATABASE "production"
            WITH USERNAME "app_user"
            WITH PASSWORD "secure_password"
            WITH USE_SSL "true"
            WITH TRUST_SERVER_CERTIFICATE "false"
        ADD EVENT StoreEvent
            WITH SOURCE_TOPIC "events/#"
            WITH QUERY "INSERT INTO events (ts, topic, payload) VALUES (NOW(), '{source_topic}', '{value.json}')"
    ```
  </Tab>

  <Tab title="Multiple Events">
    Different tables for different data types:

    ```lot theme={null}
    DEFINE ROUTE MultiTableDB WITH TYPE POSTGRESQL
        ADD SQL_CONFIG
            WITH SERVER "postgres.example.com"
            WITH PORT '5432'
            WITH DATABASE "iot_data"
            WITH USERNAME "iot_user"
            WITH PASSWORD "secure_password"
        
        ADD EVENT StoreSensorData
            WITH SOURCE_TOPIC "sensors/+/data"
            WITH QUERY "INSERT INTO sensor_data (ts, sensor_id, value) VALUES (NOW(), '{sensor_id}', '{value.json}')"
        
        ADD EVENT StoreAlerts
            WITH SOURCE_TOPIC "alerts/+"
            WITH QUERY "INSERT INTO alerts (ts, alert_type, message) VALUES (NOW(), '{alert_type}', '{value.json}')"
        
        ADD EVENT StoreMetrics
            WITH SOURCE_TOPIC "metrics/#"
            WITH QUERY "INSERT INTO metrics (ts, topic, data) VALUES (NOW(), '{source_topic}', '{value.json}')"
    ```
  </Tab>

  <Tab title="JSON Column">
    Store full payload in JSONB column:

    ```lot theme={null}
    DEFINE ROUTE JSONStorage WITH TYPE POSTGRESQL
        ADD SQL_CONFIG
            WITH SERVER "postgres.example.com"
            WITH PORT '5432'
            WITH DATABASE "iot_data"
            WITH USERNAME "iot_user"
            WITH PASSWORD "secure_password"
        ADD EVENT StoreJSON
            WITH SOURCE_TOPIC "devices/#"
            WITH QUERY "INSERT INTO device_data (ts, topic, payload) VALUES (NOW(), '{source_topic}', '{value.json}'::jsonb)"
    ```

    This allows powerful JSON queries in PostgreSQL:

    ```sql theme={null}
    SELECT * FROM device_data WHERE payload->>'temperature' > '25';
    ```
  </Tab>

  <Tab title="UPSERT">
    Insert a record, or update it if a conflict on the unique key occurs. Useful for device registration or status tables where duplicate messages are possible:

    ```lot theme={null}
    DEFINE ROUTE DeviceRegistry WITH TYPE POSTGRESQL
        ADD SQL_CONFIG
            WITH SERVER "postgres.example.com"
            WITH PORT '5432'
            WITH DATABASE "iot_data"
            WITH USERNAME "iot_user"
            WITH PASSWORD "secure_password"
        ADD EVENT RegisterDevice
            WITH SOURCE_TOPIC "devices/+/registered"
            WITH QUERY "INSERT INTO devices (device_id, name, area, active, updated_at) VALUES ('{value.json.device_id}', '{value.json.name}', '{value.json.area}', TRUE, NOW()) ON CONFLICT (device_id) DO UPDATE SET name = EXCLUDED.name, area = EXCLUDED.area, updated_at = NOW()"
    ```

    `ON CONFLICT (device_id) DO UPDATE SET` updates only the changed columns when the device already exists — the row is never duplicated.
  </Tab>

  <Tab title="UPDATE">
    Modify existing records when a status change or update arrives:

    ```lot theme={null}
    DEFINE ROUTE DeviceRegistry WITH TYPE POSTGRESQL
        ADD SQL_CONFIG
            WITH SERVER "postgres.example.com"
            WITH PORT '5432'
            WITH DATABASE "iot_data"
            WITH USERNAME "iot_user"
            WITH PASSWORD "secure_password"
        ADD EVENT UpdateDeviceStatus
            WITH SOURCE_TOPIC "devices/+/status"
            WITH QUERY "UPDATE devices SET current_status = '{value.json.status}', last_seen = NOW() WHERE device_id = '{value.json.device_id}'"
    ```

    Only rows matching the `WHERE` clause are modified. Unmatched messages produce no change — no error is raised.
  </Tab>

  <Tab title="DELETE">
    Remove records when a decommission or cleanup event arrives:

    ```lot theme={null}
    DEFINE ROUTE DeviceRegistry WITH TYPE POSTGRESQL
        ADD SQL_CONFIG
            WITH SERVER "postgres.example.com"
            WITH PORT '5432'
            WITH DATABASE "iot_data"
            WITH USERNAME "iot_user"
            WITH PASSWORD "secure_password"
        ADD EVENT DecommissionDevice
            WITH SOURCE_TOPIC "devices/+/decommissioned"
            WITH QUERY "DELETE FROM devices WHERE device_id = '{value.json.device_id}' AND active = FALSE"
    ```

    The `AND active = FALSE` guard prevents accidental deletes if a message arrives before the device is properly deactivated.
  </Tab>
</Tabs>

<Info>
  **Alternative: STORE IN with Models** — Instead of writing EVENT queries, you can bind a [model](/lot-language/models/overview) directly to this route. Every `PUBLISH MODEL` call automatically inserts a row — no query needed:

  ```lot theme={null}
  DEFINE MODEL SensorReading
      ADD STRING "sensor_id"
      ADD DOUBLE "value"
      STORE IN "SensorDB"
          WITH TABLE "sensor_readings"
  ```

  See [Data Storage Overview](./overview#with-models-store-in) for the full STORE IN workflow.
</Info>

***

## Reading Data

EVENTs also support SELECT queries to read data back from the database. Publish a message to the event's `SOURCE_TOPIC`, and the query result is published to `DESTINATION_TOPIC`:

```lot theme={null}
DEFINE ROUTE SensorDB WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT GetLatestReading
        WITH SOURCE_TOPIC "db/query/latest"
        WITH DESTINATION_TOPIC "db/result/latest"
        WITH QUERY "SELECT sensor_id, value FROM readings WHERE sensor_id = '{payload}' ORDER BY ts DESC LIMIT 1"
```

To trigger this query, publish the sensor ID to the source topic:

```
Topic:   db/query/latest
Payload: temp001
```

The query result is published to `db/result/latest`, where your actions or external clients can consume it.

### Timed Polling

EVENTs can also run SELECT queries on a schedule instead of waiting for an MQTT trigger. Use `WITH EVERY` to publish fresh data at a fixed interval:

```lot theme={null}
DEFINE ROUTE DeviceRegistry WITH TYPE POSTGRESQL
    ADD SQL_CONFIG
        WITH SERVER "postgres.example.com"
        WITH PORT '5432'
        WITH DATABASE "iot_data"
        WITH USERNAME "iot_user"
        WITH PASSWORD "secure_password"
    ADD EVENT PollActiveDevices
        WITH EVERY 10 SECONDS
        WITH DESTINATION_TOPIC "dashboard/devices/active"
        WITH QUERY "SELECT device_id, name, current_status, last_seen FROM devices WHERE active = TRUE ORDER BY name"
```

The query result is published to `dashboard/devices/active` every 10 seconds — no trigger message needed.

***

## Table Schema Examples

Create tables to match your data:

```sql theme={null}
-- Basic sensor readings
CREATE TABLE sensor_readings (
    id SERIAL PRIMARY KEY,
    recorded_at TIMESTAMP DEFAULT NOW(),
    sensor_id VARCHAR(50),
    value DOUBLE PRECISION,
    unit VARCHAR(20)
);

-- Full JSON storage
CREATE TABLE device_data (
    id SERIAL PRIMARY KEY,
    ts TIMESTAMP DEFAULT NOW(),
    topic VARCHAR(255),
    payload JSONB
);

-- Alerts table
CREATE TABLE alerts (
    id SERIAL PRIMARY KEY,
    ts TIMESTAMP DEFAULT NOW(),
    alert_type VARCHAR(50),
    message TEXT
);
```

***

## Troubleshooting

<AccordionGroup>
  <Accordion title="Connection Refused">
    * Verify SERVER and PORT are correct
    * Check PostgreSQL is running and accepting connections
    * Verify firewall allows connections on port 5432
    * Check `pg_hba.conf` allows your IP address
  </Accordion>

  <Accordion title="Authentication Failed">
    * Verify USERNAME and PASSWORD are correct
    * Check user has permissions on the DATABASE
    * Verify authentication method in `pg_hba.conf`
  </Accordion>

  <Accordion title="SSL Connection Errors">
    * Ensure USE\_SSL matches server requirements
    * For self-signed certs, set TRUST\_SERVER\_CERTIFICATE "true"
    * Verify SSL is enabled on PostgreSQL server
  </Accordion>

  <Accordion title="Query Syntax Errors">
    * Check placeholder names match topic structure
    * Verify table and column names exist
    * Test query manually in psql first
    * Ensure proper escaping for special characters
  </Accordion>

  <Accordion title="Timeout Errors">
    * Increase CONNECTION\_TIMEOUT for slow networks
    * Increase COMMAND\_TIMEOUT for complex queries
    * Check server load and query performance
  </Accordion>
</AccordionGroup>

***

## Next Steps

<CardGroup cols={2}>
  <Card title="Data Storage Routes Overview" icon="database" href="./overview">
    Compare all storage options.
  </Card>

  <Card title="MySQL Route" icon="database" href="./mysql">
    Configure MySQL database storage.
  </Card>
</CardGroup>
