Skip to content

MongoDB Connector

Introduction

MongoDB is a widely used NoSQL database system designed for handling large-scale data with high flexibility and scalability. Known for its document-based model and JSON-like structure, MongoDB is ideal for modern applications requiring dynamic schemas and efficient data handling.

The MongoDB Connector enables seamless integration between MongoDB and MQTT-enabled devices or services. It allows real-time data interaction between MongoDB and an MQTT broker, making it possible to find, update, insert, replace or delete data using MQTT messages. This integration supports real-time data exchange, enhancing the capabilities of IoT applications by linking them directly with MongoDB.

Features and Benefits

  • Seamless Integration: : Enables smooth communication between MongoDB and MQTT clients, simplifying data exchange between platforms.
  • Real-time Operations:: Allows real-time execution of database operations (query, insert, update, delete) through MQTT messages for responsive data management.
  • Easy Integration: Simplifies the integration of MariaDB with your IoT projects, making it straightforward to connect database services with various IoT devices.
  • Flexible and Scalable: Leverages the dynamic schema and scalability of MongoDB to handle a wide variety of data and application requirements.
  • IoT-Ready: Simplifies the integration of MongoDB with IoT projects, enabling direct interaction between IoT devices and the database.
  • Dynamic Query Support: Facilitates the use of filters, updates, and queries via MQTT, offering flexibility and responsiveness for real-time data processing.

Prerequisites

Before you install and configure the connector, make sure you have the following prerequisites:

  • Basic understanding of the MQTT protocol.
  • Basic understanding of NOSQL and MongoDB operations.
  • Access to an MQTT broker compatible with your setup.
  • MQTT Client software (such as MQTT Explorer or similar).
  • Access to a MongoDB database instance.

Connector installation

Using Coreflux HUB

Please refer the to general docs for connector installation through the Coreflux HUB.

Using Coreflux HUBLESS

