PostgreSQL Route
| Feature | Since Version | Notes |
|---|---|---|
| PostgreSQL Route | >v1.6.3 | Persists MQTT data to PostgreSQL databases. |
1. Overview
The PostgreSQL Route in the Coreflux MQTT broker provides a powerful and flexible way to store data from MQTT topics directly in a PostgreSQL database. It offers robust configuration options for connection handling, security, and data storage, and it can automatically create tables and map data types, simplifying the integration between your MQTT data streams and your relational database.
For practical examples of LOT Routes, check out the Routes examples in the LOT Samples Repository.
2. Key Features
- Native PostgreSQL Support: Optimized for PostgreSQL databases.
- Flexible Connection Configuration: Connect using a full connection string or individual parameters.
- Secure Communication: Supports TLS/SSL connections to encrypt data in transit.
- Automatic Table Creation: Automatically creates tables based on the incoming data structure or a predefined model if they do not exist.
- Intelligent Schema Handling: Dynamically maps JSON data types to appropriate SQL column types.
- Data Caching: Temporarily caches data during connection interruptions to prevent data loss.
3. Route Configuration
A route can be configured using the `addRoute` command with `TYPE POSTGRESQL`.
Example: PostgreSQL Route Configuration
```
DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL
ADD SQL_CONFIG
    WITH SERVER "db-postgresql.db.onmyserver.com"
    WITH PORT 25060
    WITH DATABASE "defaultdb"
    WITH USERNAME "doadmin"
    WITH PASSWORD "AVNS_pass"
    WITH USE_SSL TRUE
    WITH TRUST_SERVER_CERTIFICATE FALSE
```
Adding the Route via MQTT Command
To deploy this route to the broker, use the `-addRoute` command by publishing to the `$SYS/Coreflux/Command` topic:
```
-addRoute DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL
ADD SQL_CONFIG
    WITH SERVER "db-postgresql.db.onmyserver.com"
    WITH PORT 25060
    WITH DATABASE "defaultdb"
    WITH USERNAME "doadmin"
    WITH PASSWORD "AVNS_pass"
    WITH USE_SSL TRUE
    WITH TRUST_SERVER_CERTIFICATE FALSE
```
For more information about broker commands, see the MQTT Broker Commands documentation.
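Any MQTT client can publish this command. Below is a minimal sketch using Python's paho-mqtt library; the broker address, port, and credentials (`broker.example.com`, `broker-user`, `broker-pass`) are placeholders, not values from this documentation:

```python
import paho.mqtt.client as mqtt

# The full -addRoute payload, exactly as shown above.
COMMAND = """-addRoute DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL
ADD SQL_CONFIG
    WITH SERVER "db-postgresql.db.onmyserver.com"
    WITH PORT 25060
    WITH DATABASE "defaultdb"
    WITH USERNAME "doadmin"
    WITH PASSWORD "AVNS_pass"
    WITH USE_SSL TRUE
    WITH TRUST_SERVER_CERTIFICATE FALSE"""

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)  # paho-mqtt 2.x
client.username_pw_set("broker-user", "broker-pass")    # placeholder credentials
client.connect("broker.example.com", 1883)              # placeholder broker address
client.loop_start()
info = client.publish("$SYS/Coreflux/Command", COMMAND, qos=1)
info.wait_for_publish()  # block until the command has been sent
client.loop_stop()
client.disconnect()
```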
Deploying Routes with LOT Notebooks
Routes can also be created and deployed interactively using LOT Notebooks in Visual Studio Code. This provides a convenient way to develop, test, and document your routes.
To get started with LOT Notebooks:
1. Install the LOT Notebooks extension in VS Code
2. Create a new `.lotnb` file
3. Add code cells with your route definitions
4. Execute the cells to deploy directly to your broker
For detailed instructions, see the LOT Notebooks Getting Started Guide.
4. Data Handling and Reliability
The SQL routes are designed with reliability in mind to handle common network issues.
Connection Testing
Upon startup or restart, the broker proactively tests the database connection by executing a simple query (`SELECT 1`). This ensures the connection is valid and ready to accept data before the route is fully activated.
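You can run the same probe yourself to validate credentials and TLS settings before deploying the route. Here is a sketch using Python's psycopg2 with the connection values from the configuration example above; `sslmode="require"` is assumed to mirror `USE_SSL TRUE`:

```python
import psycopg2

conn = psycopg2.connect(
    host="db-postgresql.db.onmyserver.com",
    port=25060,
    dbname="defaultdb",
    user="doadmin",
    password="AVNS_pass",
    sslmode="require",  # assumed equivalent of USE_SSL TRUE
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT 1")  # the same probe the broker runs
    assert cur.fetchone() == (1,)
conn.close()
```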
Data Caching
If the connection to the database is lost, the route automatically caches incoming data:
- Up to 100 messages are stored in memory.
- When the connection is re-established, the cached data is flushed to the database.
- If the cache limit is exceeded, the oldest messages are discarded to make room for new ones.
This mechanism is designed to handle temporary downtimes, not to serve as a long-term data buffer.
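Conceptually, the cache is a bounded first-in-first-out buffer that evicts its oldest entry once full. The sketch below illustrates that policy; it is not the broker's actual implementation, and only the 100-message limit comes from this documentation:

```python
from collections import deque

CACHE_LIMIT = 100                  # documented in-memory limit
cache = deque(maxlen=CACHE_LIMIT)  # appends past maxlen evict the oldest entry

def buffer_message(msg):
    """Called while the database connection is down."""
    cache.append(msg)  # at the limit, the oldest cached message is discarded

def flush(insert_row):
    """Called once the connection is re-established."""
    while cache:
        insert_row(cache.popleft())  # replay in arrival order
```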
5. Using Models for Structured Data Storage
While you can route raw data, the true power of SQL routes is unlocked when combined with LOT Models. Models act as a schema and transformation layer, converting raw, unstructured MQTT messages into a consistent, structured format suitable for a relational database.
In essence, a Model defines what the data should look like, and the `STORE IN` command within the model definition tells the broker where to persist that structured information using a specific route.
Automatic Table Creation
A key feature of this integration is the broker's ability to manage your database schema dynamically. When a Model directs data to a table:
1. The broker first performs an efficient, cached check to see if the table already exists.
2. If the table does not exist, it is automatically created based on the fields defined in the Model.
This process includes adding an `id` column (auto-incrementing primary key) and a `timestamp` column for you, alongside columns that match the fields from your model.
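For illustration, for a hypothetical model with a single numeric `temperature` field stored in a table named `sensor_data`, the auto-created table would have roughly the shape below. The exact DDL and JSON-to-SQL type mapping are internal to the broker, so every column type shown here is an assumption:

```python
# Hedged sketch of the kind of table the broker creates automatically.
# "sensor_data" and "temperature" are hypothetical names; the id and
# timestamp columns reflect the behavior described above, and all column
# types are assumptions.
ASSUMED_AUTO_DDL = """
CREATE TABLE IF NOT EXISTS sensor_data (
    id          BIGSERIAL PRIMARY KEY,      -- added for you
    "timestamp" TIMESTAMPTZ DEFAULT now(),  -- added for you
    temperature DOUBLE PRECISION            -- mapped from the model field
);
"""
```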
Example: Storing Structured Sensor Data
Let's consider a practical example where we want to process and store sensor data.
1. The Model Definition
First, we define a model to structure the incoming data. This model listens for raw data on a topic, adds a timestamp, and prepares a structured payload.
```
DEFINE MODEL MyModel COLLAPSED WITH TOPIC "MyModel"
    ADD "f_event_time" WITH TIMESTAMP "UNIX"
    ADD BOOL "bool" WITH TRUE
    ADD "data" WITH TOPIC "mapping/mydata" AS TRIGGER
STORE IN "PostgreSQL_Log"
    WITH TABLE "MyModelTable"
```
- `DEFINE MODEL MyModel...`: Creates a new model.
- `ADD "data" WITH TOPIC "mapping/mydata" AS TRIGGER`: This is the trigger. Whenever a message arrives on `mapping/mydata`, the model executes.
- `ADD "f_event_time" ...` and `ADD BOOL "bool" ...`: These lines enrich the original payload with a Unix timestamp and a static boolean value.
- `STORE IN "PostgreSQL_Log" ...`: This is the crucial step. It instructs the broker to take the resulting structured data and send it to the `PostgreSQL_Log` route for storage in the `MyModelTable` table.
2. The Generated SQL Query
When the model is triggered by a message on `mapping/mydata`, it constructs the data and the route generates an `INSERT` statement to store it. For example:
```
INSERT INTO "MyModelTable" (f_event_time, bool, data_mydata) VALUES (1678886400, true, '{"value": 42}');
```
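To confirm the row landed, you can read it back along with the `id` and `timestamp` columns the broker added automatically. A sketch using Python's psycopg2 and the connection values from the route example:

```python
import psycopg2

conn = psycopg2.connect(
    host="db-postgresql.db.onmyserver.com", port=25060, dbname="defaultdb",
    user="doadmin", password="AVNS_pass", sslmode="require",
)
with conn, conn.cursor() as cur:
    # id and "timestamp" are the columns the broker added automatically.
    cur.execute(
        'SELECT id, "timestamp", f_event_time, bool, data_mydata '
        'FROM "MyModelTable" ORDER BY id DESC LIMIT 1'
    )
    print(cur.fetchone())
conn.close()
```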