Building Complex Workflows

This tutorial builds on the Basic Pipeline guide and demonstrates how to create more sophisticated data processing workflows with the Streams Engine.

Prerequisites

Before proceeding, make sure you:

  • Have completed the Basic Pipeline tutorial
  • Understand the core concepts of Streams, Nodes, and Pipes
  • Have access to the Streams Engine API (running on localhost:42069 for this tutorial)
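If you want to confirm the engine is reachable before you begin, a quick request against the API will do. The listing endpoint used here (GET /streams) is an assumption based on the REST conventions in this tutorial; any endpoint you know your version exposes works just as well:

curl -X GET http://localhost:42069/streams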

Complex Workflow Patterns

This tutorial will implement several advanced patterns:

  1. Branching workflows - splitting data into multiple processing paths
  2. Merging data streams - combining results from different paths
  3. Conditional routing - directing data based on content
  4. Feedback loops - routing certain data back for reprocessing
  5. Error handling - managing failures gracefully

Scenario: Data Analysis Pipeline

Our example will build a data analysis pipeline for sensor readings that:

  1. Reads sensor data from a CSV file
  2. Preprocesses the data (normalization, outlier removal)
  3. Splits the data into parallel analysis paths
  4. Performs different analyses (statistics, anomaly detection)
  5. Merges the analysis results
  6. Outputs results to multiple destinations (database, file, visualization)

Pipeline Overview

                                                ┌────────────────────┐
                                             ┌─>│Statistical Analysis│──┐
                                             │  └────────────────────┘  │
┌──────────┐  ┌────────────┐  ┌───────────┐  │                          │  ┌───────────┐  ┌─────────────┐
│CSV Reader│─>│Preprocessor│─>│Data Router│──┤                          ├─>│Data Merger│─>│Result Writer│
└──────────┘  └────────────┘  └───────────┘  │                          │  └───────────┘  └─────────────┘
                                             │  ┌─────────────────┐     │
                                             └─>│Anomaly Detection│─────┘
                                                └─────────────────┘

Step 1: Create the Base Stream

Let's start by creating the main components of our stream:

curl -X POST http://localhost:42069/streams \
  -H "Content-Type: application/json" \
  -d '{
    "nodes": [
      {
        "id": "csv-reader",
        "executable": "file-reader",
        "config": {
          "filename": "sensor_data.csv",
          "format": "csv",
          "delimiter": ",",
          "skipHeader": true
        }
      },
      {
        "id": "preprocessor",
        "executable": "data-preprocessor",
        "config": {
          "normalize": true,
          "removeOutliers": true,
          "outlierThreshold": 3.0
        }
      },
      {
        "id": "data-router",
        "executable": "data-router",
        "config": {
          "defaultOutput": "default",
          "routes": [
            {
              "condition": "reading_type == 'temperature'",
              "output": "temperature"
            },
            {
              "condition": "reading_type == 'pressure'",
              "output": "pressure"
            }
          ]
        }
      },
      {
        "id": "stats-analyzer",
        "executable": "statistics-calculator",
        "config": {
          "statistics": ["mean", "median", "stddev", "max", "min"],
          "windowSize": 100
        }
      },
      {
        "id": "anomaly-detector",
        "executable": "anomaly-detector",
        "config": {
          "algorithm": "isolation-forest",
          "sensitivityThreshold": 0.8
        }
      },
      {
        "id": "data-merger",
        "executable": "data-merger",
        "config": {
          "mergeStrategy": "combine",
          "waitForAllInputs": true
        }
      },
      {
        "id": "result-writer",
        "executable": "multi-writer",
        "config": {
          "outputs": [
            {
              "type": "file",
              "filename": "analysis_results.json",
              "format": "json"
            },
            {
              "type": "database",
              "connection": "sqlite:results.db",
              "table": "analysis_results"
            }
          ]
        }
      }
    ],
    "pipes": [
      {
        "id": "reader-to-preprocessor",
        "source": "csv-reader",
        "target": "preprocessor"
      },
      {
        "id": "preprocessor-to-router",
        "source": "preprocessor",
        "target": "data-router"
      },
      {
        "id": "router-to-stats",
        "source": "data-router",
        "target": "stats-analyzer",
        "sourcePort": "default"
      },
      {
        "id": "router-to-anomaly",
        "source": "data-router",
        "target": "anomaly-detector",
        "sourcePort": "default"
      },
      {
        "id": "stats-to-merger",
        "source": "stats-analyzer",
        "target": "data-merger",
        "targetPort": "stats"
      },
      {
        "id": "anomaly-to-merger",
        "source": "anomaly-detector",
        "target": "data-merger",
        "targetPort": "anomalies"
      },
      {
        "id": "merger-to-writer",
        "source": "data-merger",
        "target": "result-writer"
      }
    ]
  }'
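
The create call responds with the identifier of the new stream; the remaining steps assume it returned stream-123. The exact response shape depends on your engine version, but it might look roughly like this (field names are illustrative):

{
  "id": "stream-123",
  "status": "created",
  "nodes": 7,
  "pipes": 7
}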

Step 2: Add Specialized Analysis Nodes

Now let's add specialized nodes for temperature and pressure analysis:

curl -X POST http://localhost:42069/streams/stream-123/nodes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "temperature-analyzer",
      "executable": "temperature-analyzer",
      "config": {
        "unitConversion": "celsius",
        "thresholds": {
          "warning": 30,
          "critical": 40
        }
      }
    },
    {
      "id": "pressure-analyzer",
      "executable": "pressure-analyzer",
      "config": {
        "unitConversion": "kPa",
        "thresholds": {
          "warning": 110,
          "critical": 120
        }
      }
    }
  ]'

Step 3: Connect Specialized Nodes

Connect these specialized nodes to the router:

curl -X POST http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "router-to-temperature",
      "source": "data-router",
      "target": "temperature-analyzer",
      "sourcePort": "temperature"
    },
    {
      "id": "router-to-pressure",
      "source": "data-router",
      "target": "pressure-analyzer",
      "sourcePort": "pressure"
    },
    {
      "id": "temperature-to-merger",
      "source": "temperature-analyzer",
      "target": "data-merger",
      "targetPort": "temperature"
    },
    {
      "id": "pressure-to-merger",
      "source": "pressure-analyzer",
      "target": "data-merger",
      "targetPort": "pressure"
    }
  ]'

Step 4: Add Error Handling

Let's add error handling to our pipeline:

curl -X POST http://localhost:42069/streams/stream-123/nodes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "error-handler",
      "executable": "error-handler",
      "config": {
        "retryCount": 3,
        "logErrors": true,
        "errorDatabase": "sqlite:errors.db"
      }
    }
  ]'

Connect the error handler:

curl -X POST http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "preprocessor-errors",
      "source": "preprocessor",
      "target": "error-handler",
      "sourcePort": "errors"
    },
    {
      "id": "router-errors",
      "source": "data-router",
      "target": "error-handler",
      "sourcePort": "errors"
    },
    {
      "id": "stats-errors",
      "source": "stats-analyzer",
      "target": "error-handler",
      "sourcePort": "errors"
    },
    {
      "id": "anomaly-errors",
      "source": "anomaly-detector",
      "target": "error-handler",
      "sourcePort": "errors"
    }
  ]'

Step 5: Add a Feedback Loop

For analysis results that need reprocessing, such as low-confidence detections from the anomaly detector, let's add a feedback loop:

curl -X POST http://localhost:42069/streams/stream-123/nodes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "reprocessing-filter",
      "executable": "data-filter",
      "config": {
        "condition": "confidence_score < 0.6"
      }
    }
  ]'

Connect the feedback loop. The maxIterations setting on the feedback pipe caps how many times a record can be routed back for reprocessing, which prevents an infinite loop:

curl -X POST http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "anomaly-to-filter",
      "source": "anomaly-detector",
      "target": "reprocessing-filter"
    },
    {
      "id": "filter-to-preprocessor",
      "source": "reprocessing-filter",
      "target": "preprocessor",
      "config": {
        "maxIterations": 3
      }
    }
  ]'

Step 6: Add Real-time Visualization

Let's add a visualization node to see results in real-time:

curl -X POST http://localhost:42069/streams/stream-123/nodes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "visualizer",
      "executable": "data-visualizer",
      "config": {
        "port": 8080,
        "refreshRate": 1000,
        "charts": [
          {
            "type": "line",
            "title": "Temperature Trends",
            "xField": "timestamp",
            "yField": "temperature"
          },
          {
            "type": "scatter",
            "title": "Anomaly Detection",
            "xField": "value",
            "yField": "anomaly_score",
            "colorField": "is_anomaly"
          }
        ]
      }
    }
  ]'

Connect the visualizer:

curl -X POST http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "merger-to-visualizer",
      "source": "data-merger",
      "target": "visualizer"
    }
  ]'
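
Once the stream is running (see Step 7), the visualizer should serve its dashboard on the configured port; with the settings above, that is http://localhost:8080, refreshing roughly once per second.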

Step 7: Start and Monitor the Complex Stream

Start the complex stream:

curl -X POST http://localhost:42069/streams/stream-123/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "start"
  }'

Monitor the status of all nodes:

curl -X GET http://localhost:42069/streams/stream-123/status
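
The status payload varies by engine version; expect per-node state and counters along these lines (an illustrative sketch, not the authoritative schema):

{
  "stream": "stream-123",
  "state": "running",
  "nodes": [
    { "id": "csv-reader", "state": "running", "processed": 1250 },
    { "id": "preprocessor", "state": "running", "processed": 1248 },
    { "id": "anomaly-detector", "state": "running", "processed": 310 }
  ]
}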

Advanced Techniques

Dynamic Node Configuration

You can update node configurations during execution:

curl -X PATCH http://localhost:42069/streams/stream-123/nodes/anomaly-detector \
  -H "Content-Type: application/json" \
  -d '{
    "config": {
      "sensitivityThreshold": 0.7,
      "adaptiveMode": true
    }
  }'
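
To confirm the update took effect, you can read the node back. A GET on the node resource is an assumption here (only PATCH is shown in this tutorial), but it mirrors the usual REST pattern:

curl -X GET http://localhost:42069/streams/stream-123/nodes/anomaly-detector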

Conditional Execution

To enable or disable parts of the pipeline at runtime, send a signal to the target nodes. For example, to pause the specialized analyzers:

curl -X POST http://localhost:42069/streams/stream-123/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "pause",
    "targets": ["temperature-analyzer", "pressure-analyzer"]
  }'
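
To re-enable the paused nodes, send the counterpart signal to the same targets. A resume signal is assumed here, alongside the start and pause signals used elsewhere in this tutorial:

curl -X POST http://localhost:42069/streams/stream-123/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "resume",
    "targets": ["temperature-analyzer", "pressure-analyzer"]
  }'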

Stream Variables

Define variables that can be accessed by multiple nodes:

curl -X POST http://localhost:42069/streams/stream-123/variables \
  -H "Content-Type: application/json" \
  -d '{
    "variables": {
      "batchSize": 100,
      "thresholdMultiplier": 1.5,
      "debugMode": false
    }
  }'
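
How nodes consume these variables depends on the executable. One plausible pattern, shown here as an assumption rather than documented syntax, is to reference a variable from a node configuration with a ${...} placeholder:

curl -X PATCH http://localhost:42069/streams/stream-123/nodes/stats-analyzer \
  -H "Content-Type: application/json" \
  -d '{
    "config": {
      "windowSize": "${batchSize}"
    }
  }'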

Best Practices for Complex Workflows

  1. Modular Design: Break complex processing into smaller, focused nodes
  2. Clear Naming: Use descriptive IDs for nodes and pipes
  3. Error Handling: Add dedicated error handling for all critical nodes
  4. Monitoring: Add logging and visualization nodes for observability
  5. Testing: Test complex workflows with sample data before production use
  6. Documentation: Document the purpose and configuration of each node
  7. Versioning: Version your stream definitions for easier management (see the export sketch below)
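
For the versioning point, one lightweight approach is to snapshot the current stream definition into a file you keep under version control. Fetching the definition with GET on the stream resource is an assumption; this tutorial only shows POST and PATCH against it:

curl -X GET http://localhost:42069/streams/stream-123 > stream-definition-v1.json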

Debugging Complex Streams

For debugging complex workflows:

  1. Use the data-logger node at key points in the pipeline:
curl -X POST http://localhost:42069/streams/stream-123/nodes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "debug-logger",
      "executable": "data-logger",
      "config": {
        "logLevel": "debug",
        "sampleRate": 0.1,
        "logFile": "debug.log"
      }
    }
  ]'
  2. Add a pipe to the logger:
curl -X POST http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "debug-pipe",
      "source": "preprocessor",
      "target": "debug-logger"
    }
  ]'
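
When you are done debugging, you can remove the logger and its pipe so they add no overhead in production. DELETE support on node and pipe resources is assumed here, mirroring the POST endpoints used above:

curl -X DELETE http://localhost:42069/streams/stream-123/pipes/debug-pipe
curl -X DELETE http://localhost:42069/streams/stream-123/nodes/debug-logger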

Conclusion

You've now created a sophisticated data processing pipeline with:

  • Parallel processing paths
  • Specialized analysis nodes
  • Error handling
  • Feedback loops
  • Real-time visualization
  • Multiple output destinations

This complex workflow demonstrates the power and flexibility of the Streams Engine for building advanced data processing applications.

Next Steps

  • Explore YAML Configuration for defining complex streams more easily
  • Learn about DAGs to understand advanced graph structures
  • See the API Reference for all available operations