Connector management and control are conducted using the MQTT protocol. Commands (payload) are sent to the $SYS/Coreflux/Command topic. The results of these commands are published to $SYS/Coreflux/Command/Output. The following steps will focus solely on the payload that needs to be sent.

  1. Connect to the Coreflux MQTT Broker using your preferred client.
  2. Login using your coreflux account:
    -L myname@mydomain.com password`
    
  3. Check your available connectors:
    -l
    
  4. Install connector:
    -I coreflux_mongodb
    
  5. If there were no issues during the installation you should receive a message on the $SYS/Coreflux/Command/Output:
    Your coreflux_mongodb was installed with version <version> with the <asset_guid>. Let the magic begin!
    

Connector configuration

⚠ : If a parameter is not included in your connector's configuration and it is not required, a default value will be applied. Incorrect configuration may cause the connector to not work as intended.

ℹ : This configuration is presented using a hubless setup as an example. The key takeaway is the understanding of the parameters and their significance. If you are configuring the connector via Coreflux Hub, the same parameters and configurations apply.

MQTT Parameters

The MqttParameters in the JSON configuration define how to connect to an MQTT broker, specifying the communication details. The Address and Port indicate the broker's network location (in this case, 127.0.0.1 on port 1883), which is where the data will be sent to or received from. The parameters also detail authentication methods and the use of TLS for secure communication. This setup determines the pathway for data exchange between the device and the MQTT broker, facilitating the monitoring or control of device operations.

Parameter Description Required Example Default Value
Port Port number on which the MQTT broker is running. Yes 1883 1883
Address IP address or hostname of the MQTT broker. Yes "iot.coreflux.cloud" "127.0.0.1"
IsAnonymous Indicates if the connection is anonymous (no username/password required). No true true
Username Username for authentication, if not anonymous. No "" (empty string) "" (empty string)
Password Password for authentication, if not anonymous. No "" (empty string) "" (empty string)
WithTLS Specifies whether TLS encryption is enabled for secure communication. No true false
ClientId Used to uniquely to identify the client to the MQTT broker. Yes Client1 Random.Name.Generator

Example:

{
  "MqttParameters": {
    "Port": 1883,
    "Address": "iot.coreflux.cloud",
    "IsAnonymous": true,
    "Username": "",
    "Password": "",
    "WithTLS": false,
    "ClientId": "ClientId1"
  }
}

MongoDB Parameters

The MongoDB configuration parameters in the JSON setup define the communication between the connector and a MongoDB database. These parameters are critical for establishing a connection to the database, enabling the connector to perform operations such as querying, inserting, updating, replacing or deleting documents. The configuration includes specifying the database connection string, which contains the server address and port, as well as the database name to connect to. This ensures secure and efficient interaction with the MongoDB server.

Parameter Description Required Example Default Value
Connection String The URL of the MongoDB server, including the protocol, host, and port. Yes "mongodb://127.0.0.1:27017" "mongodb://127.0.0.1:27017"
DatabaseName The name of the database to connect to. This property must match an existing database name. Yes "example-database" "databaseName"

Example:

{
  "MqttParameters": {
    "Port": 1883,
    "Address": "iot.coreflux.cloud",
    "IsAnonymous": true,
    "Username": "",
    "Password": "",
    "WithTLS": false,
    "ClientId": "ClientId1"
  },
  "Properties": {
    "ConnectionString": "mongodb://127.0.0.1:27017",
    "DatabaseName": "example-database"
  }
}

Tags

Tags are integral elements that bridge the gap between MongoDB operations and MQTT communication. Each Tag defines a specific MongoDB operation and its associated MQTT topic, enabling seamless interaction between the database and MQTT clients. Tags configure MongoDB-specific parameters, including the query or filter to be applied, the sort order, and the feedback mechanism. They also define the behavior of the MQTT communication, such as topics and quality of service, ensuring real-time data exchange and operational flexibility.

Parameter Description Required Example Default Value
Name Unique Tag identifier. Yes "TemperatureData" "TagName"
Publish Defines how the data is published. Acceptable values are: 'Update' (0), 'Cyclic' (1), 'Once' (2). Yes 0 2
PublishCycle Interval for publishing messages when Publish is set to 'Cyclic'. Value in seconds, between 1 and 86,400. No 500 1
MqttTopic Unique MQTT topic for the operation. Yes "room/temp" "mqtt/topic"
MqttRetain Indicates whether the MQTT message should be retained. Yes true false
QualityOfService Defines the MQTT quality of service. Options: AtMostOnce (0), AtLeastOnce (1), ExactlyOnce (2). Yes 1 0
CollectionName Name of the MongoDB collection where the operation will be executed. Yes "users" "defaultCollection"
MqttFeedbackTopic MQTT topic to receive feedback from the MongoDB operation. Yes "feedback/temp" "feedback/topic"
Operation MongoDB operation to perform. Supported values: Find, Insert, Update, Replace, Delete. Check Operation Types Yes "Insert" "Find"
QueryFilter JSON string defining the MongoDB filter, query, or replacement document. Yes "{age: {$gte: 20}}" "{}"
OnlyOneDocumen If true, manipulates only the first matching document for *Update/Replace/Delete operations. Yes (for Update/Replace) true false
Sort JSON string defining the sort order for Find operations. Use 1 for ascending and -1 for descending. No (required for Find) "{age: -1}" "null"
Limit Maximum number of documents to return for Find operations. No (required for Find) 10 "null"
Count If true, only returns the count of matching documents for Find operations No (required for Find) true false

Example:

{
  "MqttParameters": {
    "Port": 1883,
    "Address": "iot.coreflux.cloud",
    "IsAnonymous": true,
    "Username": "",
    "Password": "",
    "WithTLS": false,
    "ClientId": "ClientId1"
  },
  "Properties": {
    "ConnectionString": "mongodb://127.0.0.1:27017",
    "DatabaseName": "example-database"
  },
  "Tags": [
    {
      "Name": "TemperatureData",
      "Publish": 2,
      "PublishCycle": 1,
      "MqttTopic": "room/temp",
      "MqttRetain": false,
      "QualityOfService": 1,
      "CollectionName": "sensors",
      "MqttFeedbackTopic": "feedback/temperature",
      "Operation": "Find",
      "QueryFilter": "{temperature: {$gte: 30}}",
      "Sort": "{timestamp: -1}",
      "Limit": 5,
      "Count": false
    }
  ]
}

Warning: Only one query can be executed per MQTT event. Ensure your SQL statements are designed accordingly to avoid unexpected behavior.

Data Insertion Guide

This Connector supports multiple data formats for inserting records into the data table: Raw, CSV, and JSON. Below is a quick reference guide on how to send data in each format.

Format QueryFilter Parameter Payload
Raw {value} {"temperature": {"$gte": 30}}
CSV {age: { $gte: {value.csv.age} }, hobbies: { $elemMatch: { $eq: {value.csv.hobby} } } } age, hobby
22, Running
JSON [{name: '{value.json.name}'},{$set: {age: {value.json.age}}},{upsert: true}] { "age": 25, "name": "John"}

Explanation

  • Raw: The value is passed directly into the QueryFilter Parameter.
  • CSV: Data is formatted as CSV, with field names and values separated by new lines.
  • JSON: Data is sent in JSON format, and the fields are referenced in the QueryFilter Parameter using dot notation.

Operation Types

Find

The Find operation retrieves documents from a MongoDB collection that match the criteria specified in the QueryFilter parameter.

Examples:

  • Retrieve Documents Matching a Condition in QueryFilter

QueryFilter:

{ "temperature": { "$gte": 25 } }

The payload does not influence the query in this case; the QueryFilter solely defines the criteria for retrieving documents.

  • Dynamic Query Using MQTT Payload

QueryFilter

{ "temperature": { "$gte": "{value.json.temperature}" } }

Payload

{ "temperature": 30 }

  • Sorted and Limited Query

QueryFilter

{ "status": "active" }

Sort

{ "timestamp": -1 }

Limit: 5

Result: Retrieves the five most recent active documents, sorted in descending order by timestamp.

Insert

The Insert operation adds one or more new documents to a MongoDB collection.

  • Insert single document

QueryFilter:

{ "name": "John", "age": 30 }

The payload does not influence the query in this case; the QueryFilter solely defines the criteria for retrieving documents.

  • Insert Multiple Documents

QueryFilter

[
  { "name": "{value.json.name}", "age": {value.json.age} },
  { "hobby": "{value.json.hobby}", "ocupation": "{value.json.ocupation}" }
]

Payload

[
  { "name": "Alice", "age": 25 },
  { "hobby": "running", "ocupation": "developer" }
]

Update

The Update operation modifies existing documents in a MongoDB collection.

  • Update a Single Document

QueryFilter

[
  { "name": "{value.json.name}" },
  { "$set": { "age": {value.json.age} } }
]

Payload

{ "name": "John", "age": 35 }

  • Dynamic Update with Upsert

QueryFilter

[
  { "name": "{value.json.name}" },
  { "$set": { "age": {value.json.age} } },
  { "upsert": true }
]

Payload

{ "name": "Alice", "age": 25 }

Result: Updates the age field of the document where name: Alice. If no such document exists, it inserts a new document.

Warning: Remember to use the 'OnlyOneDocument' flag to specify whether the operation should target a single document or multiple documents.

Replace

The Replace operation replaces an entire document in a MongoDB collection.

  • Replace with Upsert

QueryFilter

[
  { "name": "{value.json.name}" },
  { "name": "{value.json.name}", "age": "{value.json.age}", "status": "active" },
  { "upsert": true }
]

Payload

{ "name": "Bob", "age": 28 }

Warning: Remember to use the 'OnlyOneDocument' flag to specify whether the operation should target a single document or multiple documents.

Result: Replaces the document where name: Bob. If no such document exists, inserts a new one.

Delete

The Delete operation removes documents from a MongoDB collection.

  • Delete Document(s)

QueryFilter

{ "status": "{value.json.status}" }

Payload

{ "status": "inactive" }

Result: Deletes a document or all the documents depending the value of 'OnlyOneDocument'

Warning: Remember to use the 'OnlyOneDocument' flag to specify whether the operation should target a single document or multiple documents.

Saving the configuration

Connector management and control are conducted using the MQTT protocol. Commands (MQTT payload) are sent to the $SYS/Coreflux/Command topic. The results of these commands are published to $SYS/Coreflux/Command/Output. The following steps will focus solely on the payload that needs to be sent.

ℹ : Configuration requires the user to be logged in and have an connector installed. If these requirements are not met, please refer to the installation section.

  1. Save configuration:
    -assetConfigSave <asset_guid or assetName> <configuration>
    

    ℹ : The connector asset_guid can be obtained by consulting the $SYS/Coreflux/Assets topic.

Example:

-assetConfigSave connectorName {
  "MqttParameters": {
    "Port": 1883,
    "Address": "iot.coreflux.cloud",
    "IsAnonymous": true,
    "Username": "",
    "Password": "",
    "WithTLS": false,
    "ClientId": "ClientId1"
  },
  "Properties": {
    "ConnectionString": "mongodb://127.0.0.1:27017",
    "DatabaseName": "example-database"
  },
  "Tags": [
    {
      "Name": "TemperatureData",
      "Publish": 2,
      "PublishCycle": 1,
      "MqttTopic": "room/temp",
      "MqttRetain": false,
      "QualityOfService": 1,
      "CollectionName": "sensors",
      "MqttFeedbackTopic": "feedback/temperature",
      "Operation": "Find",
      "QueryFilter": "{temperature: {$gte: 30}}",
      "Sort": "{timestamp: -1}",
      "Limit": 5,
      "Count": false
    }
  ]
}
  1. Check if the configuration was saved (optional) :
    -assetConfigLoad <asset_guid>
    

ℹ : Find more detailed information about the connector management here.

Using the connector

⚠ Warning: It is important to understand that the flow of data between the broker and the device highly depends on the configuration of both the connector and the device. In case of unexpected results, please verify the configuration and/or check the logs for any possible errors.

With Coreflux HUB

Please refer to the general docs for connector installation through the Coreflux HUB.

With Coreflux HUBLESS

Connector management and control are conducted using the MQTT protocol. Commands (MQTT payload) are sent to the $SYS/Coreflux/Command topic. The results of these commands are published to $SYS/Coreflux/Command/Output. The following steps will focus solely on the payload that needs to be sent.

ℹ : The asset guid can be obtained by consulting the $SYS/Coreflux/Assets topic.

Run connector

  1. Run connector:
-R <asset_guid>

Stop connector

  1. Stop connector:
-S <asset_guid>

⚠ Warning: To update the connector configuration, ensure you stop it first to avoid issues. After making the changes, restart the connector to apply the new configuration.

Logs

Logs are essential for monitoring and debugging your system. They provide insight into the operation of your connectors and can help identify and resolve issues promptly.

With Coreflux HUB

On Coreflux HUB, the logs are displayed in the "Log" section of the connector configuration.

With MQTT Explorer

To display logs in MQTT Explorer, follow these steps:

  1. Publish the following command to the $SYS/Coreflux/Command topic:

-addTraceLog topic=topic/to/show/log level=Error/Information/Warning messageContains=(assetID)
2. The logs will be shown on the $SYS/Coreflux/Log/Traces/topic/to/show/log topic.

Example1:

To display informational logs, use:

-addTraceLog topic=log/inf level=Information

Note : This will display the Logs "Information" of all the connectors

Example2:

To display error logs, from a specific connector, use:

-addTraceLog topic=log/opcua/err level=Error messageContains=assetID

Note : This will display the Logs "Error" of a specific connector

Note: To display all three types of logs (Error, Information, and Warning), you will need to repeat this process three times, one for each log level. three times, one for each log level.