Python Integration Examples

This document provides practical examples of Python integration with Coreflux, covering common use cases and best practices for version 1.7.2 and later.

Table of Contents

  - Basic Examples
  - Data Processing Examples
  - IoT Sensor Examples
  - Web API Integration
  - Machine Learning Examples
  - Error Handling Patterns
  - Complete Workflow Examples
  - Best Practices Summary

Basic Examples

Calculator Script

# Script Name: Calculator
def add(a, b):
    """Add two numbers"""
    return a + b

def subtract(a, b):
    """Subtract b from a"""
    return a - b

def multiply(a, b):
    """Multiply two numbers"""
    return a * b

def divide(a, b):
    """Divide a by b with error handling"""
    if b == 0:
        return {"error": "Division by zero", "result": None}
    return {"error": None, "result": a / b}

def power(base, exponent):
    """Calculate base raised to the power of exponent"""
    return base ** exponent

def safe_operation(operation, a, b):
    """Safely perform an operation with error handling"""
    try:
        if operation == "add":
            return add(a, b)
        elif operation == "subtract":
            return subtract(a, b)
        elif operation == "multiply":
            return multiply(a, b)
        elif operation == "divide":
            return divide(a, b)
        elif operation == "power":
            return power(a, b)
        else:
            return {"error": f"Unknown operation: {operation}"}
    except Exception as e:
        return {"error": f"Operation failed: {str(e)}"}

Corresponding LOT Action:

DEFINE ACTION MathProcessor
ON TOPIC "math/operation" DO
    SET operation_data GET TOPIC "math/operation" AS JSON
    CALL PYTHON "Calculator.safe_operation"
        WITH ({operation_data.operation}, {operation_data.a}, {operation_data.b})
        RETURN AS {result}
    PUBLISH TOPIC "math/result" WITH {result}

Text Processing Script

# Script Name: TextProcessor
import re
import json

def reverse_text(text):
    """Reverse the input text"""
    return text[::-1]

def count_words(text):
    """Count words in text"""
    return len(text.split())

def count_characters(text):
    """Count characters in text"""
    return len(text)

def to_uppercase(text):
    """Convert text to uppercase"""
    return text.upper()

def to_lowercase(text):
    """Convert text to lowercase"""
    return text.lower()

def extract_numbers(text):
    """Extract all numbers from text"""
    numbers = re.findall(r'-?\d+\.?\d*', text)
    return [float(num) if '.' in num else int(num) for num in numbers]

def clean_text(text):
    """Clean text by removing extra whitespace and special characters"""
    # Remove extra whitespace
    cleaned = re.sub(r'\s+', ' ', text.strip())
    # Remove special characters except alphanumeric and spaces
    cleaned = re.sub(r'[^\w\s]', '', cleaned)
    return cleaned

def text_analysis(text):
    """Comprehensive text analysis"""
    return {
        "original": text,
        "length": len(text),
        "word_count": count_words(text),
        "uppercase": to_uppercase(text),
        "lowercase": to_lowercase(text),
        "cleaned": clean_text(text),
        "numbers": extract_numbers(text)
    }

Corresponding LOT Action:

DEFINE ACTION TextAnalyzer
ON TOPIC "text/analyze" DO
    SET input_text GET TOPIC "text/analyze" AS STRING
    CALL PYTHON "TextProcessor.text_analysis"
        WITH ({input_text})
        RETURN AS {analysis}
    PUBLISH TOPIC "text/result" WITH {analysis}

Data Processing Examples

JSON Data Processor

# Script Name: DataProcessor
import json
from datetime import datetime

def validate_json(data):
    """Validate if data is proper JSON"""
    try:
        if isinstance(data, str):
            json.loads(data)
        return {"valid": True, "error": None}
    except json.JSONDecodeError as e:
        return {"valid": False, "error": f"JSON decode error: {str(e)}"}
    except Exception as e:
        return {"valid": False, "error": f"Validation error: {str(e)}"}

def flatten_json(data, parent_key='', sep='_'):
    """Flatten nested JSON structure"""
    items = []
    if isinstance(data, dict):
        for k, v in data.items():
            new_key = f"{parent_key}{sep}{k}" if parent_key else k
            if isinstance(v, dict):
                items.extend(flatten_json(v, new_key, sep=sep).items())
            else:
                items.append((new_key, v))
    else:
        items.append((parent_key, data))
    return dict(items)

def extract_values_by_key(data, key):
    """Extract all values for a specific key from nested JSON"""
    values = []
    if isinstance(data, dict):
        for k, v in data.items():
            if k == key:
                values.append(v)
            elif isinstance(v, (dict, list)):
                values.extend(extract_values_by_key(v, key))
    elif isinstance(data, list):
        for item in data:
            values.extend(extract_values_by_key(item, key))
    return values

def add_timestamp(data):
    """Add current timestamp to data"""
    if isinstance(data, dict):
        data["timestamp"] = datetime.now().isoformat()
    return data

def process_sensor_data(data):
    """Process IoT sensor data with validation and enhancement"""
    try:
        # Validate JSON
        validation = validate_json(data)
        if not validation["valid"]:
            return {"error": validation["error"], "processed": None}

        # Parse JSON if string
        if isinstance(data, str):
            parsed_data = json.loads(data)
        else:
            parsed_data = data

        # Add timestamp
        enhanced_data = add_timestamp(parsed_data)

        # Extract sensor readings
        sensor_values = extract_values_by_key(enhanced_data, "value")

        return {
            "error": None,
            "processed": enhanced_data,
            "sensor_count": len(sensor_values),
            "values": sensor_values,
            "flattened": flatten_json(enhanced_data)
        }
    except Exception as e:
        return {"error": f"Processing failed: {str(e)}", "processed": None}

Corresponding LOT Action:

DEFINE ACTION SensorDataProcessor
ON TOPIC "sensors/data" DO
    SET sensor_data GET TOPIC "sensors/data" AS JSON
    CALL PYTHON "DataProcessor.process_sensor_data"
        WITH ({sensor_data})
        RETURN AS {result}
    IF {result.error} == None THEN
        PUBLISH TOPIC "sensors/processed" WITH {result.processed}
        PUBLISH TOPIC "sensors/flattened" WITH {result.flattened}
    ELSE
        PUBLISH TOPIC "sensors/error" WITH {result.error}

IoT Sensor Examples

Temperature Sensor Processor

# Script Name: TemperatureProcessor
import math

def celsius_to_fahrenheit(celsius):
    """Convert Celsius to Fahrenheit"""
    return (celsius * 9/5) + 32

def fahrenheit_to_celsius(fahrenheit):
    """Convert Fahrenheit to Celsius"""
    return (fahrenheit - 32) * 5/9

def kelvin_to_celsius(kelvin):
    """Convert Kelvin to Celsius"""
    return kelvin - 273.15

def celsius_to_kelvin(celsius):
    """Convert Celsius to Kelvin"""
    return celsius + 273.15

def temperature_analysis(temperature, unit="celsius"):
    """Comprehensive temperature analysis"""
    try:
        # Convert to Celsius for analysis
        if unit.lower() == "fahrenheit":
            celsius = fahrenheit_to_celsius(temperature)
        elif unit.lower() == "kelvin":
            celsius = kelvin_to_celsius(temperature)
        else:
            celsius = temperature

        # Determine comfort level
        if celsius < 0:
            comfort = "freezing"
        elif celsius < 10:
            comfort = "cold"
        elif celsius < 20:
            comfort = "cool"
        elif celsius < 25:
            comfort = "comfortable"
        elif celsius < 30:
            comfort = "warm"
        else:
            comfort = "hot"

        # Check for anomalies
        anomaly = celsius < -50 or celsius > 100

        return {
            "celsius": celsius,
            "fahrenheit": celsius_to_fahrenheit(celsius),
            "kelvin": celsius_to_kelvin(celsius),
            "comfort_level": comfort,
            "anomaly": anomaly,
            "unit_original": unit
        }
    except Exception as e:
        return {"error": f"Temperature analysis failed: {str(e)}"}

def temperature_trend(temperatures):
    """Analyze temperature trend from a list of readings"""
    if not temperatures or len(temperatures) < 2:
        return {"error": "Need at least 2 temperature readings"}

    try:
        # Calculate trend
        first = temperatures[0]
        last = temperatures[-1]
        change = last - first
        change_percent = (change / first) * 100 if first != 0 else 0

        # Determine trend direction
        if change > 1:
            trend = "rising"
        elif change < -1:
            trend = "falling"
        else:
            trend = "stable"

        # Calculate statistics
        avg_temp = sum(temperatures) / len(temperatures)
        min_temp = min(temperatures)
        max_temp = max(temperatures)

        return {
            "trend": trend,
            "change": change,
            "change_percent": change_percent,
            "average": avg_temp,
            "minimum": min_temp,
            "maximum": max_temp,
            "readings_count": len(temperatures)
        }
    except Exception as e:
        return {"error": f"Trend analysis failed: {str(e)}"}

Corresponding LOT Action:

DEFINE ACTION TemperatureAnalyzer
ON TOPIC "temperature/reading" DO
    SET temp_data GET TOPIC "temperature/reading" AS JSON
    CALL PYTHON "TemperatureProcessor.temperature_analysis"
        WITH ({temp_data.value}, {temp_data.unit})
        RETURN AS {analysis}
    PUBLISH TOPIC "temperature/analysis" WITH {analysis}

Humidity Sensor Processor

# Script Name: HumidityProcessor
import math

def humidity_analysis(humidity):
    """Analyze humidity level and provide recommendations"""
    try:
        if humidity < 0 or humidity > 100:
            return {"error": "Humidity must be between 0 and 100"}

        # Determine comfort level
        if humidity < 30:
            level = "low"
            recommendation = "Consider using a humidifier"
        elif humidity < 40:
            level = "dry"
            recommendation = "Slightly dry, monitor for comfort"
        elif humidity < 60:
            level = "comfortable"
            recommendation = "Ideal humidity level"
        elif humidity < 70:
            level = "moderate"
            recommendation = "Acceptable humidity level"
        else:
            level = "high"
            recommendation = "Consider using a dehumidifier"

        # Health implications
        health_notes = []
        if humidity < 30:
            health_notes.append("May cause dry skin and respiratory irritation")
        elif humidity > 70:
            health_notes.append("May promote mold growth and dust mites")

        return {
            "humidity": humidity,
            "level": level,
            "recommendation": recommendation,
            "health_notes": health_notes,
            "comfortable": 40 <= humidity <= 60
        }
    except Exception as e:
        return {"error": f"Humidity analysis failed: {str(e)}"}

def calculate_dew_point(temperature, humidity):
    """Calculate dew point from temperature and humidity"""
    try:
        # Magnus formula for dew point calculation
        a = 17.27
        b = 237.7
        alpha = ((a * temperature) / (b + temperature)) + math.log(humidity / 100.0)
        dew_point = (b * alpha) / (a - alpha)

        return {
            "dew_point": dew_point,
            "temperature": temperature,
            "humidity": humidity,
            "comfortable": abs(temperature - dew_point) > 3  # Comfortable if temp > dew point + 3°C
        }
    except Exception as e:
        return {"error": f"Dew point calculation failed: {str(e)}"}

Web API Integration

Weather API Client

# Script Name: WeatherAPI
import requests
import json

def get_weather(city, api_key):
    """Get weather data for a city using OpenWeatherMap API"""
    try:
        url = f"http://api.openweathermap.org/data/2.5/weather"
        params = {
            "q": city,
            "appid": api_key,
            "units": "metric"
        }

        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()

        data = response.json()

        # Extract relevant information
        weather_info = {
            "city": data["name"],
            "country": data["sys"]["country"],
            "temperature": data["main"]["temp"],
            "feels_like": data["main"]["feels_like"],
            "humidity": data["main"]["humidity"],
            "pressure": data["main"]["pressure"],
            "description": data["weather"][0]["description"],
            "wind_speed": data["wind"]["speed"],
            "visibility": data.get("visibility", 0) / 1000,  # Convert to km
            "timestamp": data["dt"]
        }

        return {"error": None, "weather": weather_info}

    except requests.exceptions.RequestException as e:
        return {"error": f"API request failed: {str(e)}", "weather": None}
    except KeyError as e:
        return {"error": f"Unexpected API response format: {str(e)}", "weather": None}
    except Exception as e:
        return {"error": f"Weather API error: {str(e)}", "weather": None}

def format_weather_alert(weather_data):
    """Format weather data into human-readable alert"""
    if weather_data.get("error"):
        return f"Weather alert error: {weather_data['error']}"

    weather = weather_data["weather"]
    alert = f"Weather in {weather['city']}, {weather['country']}: "
    alert += f"{weather['description']}, "
    alert += f"Temperature: {weather['temperature']}°C "
    alert += f"(feels like {weather['feels_like']}°C), "
    alert += f"Humidity: {weather['humidity']}%, "
    alert += f"Wind: {weather['wind_speed']} m/s"

    return alert

Corresponding LOT Action:

DEFINE ACTION WeatherMonitor
ON TOPIC "weather/request" DO
    SET request_data GET TOPIC "weather/request" AS JSON
    CALL PYTHON "WeatherAPI.get_weather"
        WITH ({request_data.city}, {request_data.api_key})
        RETURN AS {weather_result}
    IF {weather_result.error} == None THEN
        CALL PYTHON "WeatherAPI.format_weather_alert"
            WITH ({weather_result})
            RETURN AS {alert}
        PUBLISH TOPIC "weather/data" WITH {weather_result.weather}
        PUBLISH TOPIC "weather/alert" WITH {alert}
    ELSE
        PUBLISH TOPIC "weather/error" WITH {weather_result.error}

Machine Learning Examples

Simple Prediction Model

# Script Name: MLPredictor
import math

def linear_regression_predict(x, slope, intercept):
    """Simple linear regression prediction"""
    return slope * x + intercept

def moving_average(data, window_size):
    """Calculate moving average of data"""
    if len(data) < window_size:
        return data

    result = []
    for i in range(len(data) - window_size + 1):
        window = data[i:i + window_size]
        avg = sum(window) / window_size
        result.append(avg)

    return result

def detect_anomaly(value, historical_data, threshold=2):
    """Detect if a value is an anomaly based on historical data"""
    if not historical_data:
        return {"is_anomaly": False, "reason": "No historical data"}

    mean = sum(historical_data) / len(historical_data)
    variance = sum((x - mean) ** 2 for x in historical_data) / len(historical_data)
    std_dev = math.sqrt(variance)

    z_score = abs(value - mean) / std_dev if std_dev > 0 else 0

    return {
        "is_anomaly": z_score > threshold,
        "z_score": z_score,
        "mean": mean,
        "std_dev": std_dev,
        "threshold": threshold
    }

def predict_trend(data, future_periods=5):
    """Predict future trend based on historical data"""
    if len(data) < 2:
        return {"error": "Need at least 2 data points"}

    # Simple linear trend calculation
    n = len(data)
    x_sum = sum(range(n))
    y_sum = sum(data)
    xy_sum = sum(i * data[i] for i in range(n))
    x2_sum = sum(i * i for i in range(n))

    slope = (n * xy_sum - x_sum * y_sum) / (n * x2_sum - x_sum * x_sum)
    intercept = (y_sum - slope * x_sum) / n

    # Generate predictions
    predictions = []
    for i in range(n, n + future_periods):
        pred = linear_regression_predict(i, slope, intercept)
        predictions.append(pred)

    return {
        "slope": slope,
        "intercept": intercept,
        "predictions": predictions,
        "trend_direction": "increasing" if slope > 0 else "decreasing" if slope < 0 else "stable"
    }

Corresponding LOT Action:

DEFINE ACTION MLProcessor
ON TOPIC "ml/predict" DO
    SET ml_data GET TOPIC "ml/predict" AS JSON
    CALL PYTHON "MLPredictor.detect_anomaly"
        WITH ({ml_data.value}, {ml_data.historical_data}, {ml_data.threshold})
        RETURN AS {anomaly_result}
    CALL PYTHON "MLPredictor.predict_trend"
        WITH ({ml_data.historical_data}, 5)
        RETURN AS {trend_result}
    PUBLISH TOPIC "ml/anomaly" WITH {anomaly_result}
    PUBLISH TOPIC "ml/trend" WITH {trend_result}

Error Handling Patterns

Robust Error Handler

# Script Name: ErrorHandler
import traceback
import sys

def safe_execute(func, *args, **kwargs):
    """Safely execute a function with comprehensive error handling"""
    try:
        result = func(*args, **kwargs)
        return {
            "success": True,
            "result": result,
            "error": None,
            "error_type": None
        }
    except ValueError as e:
        return {
            "success": False,
            "result": None,
            "error": f"Value error: {str(e)}",
            "error_type": "ValueError"
        }
    except TypeError as e:
        return {
            "success": False,
            "result": None,
            "error": f"Type error: {str(e)}",
            "error_type": "TypeError"
        }
    except ZeroDivisionError as e:
        return {
            "success": False,
            "result": None,
            "error": f"Division by zero: {str(e)}",
            "error_type": "ZeroDivisionError"
        }
    except Exception as e:
        return {
            "success": False,
            "result": None,
            "error": f"Unexpected error: {str(e)}",
            "error_type": type(e).__name__
        }

def validate_input(data, expected_type, required_fields=None):
    """Validate input data structure and types"""
    try:
        if not isinstance(data, expected_type):
            return {
                "valid": False,
                "error": f"Expected {expected_type.__name__}, got {type(data).__name__}"
            }

        if required_fields and isinstance(data, dict):
            missing_fields = [field for field in required_fields if field not in data]
            if missing_fields:
                return {
                    "valid": False,
                    "error": f"Missing required fields: {missing_fields}"
                }

        return {"valid": True, "error": None}
    except Exception as e:
        return {
            "valid": False,
            "error": f"Validation error: {str(e)}"
        }

def retry_operation(func, max_retries=3, delay=1):
    """Retry an operation with exponential backoff"""
    import time

    for attempt in range(max_retries):
        try:
            result = func()
            return {"success": True, "result": result, "attempts": attempt + 1}
        except Exception as e:
            if attempt == max_retries - 1:
                return {
                    "success": False,
                    "error": f"Failed after {max_retries} attempts: {str(e)}",
                    "attempts": attempt + 1
                }
            time.sleep(delay * (2 ** attempt))  # Exponential backoff

    return {"success": False, "error": "Retry limit exceeded", "attempts": max_retries}

Complete Workflow Examples

IoT Data Pipeline

# Script Name: IoTDataPipeline
import json
from datetime import datetime

def process_sensor_pipeline(sensor_data):
    """Complete IoT sensor data processing pipeline"""
    pipeline_result = {
        "timestamp": datetime.now().isoformat(),
        "stages": [],
        "final_result": None,
        "errors": []
    }

    try:
        # Stage 1: Validation
        validation = validate_sensor_data(sensor_data)
        pipeline_result["stages"].append({"stage": "validation", "success": validation["valid"]})

        if not validation["valid"]:
            pipeline_result["errors"].append(f"Validation failed: {validation['error']}")
            return pipeline_result

        # Stage 2: Data cleaning
        cleaned_data = clean_sensor_data(sensor_data)
        pipeline_result["stages"].append({"stage": "cleaning", "success": True})

        # Stage 3: Analysis
        analysis = analyze_sensor_data(cleaned_data)
        pipeline_result["stages"].append({"stage": "analysis", "success": True})

        # Stage 4: Anomaly detection
        anomaly = detect_anomalies(cleaned_data, analysis)
        pipeline_result["stages"].append({"stage": "anomaly_detection", "success": True})

        # Stage 5: Generate insights
        insights = generate_insights(cleaned_data, analysis, anomaly)
        pipeline_result["stages"].append({"stage": "insights", "success": True})

        # Final result
        pipeline_result["final_result"] = {
            "cleaned_data": cleaned_data,
            "analysis": analysis,
            "anomaly": anomaly,
            "insights": insights
        }

    except Exception as e:
        pipeline_result["errors"].append(f"Pipeline error: {str(e)}")

    return pipeline_result

def validate_sensor_data(data):
    """Validate sensor data structure"""
    required_fields = ["sensor_id", "timestamp", "value", "unit"]

    if not isinstance(data, dict):
        return {"valid": False, "error": "Data must be a dictionary"}

    missing_fields = [field for field in required_fields if field not in data]
    if missing_fields:
        return {"valid": False, "error": f"Missing fields: {missing_fields}"}

    if not isinstance(data["value"], (int, float)):
        return {"valid": False, "error": "Value must be numeric"}

    return {"valid": True, "error": None}

def clean_sensor_data(data):
    """Clean and normalize sensor data"""
    cleaned = data.copy()

    # Normalize timestamp
    if isinstance(cleaned["timestamp"], str):
        # Assume ISO format, keep as is
        pass

    # Round value to 2 decimal places
    cleaned["value"] = round(float(cleaned["value"]), 2)

    # Normalize unit to lowercase
    cleaned["unit"] = cleaned["unit"].lower()

    return cleaned

def analyze_sensor_data(data):
    """Analyze sensor data for patterns and statistics"""
    value = data["value"]
    unit = data["unit"]

    # Basic analysis based on unit
    if unit in ["celsius", "fahrenheit"]:
        return analyze_temperature(value, unit)
    elif unit == "percent":
        return analyze_humidity(value)
    elif unit in ["pascal", "bar", "psi"]:
        return analyze_pressure(value, unit)
    else:
        return {"type": "generic", "value": value, "unit": unit}

def analyze_temperature(value, unit):
    """Analyze temperature data"""
    if unit == "fahrenheit":
        celsius = (value - 32) * 5/9
    else:
        celsius = value

    return {
        "type": "temperature",
        "celsius": celsius,
        "fahrenheit": celsius * 9/5 + 32,
        "comfort_level": get_temperature_comfort(celsius)
    }

def analyze_humidity(value):
    """Analyze humidity data"""
    return {
        "type": "humidity",
        "value": value,
        "comfort_level": get_humidity_comfort(value)
    }

def analyze_pressure(value, unit):
    """Analyze pressure data"""
    # Convert to Pascal for analysis
    if unit == "bar":
        pascal = value * 100000
    elif unit == "psi":
        pascal = value * 6894.76
    else:
        pascal = value

    return {
        "type": "pressure",
        "pascal": pascal,
        "bar": pascal / 100000,
        "psi": pascal / 6894.76
    }

def get_temperature_comfort(celsius):
    """Get comfort level for temperature"""
    if celsius < 0:
        return "freezing"
    elif celsius < 10:
        return "cold"
    elif celsius < 20:
        return "cool"
    elif celsius < 25:
        return "comfortable"
    elif celsius < 30:
        return "warm"
    else:
        return "hot"

def get_humidity_comfort(humidity):
    """Get comfort level for humidity"""
    if humidity < 30:
        return "low"
    elif humidity < 60:
        return "comfortable"
    else:
        return "high"

def detect_anomalies(data, analysis):
    """Detect anomalies in sensor data"""
    value = data["value"]
    unit = data["unit"]

    anomalies = []

    # Temperature anomalies
    if unit in ["celsius", "fahrenheit"]:
        celsius = analysis["celsius"]
        if celsius < -50 or celsius > 100:
            anomalies.append("Temperature out of normal range")

    # Humidity anomalies
    elif unit == "percent":
        if value < 0 or value > 100:
            anomalies.append("Humidity out of valid range")

    return {
        "has_anomalies": len(anomalies) > 0,
        "anomalies": anomalies
    }

def generate_insights(data, analysis, anomaly):
    """Generate actionable insights from sensor data"""
    insights = []

    # Temperature insights
    if analysis["type"] == "temperature":
        comfort = analysis["comfort_level"]
        if comfort in ["cold", "freezing"]:
            insights.append("Consider heating or insulation")
        elif comfort in ["warm", "hot"]:
            insights.append("Consider cooling or ventilation")

    # Humidity insights
    elif analysis["type"] == "humidity":
        comfort = analysis["comfort_level"]
        if comfort == "low":
            insights.append("Consider using a humidifier")
        elif comfort == "high":
            insights.append("Consider using a dehumidifier")

    # Anomaly insights
    if anomaly["has_anomalies"]:
        insights.append("Sensor may need calibration or maintenance")

    return {
        "insights": insights,
        "recommendations": insights,
        "priority": "high" if anomaly["has_anomalies"] else "normal"
    }

Corresponding LOT Action:

DEFINE ACTION IoTDataPipeline
ON TOPIC "sensors/raw" DO
    SET raw_data GET TOPIC "sensors/raw" AS JSON
    CALL PYTHON "IoTDataPipeline.process_sensor_pipeline"
        WITH ({raw_data})
        RETURN AS {pipeline_result}

    IF {pipeline_result.final_result} != None THEN
        PUBLISH TOPIC "sensors/processed" WITH {pipeline_result.final_result.cleaned_data}
        PUBLISH TOPIC "sensors/analysis" WITH {pipeline_result.final_result.analysis}
        PUBLISH TOPIC "sensors/insights" WITH {pipeline_result.final_result.insights}

        IF {pipeline_result.final_result.anomaly.has_anomalies} == True THEN
            PUBLISH TOPIC "sensors/alerts" WITH {pipeline_result.final_result.anomaly}
        END
    ELSE
        PUBLISH TOPIC "sensors/errors" WITH {pipeline_result.errors}
    END
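
A raw message shaped like the following (field names match validate_sensor_data above) runs through all five stages; the value is rounded to two decimals and the unit lowercased along the way:

# Illustrative payload for "sensors/raw"
raw = {"sensor_id": "temp-01", "timestamp": "2024-01-01T12:00:00", "value": 23.456, "unit": "Celsius"}
result = process_sensor_pipeline(raw)
# cleaned value is 23.46, cleaned unit is "celsius", and no anomalies are reported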

Best Practices Summary

  1. Always include Script Name: Every Python script must start with # Script Name: YourScriptName
  2. Handle errors gracefully: Use try/except blocks and return structured error information
  3. Validate inputs: Check data types and required fields before processing
  4. Use descriptive function names: Make your code self-documenting
  5. Return structured data: Use dictionaries for complex return values
  6. Keep functions focused: Each function should have a single responsibility
  7. Test incrementally: Test each function individually before integrating with LOT actions
  8. Document your functions: Use docstrings to explain what each function does
  9. Consider performance: Keep functions efficient for real-time IoT applications
  10. Use appropriate data types: Ensure return values are JSON-serializable
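
Putting several of these guidelines together, a minimal script skeleton might look like this (the script name, function name, and fields are illustrative):

# Script Name: ExampleSkeleton

def process(payload):
    """Validate the payload, do the work, and return a JSON-serializable result."""
    try:
        if not isinstance(payload, dict) or "value" not in payload:
            return {"error": "Expected a dict with a 'value' field", "result": None}
        result = payload["value"] * 2  # placeholder for the real processing step
        return {"error": None, "result": result}
    except Exception as e:
        return {"error": f"Processing failed: {str(e)}", "result": None}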

These examples demonstrate the power and flexibility of Python integration with Coreflux, enabling you to build sophisticated IoT data processing pipelines directly within the MQTT broker environment.