Skip to content

High Performance Python Executor Guide

This guide demonstrates how to achieve maximum performance with the Python executor node for real-time applications.

Performance Enhancements Added

1. Enhanced Debug and Monitoring

  • Execution Time Tracking: Each execution is timed and displayed in the node status
  • Performance Statistics: Min, max, average execution times
  • Throughput Monitoring: Messages per second calculation
  • Queue Length Tracking: Real-time queue size monitoring
  • Error Rate Tracking: Track failed executions

2. Status Display Improvements

  • Real-time Metrics: 5ms (avg: 7ms) #1234 format
  • Queue Status: Shows queued (5) when messages are waiting
  • Error Indicators: Red ring for errors with details

3. Debug Logging Features

  • Execution Logs: Detailed per-execution timing
  • Performance Reports: Periodic statistics (every 30 seconds)
  • Final Summary: Complete performance report on node shutdown

Performance Optimization Strategies

1. Use Persistent Mode

javascript
// Node Configuration
mode: "persistent"
debug: true

Benefits:

  • ~5-10ms execution time vs ~50-100ms for single mode
  • Shared Python interpreter state
  • No process startup overhead

2. Optimize Python Code

Fast Math Operations

python
import math

# Use built-in functions (faster than loops)
result = sum(data)
average = sum(data) / len(data)

# Use math module for complex operations
sqrt_val = math.sqrt(value)
log_val = math.log10(value + 1)

Efficient Data Structures

python
# Use list comprehensions (faster than loops)
processed = [x * 2 for x in data]

# Use built-in functions
filtered = [x for x in data if x > threshold]

# Avoid repeated calculations
temp_factor = 0.5
humidity_factor = 0.3
comfort_index = (temp * temp_factor) + (humidity * humidity_factor)

Minimal Imports

python
# Import only what you need
import math  # Fast
from datetime import datetime  # Slower but often necessary

# Avoid heavy imports in loops
# import numpy as np  # Only if needed

3. Message Design

Efficient Message Structure

python
# Good - Flat structure
msg['payload'] = {
    'value': 42,
    'status': 'ok',
    'timestamp': time.time()
}

# Avoid - Deep nesting
msg['payload'] = {
    'data': {
        'sensor': {
            'readings': {
                'temperature': 25
            }
        }
    }
}

Return Only What's Needed

python
# Good - Return only processed data
msg['payload'] = {
    'processed_value': result,
    'status': status
}

# Avoid - Returning large unchanged data
msg['payload'] = {
    'original_data': large_array,  # Unnecessary
    'processed_value': result
}

Real-Time Performance Testing

Test 1: High-Frequency Processing (100 Hz)

Frequency: 100 messages/second
Expected Performance: 5-10ms average
Use Case: Real-time sensor data processing

Test 2: Burst Processing (100 messages at once)

Burst Size: 100 messages
Expected Performance: <50ms total processing time
Use Case: Batch data processing

Test 3: Stress Test (1000 messages)

Stress Load: 1000 messages with varying complexity
Expected Performance: >500 messages/second throughput
Use Case: High-load production scenarios

Performance Benchmarks

Typical Performance Metrics

ScenarioModeAvg TimeThroughputMemory Usage
Simple MathPersistent3-5ms200-300/sLow
Data ProcessingPersistent5-10ms100-200/sMedium
Complex AnalysisPersistent10-20ms50-100/sHigh
Image ProcessingPersistent20-50ms20-50/sVery High

Comparison: Single vs Persistent Mode

MetricSingle ModePersistent ModeImprovement
Startup Time50-100ms0ms (after init)10-20x faster
Memory UsageLowerHigherTrade-off
IsolationCompleteProcess-levelLess isolated
ReliabilityHighMediumAuto-restart

Real-Time Application Examples

1. IoT Sensor Processing

python
# Fast sensor data processing
import math

# Extract sensor data
temp = msg['payload']['temperature']
humidity = msg['payload']['humidity']
pressure = msg['payload']['pressure']

# Quick calculations
comfort_index = (temp * 0.5) + (humidity * 0.3)
heat_index = temp + (humidity * 0.1)

# Anomaly detection
status = 'normal'
if temp > 30 or humidity > 80:
    status = 'alert'

# Return processed data
msg['payload'] = {
    'comfort_index': round(comfort_index, 2),
    'heat_index': round(heat_index, 2),
    'status': status
}

