Python Integration Examples
This document provides comprehensive examples of Python integration with Coreflux, demonstrating various use cases and best practices for version 1.7.2+.
Table of Contents
- Basic Examples
- Data Processing Examples
- IoT Sensor Examples
- Web API Integration
- Machine Learning Examples
- Error Handling Patterns
- Complete Workflow Examples
Basic Examples
Calculator Script
# Script Name: Calculator
def add(a, b):
"""Add two numbers"""
return a + b
def subtract(a, b):
"""Subtract b from a"""
return a - b
def multiply(a, b):
"""Multiply two numbers"""
return a * b
def divide(a, b):
"""Divide a by b with error handling"""
if b == 0:
return {"error": "Division by zero", "result": None}
return {"error": None, "result": a / b}
def power(base, exponent):
"""Calculate base raised to the power of exponent"""
return base ** exponent
def safe_operation(operation, a, b):
"""Safely perform an operation with error handling"""
try:
if operation == "add":
return add(a, b)
elif operation == "subtract":
return subtract(a, b)
elif operation == "multiply":
return multiply(a, b)
elif operation == "divide":
return divide(a, b)
elif operation == "power":
return power(a, b)
else:
return {"error": f"Unknown operation: {operation}"}
except Exception as e:
return {"error": f"Operation failed: {str(e)}"}
Corresponding LOT Action:
DEFINE ACTION MathProcessor
ON TOPIC "math/operation" DO
SET operation_data GET TOPIC "math/operation" AS JSON
CALL PYTHON "Calculator.safe_operation"
WITH ({operation_data.operation}, {operation_data.a}, {operation_data.b})
RETURN AS {result}
PUBLISH TOPIC "math/result" WITH {result}
Text Processing Script
# Script Name: TextProcessor
import re
import json
def reverse_text(text):
"""Reverse the input text"""
return text[::-1]
def count_words(text):
"""Count words in text"""
return len(text.split())
def count_characters(text):
"""Count characters in text"""
return len(text)
def to_uppercase(text):
"""Convert text to uppercase"""
return text.upper()
def to_lowercase(text):
"""Convert text to lowercase"""
return text.lower()
def extract_numbers(text):
"""Extract all numbers from text"""
numbers = re.findall(r'-?\d+\.?\d*', text)
return [float(num) if '.' in num else int(num) for num in numbers]
def clean_text(text):
"""Clean text by removing extra whitespace and special characters"""
# Remove extra whitespace
cleaned = re.sub(r'\s+', ' ', text.strip())
# Remove special characters except alphanumeric and spaces
cleaned = re.sub(r'[^\w\s]', '', cleaned)
return cleaned
def text_analysis(text):
"""Comprehensive text analysis"""
return {
"original": text,
"length": len(text),
"word_count": count_words(text),
"uppercase": to_uppercase(text),
"lowercase": to_lowercase(text),
"cleaned": clean_text(text),
"numbers": extract_numbers(text)
}
Corresponding LOT Action:
DEFINE ACTION TextAnalyzer
ON TOPIC "text/analyze" DO
SET input_text GET TOPIC "text/analyze" AS STRING
CALL PYTHON "TextProcessor.text_analysis"
WITH ({input_text})
RETURN AS {analysis}
PUBLISH TOPIC "text/result" WITH {analysis}
Data Processing Examples
JSON Data Processor
# Script Name: DataProcessor
import json
from datetime import datetime
def validate_json(data):
"""Validate if data is proper JSON"""
try:
if isinstance(data, str):
json.loads(data)
return {"valid": True, "error": None}
except json.JSONDecodeError as e:
return {"valid": False, "error": f"JSON decode error: {str(e)}"}
except Exception as e:
return {"valid": False, "error": f"Validation error: {str(e)}"}
def flatten_json(data, parent_key='', sep='_'):
"""Flatten nested JSON structure"""
items = []
if isinstance(data, dict):
for k, v in data.items():
new_key = f"{parent_key}{sep}{k}" if parent_key else k
if isinstance(v, dict):
items.extend(flatten_json(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
else:
items.append((parent_key, data))
return dict(items)
def extract_values_by_key(data, key):
"""Extract all values for a specific key from nested JSON"""
values = []
if isinstance(data, dict):
for k, v in data.items():
if k == key:
values.append(v)
elif isinstance(v, (dict, list)):
values.extend(extract_values_by_key(v, key))
elif isinstance(data, list):
for item in data:
values.extend(extract_values_by_key(item, key))
return values
def add_timestamp(data):
"""Add current timestamp to data"""
if isinstance(data, dict):
data["timestamp"] = datetime.now().isoformat()
return data
def process_sensor_data(data):
"""Process IoT sensor data with validation and enhancement"""
try:
# Validate JSON
validation = validate_json(data)
if not validation["valid"]:
return {"error": validation["error"], "processed": None}
# Parse JSON if string
if isinstance(data, str):
parsed_data = json.loads(data)
else:
parsed_data = data
# Add timestamp
enhanced_data = add_timestamp(parsed_data)
# Extract sensor readings
sensor_values = extract_values_by_key(enhanced_data, "value")
return {
"error": None,
"processed": enhanced_data,
"sensor_count": len(sensor_values),
"values": sensor_values,
"flattened": flatten_json(enhanced_data)
}
except Exception as e:
return {"error": f"Processing failed: {str(e)}", "processed": None}
Corresponding LOT Action:
DEFINE ACTION SensorDataProcessor
ON TOPIC "sensors/data" DO
SET sensor_data GET TOPIC "sensors/data" AS JSON
CALL PYTHON "DataProcessor.process_sensor_data"
WITH ({sensor_data})
RETURN AS {result}
IF {result.error} == None THEN
PUBLISH TOPIC "sensors/processed" WITH {result.processed}
PUBLISH TOPIC "sensors/flattened" WITH {result.flattened}
ELSE
PUBLISH TOPIC "sensors/error" WITH {result.error}
IoT Sensor Examples
Temperature Sensor Processor
# Script Name: TemperatureProcessor
import math
def celsius_to_fahrenheit(celsius):
"""Convert Celsius to Fahrenheit"""
return (celsius * 9/5) + 32
def fahrenheit_to_celsius(fahrenheit):
"""Convert Fahrenheit to Celsius"""
return (fahrenheit - 32) * 5/9
def kelvin_to_celsius(kelvin):
"""Convert Kelvin to Celsius"""
return kelvin - 273.15
def celsius_to_kelvin(celsius):
"""Convert Celsius to Kelvin"""
return celsius + 273.15
def temperature_analysis(temperature, unit="celsius"):
"""Comprehensive temperature analysis"""
try:
# Convert to Celsius for analysis
if unit.lower() == "fahrenheit":
celsius = fahrenheit_to_celsius(temperature)
elif unit.lower() == "kelvin":
celsius = kelvin_to_celsius(temperature)
else:
celsius = temperature
# Determine comfort level
if celsius < 0:
comfort = "freezing"
elif celsius < 10:
comfort = "cold"
elif celsius < 20:
comfort = "cool"
elif celsius < 25:
comfort = "comfortable"
elif celsius < 30:
comfort = "warm"
else:
comfort = "hot"
# Check for anomalies
anomaly = celsius < -50 or celsius > 100
return {
"celsius": celsius,
"fahrenheit": celsius_to_fahrenheit(celsius),
"kelvin": celsius_to_kelvin(celsius),
"comfort_level": comfort,
"anomaly": anomaly,
"unit_original": unit
}
except Exception as e:
return {"error": f"Temperature analysis failed: {str(e)}"}
def temperature_trend(temperatures):
"""Analyze temperature trend from a list of readings"""
if not temperatures or len(temperatures) < 2:
return {"error": "Need at least 2 temperature readings"}
try:
# Calculate trend
first = temperatures[0]
last = temperatures[-1]
change = last - first
change_percent = (change / first) * 100 if first != 0 else 0
# Determine trend direction
if change > 1:
trend = "rising"
elif change < -1:
trend = "falling"
else:
trend = "stable"
# Calculate statistics
avg_temp = sum(temperatures) / len(temperatures)
min_temp = min(temperatures)
max_temp = max(temperatures)
return {
"trend": trend,
"change": change,
"change_percent": change_percent,
"average": avg_temp,
"minimum": min_temp,
"maximum": max_temp,
"readings_count": len(temperatures)
}
except Exception as e:
return {"error": f"Trend analysis failed: {str(e)}"}
Corresponding LOT Action:
DEFINE ACTION TemperatureAnalyzer
ON TOPIC "temperature/reading" DO
SET temp_data GET TOPIC "temperature/reading" AS JSON
CALL PYTHON "TemperatureProcessor.temperature_analysis"
WITH ({temp_data.value}, {temp_data.unit})
RETURN AS {analysis}
PUBLISH TOPIC "temperature/analysis" WITH {analysis}
Humidity Sensor Processor
# Script Name: HumidityProcessor
def humidity_analysis(humidity):
"""Analyze humidity level and provide recommendations"""
try:
if humidity < 0 or humidity > 100:
return {"error": "Humidity must be between 0 and 100"}
# Determine comfort level
if humidity < 30:
level = "low"
recommendation = "Consider using a humidifier"
elif humidity < 40:
level = "dry"
recommendation = "Slightly dry, monitor for comfort"
elif humidity < 60:
level = "comfortable"
recommendation = "Ideal humidity level"
elif humidity < 70:
level = "moderate"
recommendation = "Acceptable humidity level"
else:
level = "high"
recommendation = "Consider using a dehumidifier"
# Health implications
health_notes = []
if humidity < 30:
health_notes.append("May cause dry skin and respiratory irritation")
elif humidity > 70:
health_notes.append("May promote mold growth and dust mites")
return {
"humidity": humidity,
"level": level,
"recommendation": recommendation,
"health_notes": health_notes,
"comfortable": 40 <= humidity <= 60
}
except Exception as e:
return {"error": f"Humidity analysis failed: {str(e)}"}
def calculate_dew_point(temperature, humidity):
"""Calculate dew point from temperature and humidity"""
try:
# Magnus formula for dew point calculation
a = 17.27
b = 237.7
alpha = ((a * temperature) / (b + temperature)) + math.log(humidity / 100.0)
dew_point = (b * alpha) / (a - alpha)
return {
"dew_point": dew_point,
"temperature": temperature,
"humidity": humidity,
"comfortable": abs(temperature - dew_point) > 3 # Comfortable if temp > dew point + 3°C
}
except Exception as e:
return {"error": f"Dew point calculation failed: {str(e)}"}
Web API Integration
Weather API Client
# Script Name: WeatherAPI
import requests
import json
def get_weather(city, api_key):
"""Get weather data for a city using OpenWeatherMap API"""
try:
url = f"http://api.openweathermap.org/data/2.5/weather"
params = {
"q": city,
"appid": api_key,
"units": "metric"
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
# Extract relevant information
weather_info = {
"city": data["name"],
"country": data["sys"]["country"],
"temperature": data["main"]["temp"],
"feels_like": data["main"]["feels_like"],
"humidity": data["main"]["humidity"],
"pressure": data["main"]["pressure"],
"description": data["weather"][0]["description"],
"wind_speed": data["wind"]["speed"],
"visibility": data.get("visibility", 0) / 1000, # Convert to km
"timestamp": data["dt"]
}
return {"error": None, "weather": weather_info}
except requests.exceptions.RequestException as e:
return {"error": f"API request failed: {str(e)}", "weather": None}
except KeyError as e:
return {"error": f"Unexpected API response format: {str(e)}", "weather": None}
except Exception as e:
return {"error": f"Weather API error: {str(e)}", "weather": None}
def format_weather_alert(weather_data):
"""Format weather data into human-readable alert"""
if weather_data.get("error"):
return f"Weather alert error: {weather_data['error']}"
weather = weather_data["weather"]
alert = f"Weather in {weather['city']}, {weather['country']}: "
alert += f"{weather['description']}, "
alert += f"Temperature: {weather['temperature']}°C "
alert += f"(feels like {weather['feels_like']}°C), "
alert += f"Humidity: {weather['humidity']}%, "
alert += f"Wind: {weather['wind_speed']} m/s"
return alert
Corresponding LOT Action:
DEFINE ACTION WeatherMonitor
ON TOPIC "weather/request" DO
SET request_data GET TOPIC "weather/request" AS JSON
CALL PYTHON "WeatherAPI.get_weather"
WITH ({request_data.city}, {request_data.api_key})
RETURN AS {weather_result}
IF {weather_result.error} == None THEN
CALL PYTHON "WeatherAPI.format_weather_alert"
WITH ({weather_result})
RETURN AS {alert}
PUBLISH TOPIC "weather/data" WITH {weather_result.weather}
PUBLISH TOPIC "weather/alert" WITH {alert}
ELSE
PUBLISH TOPIC "weather/error" WITH {weather_result.error}
Machine Learning Examples
Simple Prediction Model
# Script Name: MLPredictor
import math
def linear_regression_predict(x, slope, intercept):
"""Simple linear regression prediction"""
return slope * x + intercept
def moving_average(data, window_size):
"""Calculate moving average of data"""
if len(data) < window_size:
return data
result = []
for i in range(len(data) - window_size + 1):
window = data[i:i + window_size]
avg = sum(window) / window_size
result.append(avg)
return result
def detect_anomaly(value, historical_data, threshold=2):
"""Detect if a value is an anomaly based on historical data"""
if not historical_data:
return {"is_anomaly": False, "reason": "No historical data"}
mean = sum(historical_data) / len(historical_data)
variance = sum((x - mean) ** 2 for x in historical_data) / len(historical_data)
std_dev = math.sqrt(variance)
z_score = abs(value - mean) / std_dev if std_dev > 0 else 0
return {
"is_anomaly": z_score > threshold,
"z_score": z_score,
"mean": mean,
"std_dev": std_dev,
"threshold": threshold
}
def predict_trend(data, future_periods=5):
"""Predict future trend based on historical data"""
if len(data) < 2:
return {"error": "Need at least 2 data points"}
# Simple linear trend calculation
n = len(data)
x_sum = sum(range(n))
y_sum = sum(data)
xy_sum = sum(i * data[i] for i in range(n))
x2_sum = sum(i * i for i in range(n))
slope = (n * xy_sum - x_sum * y_sum) / (n * x2_sum - x_sum * x_sum)
intercept = (y_sum - slope * x_sum) / n
# Generate predictions
predictions = []
for i in range(n, n + future_periods):
pred = linear_regression_predict(i, slope, intercept)
predictions.append(pred)
return {
"slope": slope,
"intercept": intercept,
"predictions": predictions,
"trend_direction": "increasing" if slope > 0 else "decreasing" if slope < 0 else "stable"
}
Corresponding LOT Action:
DEFINE ACTION MLProcessor
ON TOPIC "ml/predict" DO
SET ml_data GET TOPIC "ml/predict" AS JSON
CALL PYTHON "MLPredictor.detect_anomaly"
WITH ({ml_data.value}, {ml_data.historical_data}, {ml_data.threshold})
RETURN AS {anomaly_result}
CALL PYTHON "MLPredictor.predict_trend"
WITH ({ml_data.historical_data}, 5)
RETURN AS {trend_result}
PUBLISH TOPIC "ml/anomaly" WITH {anomaly_result}
PUBLISH TOPIC "ml/trend" WITH {trend_result}
Error Handling Patterns
Robust Error Handler
# Script Name: ErrorHandler
import traceback
import sys
def safe_execute(func, *args, **kwargs):
"""Safely execute a function with comprehensive error handling"""
try:
result = func(*args, **kwargs)
return {
"success": True,
"result": result,
"error": None,
"error_type": None
}
except ValueError as e:
return {
"success": False,
"result": None,
"error": f"Value error: {str(e)}",
"error_type": "ValueError"
}
except TypeError as e:
return {
"success": False,
"result": None,
"error": f"Type error: {str(e)}",
"error_type": "TypeError"
}
except ZeroDivisionError as e:
return {
"success": False,
"result": None,
"error": f"Division by zero: {str(e)}",
"error_type": "ZeroDivisionError"
}
except Exception as e:
return {
"success": False,
"result": None,
"error": f"Unexpected error: {str(e)}",
"error_type": type(e).__name__
}
def validate_input(data, expected_type, required_fields=None):
"""Validate input data structure and types"""
try:
if not isinstance(data, expected_type):
return {
"valid": False,
"error": f"Expected {expected_type.__name__}, got {type(data).__name__}"
}
if required_fields and isinstance(data, dict):
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return {
"valid": False,
"error": f"Missing required fields: {missing_fields}"
}
return {"valid": True, "error": None}
except Exception as e:
return {
"valid": False,
"error": f"Validation error: {str(e)}"
}
def retry_operation(func, max_retries=3, delay=1):
"""Retry an operation with exponential backoff"""
import time
for attempt in range(max_retries):
try:
result = func()
return {"success": True, "result": result, "attempts": attempt + 1}
except Exception as e:
if attempt == max_retries - 1:
return {
"success": False,
"error": f"Failed after {max_retries} attempts: {str(e)}",
"attempts": attempt + 1
}
time.sleep(delay * (2 ** attempt)) # Exponential backoff
return {"success": False, "error": "Retry limit exceeded", "attempts": max_retries}
Complete Workflow Examples
IoT Data Pipeline
# Script Name: IoTDataPipeline
import json
from datetime import datetime
def process_sensor_pipeline(sensor_data):
"""Complete IoT sensor data processing pipeline"""
pipeline_result = {
"timestamp": datetime.now().isoformat(),
"stages": [],
"final_result": None,
"errors": []
}
try:
# Stage 1: Validation
validation = validate_sensor_data(sensor_data)
pipeline_result["stages"].append({"stage": "validation", "success": validation["valid"]})
if not validation["valid"]:
pipeline_result["errors"].append(f"Validation failed: {validation['error']}")
return pipeline_result
# Stage 2: Data cleaning
cleaned_data = clean_sensor_data(sensor_data)
pipeline_result["stages"].append({"stage": "cleaning", "success": True})
# Stage 3: Analysis
analysis = analyze_sensor_data(cleaned_data)
pipeline_result["stages"].append({"stage": "analysis", "success": True})
# Stage 4: Anomaly detection
anomaly = detect_anomalies(cleaned_data, analysis)
pipeline_result["stages"].append({"stage": "anomaly_detection", "success": True})
# Stage 5: Generate insights
insights = generate_insights(cleaned_data, analysis, anomaly)
pipeline_result["stages"].append({"stage": "insights", "success": True})
# Final result
pipeline_result["final_result"] = {
"cleaned_data": cleaned_data,
"analysis": analysis,
"anomaly": anomaly,
"insights": insights
}
except Exception as e:
pipeline_result["errors"].append(f"Pipeline error: {str(e)}")
return pipeline_result
def validate_sensor_data(data):
"""Validate sensor data structure"""
required_fields = ["sensor_id", "timestamp", "value", "unit"]
if not isinstance(data, dict):
return {"valid": False, "error": "Data must be a dictionary"}
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return {"valid": False, "error": f"Missing fields: {missing_fields}"}
if not isinstance(data["value"], (int, float)):
return {"valid": False, "error": "Value must be numeric"}
return {"valid": True, "error": None}
def clean_sensor_data(data):
"""Clean and normalize sensor data"""
cleaned = data.copy()
# Normalize timestamp
if isinstance(cleaned["timestamp"], str):
# Assume ISO format, keep as is
pass
# Round value to 2 decimal places
cleaned["value"] = round(float(cleaned["value"]), 2)
# Normalize unit to lowercase
cleaned["unit"] = cleaned["unit"].lower()
return cleaned
def analyze_sensor_data(data):
"""Analyze sensor data for patterns and statistics"""
value = data["value"]
unit = data["unit"]
# Basic analysis based on unit
if unit in ["celsius", "fahrenheit"]:
return analyze_temperature(value, unit)
elif unit == "percent":
return analyze_humidity(value)
elif unit in ["pascal", "bar", "psi"]:
return analyze_pressure(value, unit)
else:
return {"type": "generic", "value": value, "unit": unit}
def analyze_temperature(value, unit):
"""Analyze temperature data"""
if unit == "fahrenheit":
celsius = (value - 32) * 5/9
else:
celsius = value
return {
"type": "temperature",
"celsius": celsius,
"fahrenheit": celsius * 9/5 + 32,
"comfort_level": get_temperature_comfort(celsius)
}
def analyze_humidity(value):
"""Analyze humidity data"""
return {
"type": "humidity",
"value": value,
"comfort_level": get_humidity_comfort(value)
}
def analyze_pressure(value, unit):
"""Analyze pressure data"""
# Convert to Pascal for analysis
if unit == "bar":
pascal = value * 100000
elif unit == "psi":
pascal = value * 6894.76
else:
pascal = value
return {
"type": "pressure",
"pascal": pascal,
"bar": pascal / 100000,
"psi": pascal / 6894.76
}
def get_temperature_comfort(celsius):
"""Get comfort level for temperature"""
if celsius < 0:
return "freezing"
elif celsius < 10:
return "cold"
elif celsius < 20:
return "cool"
elif celsius < 25:
return "comfortable"
elif celsius < 30:
return "warm"
else:
return "hot"
def get_humidity_comfort(humidity):
"""Get comfort level for humidity"""
if humidity < 30:
return "low"
elif humidity < 60:
return "comfortable"
else:
return "high"
def detect_anomalies(data, analysis):
"""Detect anomalies in sensor data"""
value = data["value"]
unit = data["unit"]
anomalies = []
# Temperature anomalies
if unit in ["celsius", "fahrenheit"]:
celsius = analysis["celsius"]
if celsius < -50 or celsius > 100:
anomalies.append("Temperature out of normal range")
# Humidity anomalies
elif unit == "percent":
if value < 0 or value > 100:
anomalies.append("Humidity out of valid range")
return {
"has_anomalies": len(anomalies) > 0,
"anomalies": anomalies
}
def generate_insights(data, analysis, anomaly):
"""Generate actionable insights from sensor data"""
insights = []
# Temperature insights
if analysis["type"] == "temperature":
comfort = analysis["comfort_level"]
if comfort in ["cold", "freezing"]:
insights.append("Consider heating or insulation")
elif comfort in ["warm", "hot"]:
insights.append("Consider cooling or ventilation")
# Humidity insights
elif analysis["type"] == "humidity":
comfort = analysis["comfort_level"]
if comfort == "low":
insights.append("Consider using a humidifier")
elif comfort == "high":
insights.append("Consider using a dehumidifier")
# Anomaly insights
if anomaly["has_anomalies"]:
insights.append("Sensor may need calibration or maintenance")
return {
"insights": insights,
"recommendations": insights,
"priority": "high" if anomaly["has_anomalies"] else "normal"
}
Corresponding LOT Action:
DEFINE ACTION IoTDataPipeline
ON TOPIC "sensors/raw" DO
SET raw_data GET TOPIC "sensors/raw" AS JSON
CALL PYTHON "IoTDataPipeline.process_sensor_pipeline"
WITH ({raw_data})
RETURN AS {pipeline_result}
IF {pipeline_result.final_result} != None THEN
PUBLISH TOPIC "sensors/processed" WITH {pipeline_result.final_result.cleaned_data}
PUBLISH TOPIC "sensors/analysis" WITH {pipeline_result.final_result.analysis}
PUBLISH TOPIC "sensors/insights" WITH {pipeline_result.final_result.insights}
IF {pipeline_result.final_result.anomaly.has_anomalies} == True THEN
PUBLISH TOPIC "sensors/alerts" WITH {pipeline_result.final_result.anomaly}
END
ELSE
PUBLISH TOPIC "sensors/errors" WITH {pipeline_result.errors}
END
Best Practices Summary
- Always include Script Name: Every Python script must start with
# Script Name: YourScriptName
- Handle errors gracefully: Use try-catch blocks and return structured error information
- Validate inputs: Check data types and required fields before processing
- Use descriptive function names: Make your code self-documenting
- Return structured data: Use dictionaries for complex return values
- Keep functions focused: Each function should have a single responsibility
- Test incrementally: Test each function individually before integrating with LOT actions
- Document your functions: Use docstrings to explain what each function does
- Consider performance: Keep functions efficient for real-time IoT applications
- Use appropriate data types: Ensure return values are JSON-serializable
These examples demonstrate the power and flexibility of Python integration with Coreflux, enabling you to build sophisticated IoT data processing pipelines directly within the MQTT broker environment.