Using Python With Coreflux
Coreflux is a versatile IoT platform that provides a comprehensive ecosystem for managing and monitoring your data hub. Its flexibility extends to various programming languages, including Python, allowing developers to integrate machine learning models, sensors, and other components seamlessly.
Starting with version 1.7.2, Coreflux introduces native Python integration directly within the MQTT broker, enabling you to execute Python functions from LOT (Language of Things) actions without external dependencies. This documentation covers both the new native integration and traditional external Python integration approaches.
Native Python Integration (Version 1.7.2+)
The new native Python integration allows you to execute Python functions directly from LOT actions, making Python a first-class citizen in the Coreflux ecosystem. This integration works seamlessly with the MQTT broker and provides a unified development experience.
Key Features
- Direct Python Execution: Call Python functions directly from LOT actions
- Script Management: Add, remove, and manage Python scripts via MQTT commands
- Package Management: Install Python packages through the broker
- VS Code Integration: Full support for Python development in VS Code LOT notebooks
- Real-time Execution: Python functions execute in real-time within the broker context
Python Script Structure
All Python scripts must start with a # Script Name:
comment to be recognized by the system:
# Script Name: MyScript
def my_function(param1, param2):
return param1 + param2
def another_function():
return "Hello from Python!"
LOT Action Integration
Use the CALL PYTHON
syntax in LOT actions to execute Python functions:
DEFINE ACTION PythonExample
ON TOPIC "python/test" DO
CALL PYTHON "MyScript.my_function"
WITH (5, 3)
RETURN AS {result}
PUBLISH TOPIC "python/result" WITH {result}
MQTT Commands for Python
Command | Description | Format | Since Version |
---|---|---|---|
-addPython |
Adds a Python script | -addPython scriptContent |
1.7.2 |
-removePython |
Removes a Python script | -removePython scriptName |
1.7.2 |
-installPythonPackage |
Installs a Python package | -installPythonPackage packageName |
1.7.2 |
-listPythonPackages |
Lists installed packages | -listPythonPackages |
1.7.2 |
Example: Complete Python Integration
-
Add a Python script:
-
Create a LOT action that uses Python:
-
Test the integration:
Advanced Python Features
Working with External Libraries
You can install and use external Python packages in your scripts:
# Install a package
-installPythonPackage requests
-installPythonPackage numpy
-installPythonPackage pandas
# Script Name: WebAPI
import requests
import json
def get_weather(city):
url = f"http://api.openweathermap.org/data/2.5/weather?q={city}"
try:
response = requests.get(url, timeout=10)
return response.json()
except Exception as e:
return {"error": str(e)}
def post_data(url, data):
try:
response = requests.post(url, json=data, timeout=10)
return {"status": response.status_code, "data": response.json()}
except Exception as e:
return {"error": str(e)}
Error Handling
Python functions should handle errors gracefully as they execute within the broker context:
# Script Name: SafeProcessor
def safe_divide(a, b):
try:
if b == 0:
return {"error": "Division by zero", "result": None}
return {"error": None, "result": a / b}
except Exception as e:
return {"error": str(e), "result": None}
def process_data(data):
try:
if isinstance(data, str):
return {"processed": data.upper(), "length": len(data)}
return {"error": "Expected string input"}
except Exception as e:
return {"error": f"Processing failed: {str(e)}"}
Data Type Handling
The system automatically handles data type conversions between LOT and Python:
# Script Name: TypeConverter
def convert_types(data):
# Input can be string, number, boolean, or JSON
if isinstance(data, str):
return {"type": "string", "value": data, "length": len(data)}
elif isinstance(data, (int, float)):
return {"type": "number", "value": data, "squared": data ** 2}
elif isinstance(data, bool):
return {"type": "boolean", "value": data, "opposite": not data}
elif isinstance(data, dict):
return {"type": "object", "keys": list(data.keys()), "count": len(data)}
else:
return {"type": "unknown", "value": str(data)}
VS Code LOT Notebook Integration
The VS Code LOT extension provides full support for Python development:
- Create a new LOT notebook (
.lot
file) - Add Python cells with the
# Script Name:
comment - Use the play button to upload Python scripts to the broker
- Test integration with LOT actions
Example VS Code LOT notebook structure:
# Python Cell
# Script Name: TextProcessor
def reverse_text(text):
return text[::-1]
def count_words(text):
return len(text.split())
def to_uppercase(text):
return text.upper()
# LOT Action Cell
DEFINE ACTION TextReverser
ON TOPIC "text/input" DO
SET message GET TOPIC "text/input" AS STRING
CALL PYTHON "TextProcessor.reverse_text"
WITH ({message})
RETURN AS {reversed_text}
PUBLISH TOPIC "text/reversed" WITH {reversed_text}
System Integration
Python scripts run within the Coreflux broker's Python.NET runtime, providing:
- Memory Management: Automatic garbage collection and memory optimization
- Thread Safety: Safe execution in multi-threaded broker environment
- Resource Limits: Built-in resource management and timeout handling
- Logging Integration: Python output integrates with Coreflux logging system
Best Practices
- Always include Script Name: Every Python script must start with
# Script Name: YourScriptName
- Handle Errors Gracefully: Use try-catch blocks to prevent broker crashes
- Keep Functions Simple: Complex logic should be broken into smaller functions
- Use Descriptive Names: Make function and variable names clear and meaningful
- Test Incrementally: Test each function individually before integrating with LOT actions
Traditional External Python Integration
For more complex scenarios or when you need full Python environment control, you can still use the traditional approach of external Python applications that communicate with Coreflux via MQTT.
Setting Up the Environment
Before diving into the integration, ensure you have the necessary Python libraries installed:
Installing and Configuring a Flux Asset
In the example we are using , an S7-1200 PLC with Coreflux,then you need to install the coreflux_S7mqtt
asset. This can be done by sending a command to the $SYS/Coreflux/Command
MQTT topic:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
client.publish("$SYS/Coreflux/Command", "-I coreflux_S7mqtt")
client = mqtt.Client()
client.on_connect = on_connect
client.connect("YOUR_COREFLUX_SERVER_IP", 1883, 60)
client.loop_forever()
After installing the asset, you can configure it using the provided asset configuration example. First, retrieve the configuration example using the assetHelp
command:
Once you have the configuration example, modify the JSON as required for your setup. For instance, you might need to set the IP address of your S7-1200 PLC, specify the MQTT broker details, and define the tags for the PLC variables you want to monitor or control.
After modifying the configuration JSON, save the configuration using the assetConfigSave
command:
asset_guid = "YOUR_ASSET_GUID" # Replace with the actual GUID of your asset container
config_json = "YOUR_MODIFIED_JSON" # Replace with your modified configuration JSON
client.publish("$SYS/Coreflux/Command", f"-assetConfigSave {asset_guid} {config_json}")
Finally, start the asset:
Integrating an ML Model for a Temperature Sensor
Let's assume you have a simple ML model that predicts whether a given temperature reading indicates an anomaly. This model can be integrated with the temperature sensor data from the S7-1200 PLC.
import paho.mqtt.client as mqtt
from sklearn.externals import joblib
# Load your ML model
model = joblib.load('path_to_your_model.pkl')
def on_message(client, userdata, msg):
temperature = float(msg.payload) # Assuming the payload is the temperature value from MD3000
prediction = model.predict([[temperature]])
if prediction[0] == 1: # Assuming 1 indicates an anomaly
print(f"Anomaly detected at temperature: {temperature}")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("YOUR_COREFLUX_SERVER_IP", 1883, 60)
client.subscribe("path_to_your_S7_topic/MD3000") # Replace with the appropriate topic for MD3000
client.loop_forever()
In this example, the Python script subscribes to the MQTT topic corresponding to the MD3000
memory location in the S7-1200 PLC. When a new temperature reading is received, it's passed to the ML model to check for anomalies.