return msg

2. Financial Data Processing

python
# High-frequency trading data processing
import math

# Extract price data
price = msg['payload']['price']
volume = msg['payload']['volume']
timestamp = msg['payload']['timestamp']

# Get historical data (stored in context)
history = msg.get('_history', [])
history.append(price)
if len(history) > 20:
    history = history[-20:]  # Keep last 20 values

# Calculate indicators
moving_avg = sum(history) / len(history)
volatility = math.sqrt(sum((p - moving_avg)**2 for p in history) / len(history))

# Trading signal
signal = 'hold'
if price > moving_avg * 1.02:
    signal = 'buy'
elif price < moving_avg * 0.98:
    signal = 'sell'

# Return analysis
msg['payload'] = {
    'price': price,
    'moving_avg': round(moving_avg, 2),
    'volatility': round(volatility, 2),
    'signal': signal,
    '_history': history
}

return msg

3. Real-Time Image Processing

python
# Fast image processing
import cv2
import numpy as np

# Get image data
image_data = msg['payload']

# Convert to OpenCV format
nparr = np.frombuffer(image_data, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

# Fast processing - resize for speed
small_img = cv2.resize(img, (320, 240))

# Simple edge detection
gray = cv2.cvtColor(small_img, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150)

# Count edges (simple analysis)
edge_count = np.sum(edges > 0)
edge_density = edge_count / (320 * 240)

# Encode result
_, buffer = cv2.imencode('.jpg', edges)

# Return processed image and analysis
msg['payload'] = {
    'image': buffer.tobytes(),
    'analysis': {
        'edge_count': int(edge_count),
        'edge_density': round(edge_density, 4),
        'processed_size': [320, 240]
    }
}

return msg

Performance Monitoring

1. Enable Debug Mode

javascript
// In node configuration
debug: true

2. Monitor Node Status

  • Green dot: Normal execution with timing
  • Blue ring: Messages queued
  • Red ring: Execution errors

3. Check Node-RED Logs

[info] [python-executor:12345] Execution #1234: 5ms, Queue: 0, Avg: 7ms
[info] [python-executor:12345] Performance Stats - Executions: 1000, Avg: 7ms, Min: 3ms, Max: 15ms, Throughput: 142.86/s, Uptime: 7s, Errors: 0

4. Performance Data in Messages

javascript
msg._performance = {
    executionTime: 5,
    totalExecutions: 1234,
    avgTime: 7,
    queueLength: 0,
    mode: "persistent"
}

Troubleshooting Performance Issues

Common Performance Problems

  1. High Execution Times (>50ms)

    • Check for heavy imports
    • Optimize Python code
    • Reduce data processing complexity
  2. Growing Queue Lengths

    • Increase timeout values
    • Optimize Python code
    • Consider multiple parallel nodes
  3. Memory Usage Growth

    • Avoid storing large data in variables
    • Clean up temporary variables
    • Restart periodic process

Performance Optimization Checklist

  • [ ] Use persistent mode for high-frequency processing
  • [ ] Enable debug logging for monitoring
  • [ ] Optimize Python code for speed
  • [ ] Minimize data transfer
  • [ ] Use appropriate timeout values
  • [ ] Monitor queue lengths
  • [ ] Test with realistic data volumes
  • [ ] Implement error handling
  • [ ] Consider parallel processing for high loads

Best Practices for Real-Time Applications

  1. Design for Throughput: Process data in batches when possible
  2. Monitor Performance: Use debug mode and performance metrics
  3. Graceful Degradation: Handle high loads with queue management
  4. Resource Management: Monitor memory and CPU usage
  5. Error Recovery: Implement proper error handling and recovery
  6. Testing: Use the provided performance test flows

Using the Demo Flow

  1. Import the Flow: Import examples/high-performance-demo.json
  2. Start High-Frequency Test: Click "High Frequency Test" (100 Hz)
  3. Monitor Performance: Watch debug output and node status
  4. Run Burst Test: Click "Burst Test (100 msgs)" for batch processing
  5. Stress Test: Click "Stress Test (1000 msgs)" for maximum load
  6. Analyze Results: Check debug output for performance metrics

The demo flow provides comprehensive performance testing and monitoring capabilities to help you optimize your real-time Python processing applications.