MongoDB Connector
Introduction
MongoDB is a widely used NoSQL database system designed for handling large-scale data with high flexibility and scalability. Known for its document-based model and JSON-like structure, MongoDB is ideal for modern applications requiring dynamic schemas and efficient data handling.
The MongoDB Connector enables seamless integration between MongoDB and MQTT-enabled devices or services. It allows real-time data interaction between MongoDB and an MQTT broker, making it possible to find, update, insert, replace or delete data using MQTT messages. This integration supports real-time data exchange, enhancing the capabilities of IoT applications by linking them directly with MongoDB.
Features and Benefits
- Seamless Integration: Enables smooth communication between MongoDB and MQTT clients, simplifying data exchange between platforms.
- Real-time Operations: Allows real-time execution of database operations (query, insert, update, delete) through MQTT messages for responsive data management.
- Easy Integration: Simplifies the integration of MongoDB with your IoT projects, making it straightforward to connect database services with various IoT devices.
- Flexible and Scalable: Leverages the dynamic schema and scalability of MongoDB to handle a wide variety of data and application requirements.
- IoT-Ready: Simplifies the integration of MongoDB with IoT projects, enabling direct interaction between IoT devices and the database.
- Dynamic Query Support: Facilitates the use of filters, updates, and queries via MQTT, offering flexibility and responsiveness for real-time data processing.
Prerequisites
Before you install and configure the connector, make sure you have the following prerequisites:
- Basic understanding of the MQTT protocol.
- Basic understanding of NoSQL and MongoDB operations.
- Access to an MQTT broker compatible with your setup.
- MQTT Client software (such as MQTT Explorer or similar).
- Access to a MongoDB database instance.
Connector installation
Using Coreflux HUB
Please refer to the general docs for connector installation through the Coreflux HUB.
Using Coreflux HUBLESS
Connector management and control are conducted using the MQTT protocol. Commands (payload) are sent to the $SYS/Coreflux/Command topic. The results of these commands are published to $SYS/Coreflux/Command/Output. The following steps will focus solely on the payload that needs to be sent.
- Connect to the Coreflux MQTT Broker using your preferred client.
- Log in using your Coreflux account:
- Check your available connectors:
- Install connector:
- If there were no issues during the installation, you should receive a message on the $SYS/Coreflux/Command/Output topic:
Connector configuration
⚠ : If a parameter is not included in your connector's configuration and it is not required, a default value will be applied. Incorrect configuration may cause the connector to not work as intended.
ℹ : This configuration is presented using a hubless setup as an example. The key takeaway is the understanding of the parameters and their significance. If you are configuring the connector via Coreflux Hub, the same parameters and configurations apply.
MQTT Parameters
The MqttParameters in the JSON configuration define how the connector connects to an MQTT broker. The Address and Port indicate the broker's network location (by default, 127.0.0.1 on port 1883), which is where data is sent to or received from. The parameters also cover authentication and the use of TLS for secure communication. Together they determine the pathway for data exchange between the connector and the MQTT broker.
Parameter | Description | Required | Example | Default Value |
---|---|---|---|---|
Port | Port number on which the MQTT broker is running. | Yes | 1883 | 1883 |
Address | IP address or hostname of the MQTT broker. | Yes | "iot.coreflux.cloud" | "127.0.0.1" |
IsAnonymous | Indicates if the connection is anonymous (no username/password required). | No | true | true |
Username | Username for authentication, if not anonymous. | No | "" (empty string) | "" (empty string) |
Password | Password for authentication, if not anonymous. | No | "" (empty string) | "" (empty string) |
WithTLS | Specifies whether TLS encryption is enabled for secure communication. | No | true | false |
ClientId | Uniquely identifies the client to the MQTT broker. | Yes | "ClientId1" | Random.Name.Generator |
Example:
{
"MqttParameters": {
"Port": 1883,
"Address": "iot.coreflux.cloud",
"IsAnonymous": true,
"Username": "",
"Password": "",
"WithTLS": false,
"ClientId": "ClientId1"
}
}
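As a minimal sketch of how the defaults in the table above combine with explicit values, the following Python snippet merges user-supplied settings over the documented defaults. The helper name build_mqtt_parameters is hypothetical and not part of Coreflux, and the credential check is an assumption about sensible usage rather than documented connector behavior.

```python
import json

# Defaults as listed in the MQTT Parameters table (ClientId is supplied by the caller).
MQTT_DEFAULTS = {
    "Port": 1883,
    "Address": "127.0.0.1",
    "IsAnonymous": True,
    "Username": "",
    "Password": "",
    "WithTLS": False,
}


def build_mqtt_parameters(client_id, **overrides):
    """Hypothetical helper: merge overrides onto the documented defaults."""
    unknown = set(overrides) - set(MQTT_DEFAULTS)
    if unknown:
        raise ValueError(f"Unknown MQTT parameters: {sorted(unknown)}")
    params = {**MQTT_DEFAULTS, **overrides, "ClientId": client_id}
    # Assumption: a non-anonymous connection is only useful with a username.
    if not params["IsAnonymous"] and not params["Username"]:
        raise ValueError("Username is required when IsAnonymous is false")
    return {"MqttParameters": params}


config = build_mqtt_parameters("ClientId1", Address="iot.coreflux.cloud")
print(json.dumps(config, indent=2))
```

Any parameter that is not overridden keeps its default, which mirrors the behavior described in the configuration note.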
MongoDB Parameters
The MongoDB configuration parameters in the JSON setup define the communication between the connector and a MongoDB database. These parameters are critical for establishing a connection to the database, enabling the connector to perform operations such as querying, inserting, updating, replacing or deleting documents. The configuration includes specifying the database connection string, which contains the server address and port, as well as the database name to connect to. This ensures secure and efficient interaction with the MongoDB server.
Parameter | Description | Required | Example | Default Value |
---|---|---|---|---|
ConnectionString | The URL of the MongoDB server, including the protocol, host, and port. | Yes | "mongodb://127.0.0.1:27017" | "mongodb://127.0.0.1:27017" |
DatabaseName | The name of the database to connect to. This property must match an existing database name. | Yes | "example-database" | "databaseName" |
Example:
{
"MqttParameters": {
"Port": 1883,
"Address": "iot.coreflux.cloud",
"IsAnonymous": true,
"Username": "",
"Password": "",
"WithTLS": false,
"ClientId": "ClientId1"
},
"Properties": {
"ConnectionString": "mongodb://127.0.0.1:27017",
"DatabaseName": "example-database"
}
}
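Before saving the configuration, it can help to sanity-check the connection string. The sketch below is one possible pre-flight check, not something the connector provides: build_properties is a hypothetical helper, and it only verifies the URI scheme and the presence of a host and database name.

```python
from urllib.parse import urlsplit


def build_properties(connection_string, database_name):
    """Hypothetical helper: basic sanity checks before saving the configuration."""
    parts = urlsplit(connection_string)
    # MongoDB connection strings use the mongodb:// or mongodb+srv:// scheme.
    if parts.scheme not in ("mongodb", "mongodb+srv"):
        raise ValueError(f"Not a MongoDB connection string: {connection_string!r}")
    if not parts.hostname:
        raise ValueError("Connection string has no host")
    if not database_name:
        raise ValueError("DatabaseName must not be empty")
    return {
        "Properties": {
            "ConnectionString": connection_string,
            "DatabaseName": database_name,
        }
    }


properties = build_properties("mongodb://127.0.0.1:27017", "example-database")
print(properties["Properties"]["ConnectionString"])
```

This check does not confirm that the database exists; that still has to be verified against the MongoDB instance itself.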
Tags
Tags are integral elements that bridge the gap between MongoDB operations and MQTT communication. Each Tag defines a specific MongoDB operation and its associated MQTT topic, enabling seamless interaction between the database and MQTT clients. Tags configure MongoDB-specific parameters, including the query or filter to be applied, the sort order, and the feedback mechanism. They also define the behavior of the MQTT communication, such as topics and quality of service, ensuring real-time data exchange and operational flexibility.
Parameter | Description | Required | Example | Default Value |
---|---|---|---|---|
Name | Unique Tag identifier. | Yes | "TemperatureData" | "TagName" |
Publish | Defines how the data is published. Acceptable values are: 'Update' (0), 'Cyclic' (1), 'Once' (2). | Yes | 0 | 2 |
PublishCycle | Interval for publishing messages when Publish is set to 'Cyclic'. Value in seconds, between 1 and 86,400. | No | 500 | 1 |
MqttTopic | Unique MQTT topic for the operation. | Yes | "room/temp" | "mqtt/topic" |
MqttRetain | Indicates whether the MQTT message should be retained. | Yes | true | false |
QualityOfService | Defines the MQTT quality of service. Options: AtMostOnce (0), AtLeastOnce (1), ExactlyOnce (2). | Yes | 1 | 0 |
CollectionName | Name of the MongoDB collection where the operation will be executed. | Yes | "users" | "defaultCollection" |
MqttFeedbackTopic | MQTT topic to receive feedback from the MongoDB operation. | Yes | "feedback/temp" | "feedback/topic" |
Operation | MongoDB operation to perform. Supported values: Find, Insert, Update, Replace, Delete. See Operation Types. | Yes | "Insert" | "Find" |
QueryFilter | JSON string defining the MongoDB filter, query, or replacement document. | Yes | "{age: {$gte: 20}}" | "{}" |
OnlyOneDocument | If true, manipulates only the first matching document for Update/Replace/Delete operations. | Yes (for Update/Replace) | true | false |
Sort | JSON string defining the sort order for Find operations. Use 1 for ascending and -1 for descending. | No (required for Find) | "{age: -1}" | "null" |
Limit | Maximum number of documents to return for Find operations. | No (required for Find) | 10 | "null" |
Count | If true, only returns the count of matching documents for Find operations. | No (required for Find) | true | false |
Example:
{
"MqttParameters": {
"Port": 1883,
"Address": "iot.coreflux.cloud",
"IsAnonymous": true,
"Username": "",
"Password": "",
"WithTLS": false,
"ClientId": "ClientId1"
},
"Properties": {
"ConnectionString": "mongodb://127.0.0.1:27017",
"DatabaseName": "example-database"
},
"Tags": [
{
"Name": "TemperatureData",
"Publish": 2,
"PublishCycle": 1,
"MqttTopic": "room/temp",
"MqttRetain": false,
"QualityOfService": 1,
"CollectionName": "sensors",
"MqttFeedbackTopic": "feedback/temperature",
"Operation": "Find",
"QueryFilter": "{temperature: {$gte: 30}}",
"Sort": "{timestamp: -1}",
"Limit": 5,
"Count": false
}
]
}
⚠ Warning: Only one query can be executed per MQTT event. Ensure your queries are designed accordingly to avoid unexpected behavior.
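The value ranges listed in the Tags table can be checked before a configuration is saved. The following is a rough sketch of such a check; validate_tag is a hypothetical helper, and it covers only the constraints stated in the table, not everything the connector itself may enforce.

```python
SUPPORTED_OPERATIONS = {"Find", "Insert", "Update", "Replace", "Delete"}


def validate_tag(tag):
    """Hypothetical pre-flight check; returns a list of problems (empty if none)."""
    problems = []
    if tag.get("Publish") not in (0, 1, 2):  # Update, Cyclic, Once
        problems.append("Publish must be 0, 1 or 2")
    cycle = tag.get("PublishCycle", 1)
    # PublishCycle only matters for cyclic publishing.
    if tag.get("Publish") == 1 and not 1 <= cycle <= 86_400:
        problems.append("PublishCycle must be between 1 and 86400 seconds")
    if tag.get("QualityOfService") not in (0, 1, 2):
        problems.append("QualityOfService must be 0, 1 or 2")
    if tag.get("Operation") not in SUPPORTED_OPERATIONS:
        problems.append("Operation must be one of " + ", ".join(sorted(SUPPORTED_OPERATIONS)))
    for key in ("Name", "MqttTopic", "MqttFeedbackTopic", "CollectionName"):
        if not tag.get(key):
            problems.append(f"{key} is required")
    return problems


tag = {
    "Name": "TemperatureData",
    "Publish": 2,
    "PublishCycle": 1,
    "MqttTopic": "room/temp",
    "MqttRetain": False,
    "QualityOfService": 1,
    "CollectionName": "sensors",
    "MqttFeedbackTopic": "feedback/temperature",
    "Operation": "Find",
    "QueryFilter": "{temperature: {$gte: 30}}",
}
print(validate_tag(tag))  # prints []
```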
Data Insertion Guide
This connector supports multiple payload formats for passing data into the QueryFilter: Raw, CSV, and JSON. Below is a quick reference guide on how to send data in each format.
Format | QueryFilter Parameter | Payload |
---|---|---|
Raw | {value} | {"temperature": {"$gte": 30}} |
CSV | {age: { $gte: {value.csv.age} }, hobbies: { $elemMatch: { $eq: {value.csv.hobby} } } } | age, hobby 22, Running |
JSON | [{name: '{value.json.name}'},{$set: {age: {value.json.age}}},{upsert: true}] | { "age": 25, "name": "John"} |
Explanation
- Raw: The value is passed directly into the QueryFilter Parameter.
- CSV: Data is formatted as CSV, with a line of field names followed by a line of values. Fields are referenced in the QueryFilter Parameter as {value.csv.field}.
- JSON: Data is sent in JSON format, and the fields are referenced in the QueryFilter Parameter using dot notation.
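To make the three formats concrete, the sketch below approximates the substitution in plain Python. It is an illustration of the placeholder syntax from the table above, not the connector's actual implementation: render_query_filter is a hypothetical function, and the choice to insert values as literal text is an assumption.

```python
import csv
import io
import json
import re


def render_query_filter(template, payload, fmt):
    """Illustrative only: substitute payload values into a QueryFilter template."""
    if fmt == "raw":
        # {value} is replaced by the whole payload.
        return template.replace("{value}", payload)
    if fmt == "csv":
        # First line holds the field names, second line the values.
        rows = list(csv.reader(io.StringIO(payload), skipinitialspace=True))
        fields = dict(zip((h.strip() for h in rows[0]), (v.strip() for v in rows[1])))
        pattern = r"\{value\.csv\.(\w+)\}"
    elif fmt == "json":
        data = json.loads(payload)
        # Strings are inserted as-is; other JSON types keep their JSON spelling.
        fields = {k: v if isinstance(v, str) else json.dumps(v) for k, v in data.items()}
        pattern = r"\{value\.json\.(\w+)\}"
    else:
        raise ValueError(f"Unknown format: {fmt}")
    # Replace every placeholder with the matching field value.
    return re.sub(pattern, lambda m: fields[m.group(1)], template)


json_filter = render_query_filter(
    "[{name: '{value.json.name}'},{$set: {age: {value.json.age}}},{upsert: true}]",
    '{ "age": 25, "name": "John"}',
    "json",
)
print(json_filter)  # prints [{name: 'John'},{$set: {age: 25}},{upsert: true}]
```

Note that string values are substituted without quotes, which is why the JSON template in the table wraps {value.json.name} in quotes itself.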
Operation Types
Find
The Find operation retrieves documents from a MongoDB collection that match the criteria specified in the QueryFilter parameter.
Examples:
- Retrieve Documents Matching a Condition in QueryFilter
QueryFilter:
The payload does not influence the query in this case; the QueryFilter solely defines the criteria for retrieving documents.
- Dynamic Query Using MQTT Payload
QueryFilter
Payload
- Sorted and Limited Query
QueryFilter
Sort
Limit: 5
Result: Retrieves the five most recent active documents, sorted in descending order by timestamp.
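The combined effect of a filter, a descending sort, and a limit can be illustrated without a database. The sketch below emulates it over an in-memory list; the readings and the find_recent helper are hypothetical, and MongoDB itself evaluates such queries on the server rather than this way.

```python
from operator import itemgetter

# Hypothetical readings standing in for documents in a collection.
documents = [
    {"temperature": 28, "timestamp": 1},
    {"temperature": 31, "timestamp": 2},
    {"temperature": 35, "timestamp": 3},
    {"temperature": 30, "timestamp": 4},
    {"temperature": 29, "timestamp": 5},
    {"temperature": 33, "timestamp": 6},
    {"temperature": 40, "timestamp": 7},
    {"temperature": 32, "timestamp": 8},
]


def find_recent(docs, min_temperature, limit):
    """Stand-in for filter {temperature: {$gte: N}}, sort {timestamp: -1}, and a limit."""
    # Filter first, then sort newest-first, then keep at most `limit` documents.
    matching = [d for d in docs if d["temperature"] >= min_temperature]
    matching.sort(key=itemgetter("timestamp"), reverse=True)
    return matching[:limit]


result = find_recent(documents, min_temperature=30, limit=5)
print([d["timestamp"] for d in result])  # prints [8, 7, 6, 4, 3]
```

Six documents match the filter, but the limit keeps only the five with the highest timestamps.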
Insert
The Insert operation adds one or more new documents to a MongoDB collection.
- Insert single document
QueryFilter:
The payload does not influence the operation in this case; the QueryFilter solely defines the document to insert.
- Insert Multiple Documents
QueryFilter
[
{ "name": "{value.json.name}", "age": {value.json.age} },
{ "hobby": "{value.json.hobby}", "ocupation": "{value.json.ocupation}" }
]
Payload
Update
The Update operation modifies existing documents in a MongoDB collection.
- Update a Single Document
QueryFilter
Payload
- Dynamic Update with Upsert
QueryFilter
Payload
Result: Updates the age field of the document where name: Alice. If no such document exists, it inserts a new document.
⚠ Warning: Remember to use the 'OnlyOneDocument' flag to specify whether the operation should target a single document or multiple documents.
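The interaction between upsert and the 'OnlyOneDocument' flag can be sketched in plain Python. This is an in-memory approximation under simplifying assumptions (equality-only filters, a $set-style update); update_with_upsert and the sample data are hypothetical and do not reflect how the connector is implemented.

```python
def update_with_upsert(docs, query, set_fields, upsert=False, only_one=True):
    """Stand-in for an equality filter plus $set, with optional upsert."""
    matches = [d for d in docs if all(d.get(k) == v for k, v in query.items())]
    if not matches:
        if upsert:
            # No match: insert a new document built from the filter and the $set fields.
            docs.append({**query, **set_fields})
        return 0
    # only_one mirrors the OnlyOneDocument flag: first match versus all matches.
    targets = matches[:1] if only_one else matches
    for doc in targets:
        doc.update(set_fields)
    return len(targets)


users = [{"name": "Alice", "age": 30}, {"name": "Alice", "age": 41}]

# Only the first matching document is changed.
changed = update_with_upsert(users, {"name": "Alice"}, {"age": 31}, only_one=True)
print(changed, users[0]["age"], users[1]["age"])  # prints 1 31 41

# No document matches, so upsert inserts a new one.
update_with_upsert(users, {"name": "Carol"}, {"age": 25}, upsert=True)
print(users[-1])  # prints {'name': 'Carol', 'age': 25}
```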
Replace
The Replace operation replaces an entire document in a MongoDB collection.
- Replace with Upsert
QueryFilter
[
{ "name": "{value.json.name}" },
{ "name": "{value.json.name}", "age": "{value.json.age}", "status": "active" },
{ "upsert": true }
]
Payload
⚠ Warning: Remember to use the 'OnlyOneDocument' flag to specify whether the operation should target a single document or multiple documents.
Result: Replaces the document where name: Bob. If no such document exists, inserts a new one.
Delete
The Delete operation removes documents from a MongoDB collection.
- Delete Document(s)
QueryFilter
Payload
Result: Deletes a single document or all matching documents, depending on the value of 'OnlyOneDocument'.
⚠ Warning: Remember to use the 'OnlyOneDocument' flag to specify whether the operation should target a single document or multiple documents.
Saving the configuration
Connector management and control are conducted using the MQTT protocol. Commands (MQTT payload) are sent to the $SYS/Coreflux/Command topic. The results of these commands are published to $SYS/Coreflux/Command/Output. The following steps will focus solely on the payload that needs to be sent.
ℹ : Configuration requires the user to be logged in and to have a connector installed. If these requirements are not met, please refer to the installation section.
- Save configuration:
ℹ : The connector asset_guid can be obtained by consulting the $SYS/Coreflux/Assets topic.
Example:
-assetConfigSave connectorName {
"MqttParameters": {
"Port": 1883,
"Address": "iot.coreflux.cloud",
"IsAnonymous": true,
"Username": "",
"Password": "",
"WithTLS": false,
"ClientId": "ClientId1"
},
"Properties": {
"ConnectionString": "mongodb://127.0.0.1:27017",
"DatabaseName": "example-database"
},
"Tags": [
{
"Name": "TemperatureData",
"Publish": 2,
"PublishCycle": 1,
"MqttTopic": "room/temp",
"MqttRetain": false,
"QualityOfService": 1,
"CollectionName": "sensors",
"MqttFeedbackTopic": "feedback/temperature",
"Operation": "Find",
"QueryFilter": "{temperature: {$gte: 30}}",
"Sort": "{timestamp: -1}",
"Limit": 5,
"Count": false
}
]
}
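When the configuration is generated by a script, the command payload can be assembled programmatically. The sketch below only builds the payload string in the shape of the example above; build_save_command is a hypothetical helper, and publishing the result to the command topic is left to your MQTT client of choice.

```python
import json

# Topic that receives commands, as described in this section.
COMMAND_TOPIC = "$SYS/Coreflux/Command"


def build_save_command(connector, config):
    """Hypothetical helper: format an -assetConfigSave payload like the example above."""
    return f"-assetConfigSave {connector} {json.dumps(config, indent=2)}"


config = {
    "MqttParameters": {"Port": 1883, "Address": "iot.coreflux.cloud", "ClientId": "ClientId1"},
    "Properties": {
        "ConnectionString": "mongodb://127.0.0.1:27017",
        "DatabaseName": "example-database",
    },
    "Tags": [],
}

payload = build_save_command("connectorName", config)
print(COMMAND_TOPIC)
print(payload)
```

Serializing with json.dumps guarantees that the configuration part of the payload is valid JSON, which avoids hand-editing mistakes such as trailing commas.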
- Check if the configuration was saved (optional):
ℹ : Find more detailed information about the connector management here.
Using the connector
⚠ Warning: It is important to understand that the flow of data between the broker and the device highly depends on the configuration of both the connector and the device. In case of unexpected results, please verify the configuration and/or check the logs for any possible errors.
With Coreflux HUB
Please refer to the general docs for connector installation through the Coreflux HUB.
With Coreflux HUBLESS
Connector management and control are conducted using the MQTT protocol. Commands (MQTT payload) are sent to the $SYS/Coreflux/Command topic. The results of these commands are published to $SYS/Coreflux/Command/Output. The following steps will focus solely on the payload that needs to be sent.
ℹ : The asset guid can be obtained by consulting the $SYS/Coreflux/Assets topic.
Run connector
- Run connector:
Stop connector
- Stop connector:
⚠ Warning: To update the connector configuration, ensure you stop it first to avoid issues. After making the changes, restart the connector to apply the new configuration.
Logs
Logs are essential for monitoring and debugging your system. They provide insight into the operation of your connectors and can help identify and resolve issues promptly.
With Coreflux HUB
On Coreflux HUB, the logs are displayed in the "Log" section of the connector configuration.
With MQTT Explorer
To display logs in MQTT Explorer, follow these steps:
- Publish the following command to the $SYS/Coreflux/Command topic:
Example 1:
To display informational logs, use:
Note: This will display the "Information" logs of all the connectors.
Example 2:
To display error logs from a specific connector, use:
Note: This will display the "Error" logs of a specific connector.
Note: To display all three types of logs (Error, Information, and Warning), you will need to repeat this process three times, once for each log level.