Using YAML Configuration
This tutorial demonstrates how to define and manage streams using YAML configuration instead of JSON. YAML offers a more human-readable format with support for comments, making complex stream definitions easier to create and maintain.
Prerequisites
Before proceeding, make sure you:
- Have completed the Basic Pipeline tutorial
- Understand the core concepts of Streams
- Have access to the Streams Engine API (running on localhost:42069 for this tutorial)
- Basic familiarity with YAML syntax
Why Use YAML?
YAML offers several advantages over JSON for stream configuration:
- Improved readability with less punctuation and structural noise
- Support for comments to document your configuration
- Multi-line strings for complex expressions and configurations
- Anchors and aliases for reusing configuration blocks
- More forgiving syntax with less required quoting and braces
YAML Basics for Streams Engine
Simple Stream in YAML
Here's a basic stream definition in YAML:
nodes:
  - id: source
    executable: file-reader
    config:
      filename: input.csv
      format: csv
  - id: processor
    executable: data-transformer
    config:
      operations:
        - type: multiply
          field: value
          factor: 2
  - id: sink
    executable: file-writer
    config:
      filename: output.csv
      format: csv
pipes:
  - id: pipe1
    source: source
    target: processor
  - id: pipe2
    source: processor
    target: sink
The same configuration in JSON would be:
{
  "nodes": [
    {
      "id": "source",
      "executable": "file-reader",
      "config": {
        "filename": "input.csv",
        "format": "csv"
      }
    },
    {
      "id": "processor",
      "executable": "data-transformer",
      "config": {
        "operations": [
          {
            "type": "multiply",
            "field": "value",
            "factor": 2
          }
        ]
      }
    },
    {
      "id": "sink",
      "executable": "file-writer",
      "config": {
        "filename": "output.csv",
        "format": "csv"
      }
    }
  ],
  "pipes": [
    {
      "id": "pipe1",
      "source": "source",
      "target": "processor"
    },
    {
      "id": "pipe2",
      "source": "processor",
      "target": "sink"
    }
  ]
}
Step 1: Creating a Stream with YAML
Let's create a simple data processing pipeline using YAML:
- Create a file named stream.yaml with the following content:
# Data processing pipeline
# Created: 2023-04-01

# Define all processing nodes
nodes:
  # Source node - reads from a CSV file
  - id: csv-reader
    executable: file-reader
    config:
      filename: input.csv
      format: csv
      delimiter: ','
      skipHeader: true

  # Processing node - transforms the data
  - id: transformer
    executable: data-transformer
    config:
      operations:
        - type: multiply
          field: value
          factor: 2
        - type: add
          field: value
          constant: 10

  # Filter node - removes unwanted data
  - id: filter
    executable: data-filter
    config:
      condition: value > 50

  # Output node - writes to a new CSV file
  - id: csv-writer
    executable: file-writer
    config:
      filename: output.csv
      format: csv
      writeHeader: true

# Define connections between nodes
pipes:
  - id: reader-to-transformer
    source: csv-reader
    target: transformer
  - id: transformer-to-filter
    source: transformer
    target: filter
  - id: filter-to-writer
    source: filter
    target: csv-writer
- Create the stream using the YAML API endpoint:
curl -X POST http://localhost:42069/streams/yaml \
-H "Content-Type: text/yaml" \
--data-binary @stream.yaml
The API will respond with a JSON representation of the created stream, including an automatically generated stream ID.
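The exact response fields depend on the engine version; a shape like the following is assumed by the examples in this tutorial (stream-123 is the placeholder ID used in later steps):

{
  "id": "stream-123",
  "nodes": [ ... ],
  "pipes": [ ... ]
}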
Step 2: Using YAML Advanced Features
Let's explore some advanced YAML features that make complex configurations more manageable:
Using Anchors and Aliases
YAML anchors (&) and aliases (*) allow you to define a value once and reuse it:
# Define common configurations
common: &common_config
  logErrors: true
  timeoutMs: 5000

nodes:
  - id: source
    executable: file-reader
    config:
      <<: *common_config  # Include the common configuration
      filename: input.csv
      format: csv
  - id: processor
    executable: data-transformer
    config:
      <<: *common_config  # Include the common configuration
      operations:
        - type: multiply
          field: value
          factor: 2
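One detail worth knowing about the merge key (<<:): keys written explicitly next to the merge override the merged values, which is what makes per-node overrides possible. A minimal sketch:

base: &base
  timeoutMs: 5000
  logErrors: true

custom:
  <<: *base          # pulls in timeoutMs and logErrors
  timeoutMs: 10000   # explicit key wins; logErrors stays true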
Multi-line Expressions
For complex expressions, YAML's multi-line capabilities are helpful:
nodes:
  - id: complex-filter
    executable: data-filter
    config:
      condition: >
        (value > 50 && category == 'A') ||
        (value > 100 && category == 'B') ||
        (value > 150 && category == 'C')
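The folded style (>) shown above joins the lines with spaces, which suits one long expression. When line breaks must be preserved, for example a multi-line script or SQL statement, use the literal style (|) instead. A sketch with a hypothetical query option:

nodes:
  - id: report-writer
    executable: db-writer
    config:
      query: |
        INSERT INTO results (ts, value)
        VALUES (:timestamp, :value)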
Environment Variables
You can use environment variables in your YAML (supported by the Streams Engine):
nodes:
  - id: database-writer
    executable: db-writer
    config:
      connectionString: ${DB_CONNECTION_STRING}
      table: data_results
      batchSize: ${BATCH_SIZE:-100}  # Default to 100 if not set
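How these variables reach the engine depends on how it is launched; since the engine in this tutorial runs locally, a simple sketch is exporting them in the engine's environment before starting it (the values below are examples):

export DB_CONNECTION_STRING="postgresql://user:pass@localhost:5432/metrics"
export BATCH_SIZE=250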
Step 3: Create a Complex Stream with YAML
Let's define a more complex stream that demonstrates YAML's advantages:
# Advanced data processing pipeline
# This pipeline demonstrates various YAML features
# for complex stream definitions

# Define common configurations
defaults: &defaults
  logErrors: true
  timeoutMs: 5000

thresholds: &thresholds
  warning: 50
  error: 75
  critical: 90

# Main stream configuration
nodes:
  # Source node
  - id: sensor-reader
    executable: api-connector
    config:
      <<: *defaults
      url: https://api.example.com/sensors
      authToken: ${API_TOKEN}
      pollInterval: 5000
      format: json

  # Data enrichment
  - id: enricher
    executable: data-enricher
    config:
      <<: *defaults
      sources:
        - id: metadata
          url: https://api.example.com/metadata
          cacheTime: 3600000
        - id: reference
          url: https://api.example.com/reference
          cacheTime: 86400000
      joinField: sensorId

  # Parallel processing branches
  - id: router
    executable: data-router
    config:
      <<: *defaults
      defaultOutput: default
      routes:
        - condition: type == 'temperature'
          output: temperature
        - condition: type == 'humidity'
          output: humidity
        - condition: type == 'pressure'
          output: pressure

  # Specialized analyzers
  - id: temp-analyzer
    executable: temperature-analyzer
    config:
      <<: *defaults
      thresholds:
        <<: *thresholds
        warning: 30  # Override specific threshold
      unitConversion: celsius

  - id: humidity-analyzer
    executable: humidity-analyzer
    config:
      <<: *defaults
      thresholds: *thresholds

  - id: pressure-analyzer
    executable: pressure-analyzer
    config:
      <<: *defaults
      thresholds: *thresholds
      unitConversion: kPa

  # Results aggregator
  - id: aggregator
    executable: data-aggregator
    config:
      <<: *defaults
      aggregations:
        - type: avg
          field: value
          window: 60000
        - type: max
          field: value
          window: 60000
        - type: min
          field: value
          window: 60000

  # Multiple outputs
  - id: db-writer
    executable: database-writer
    config:
      <<: *defaults
      connection: ${DB_CONNECTION}
      table: sensor_data
      batchSize: 100

  - id: alert-generator
    executable: alert-generator
    config:
      <<: *defaults
      conditions:
        - name: High Temperature
          condition: type == 'temperature' && value > thresholds.warning
          severity: warning
        - name: Critical Temperature
          condition: type == 'temperature' && value > thresholds.critical
          severity: critical
      notificationChannels:
        - type: email
          recipients: ${ALERT_EMAILS}
        - type: webhook
          url: ${WEBHOOK_URL}

# Define all connections
pipes:
  # Main flow
  - id: source-to-enricher
    source: sensor-reader
    target: enricher
  - id: enricher-to-router
    source: enricher
    target: router

  # Specialized analysis branches
  - id: router-to-temp
    source: router
    target: temp-analyzer
    sourcePort: temperature
  - id: router-to-humidity
    source: router
    target: humidity-analyzer
    sourcePort: humidity
  - id: router-to-pressure
    source: router
    target: pressure-analyzer
    sourcePort: pressure

  # Merge results
  - id: temp-to-aggregator
    source: temp-analyzer
    target: aggregator
  - id: humidity-to-aggregator
    source: humidity-analyzer
    target: aggregator
  - id: pressure-to-aggregator
    source: pressure-analyzer
    target: aggregator

  # Outputs
  - id: aggregator-to-db
    source: aggregator
    target: db-writer
  - id: aggregator-to-alerts
    source: aggregator
    target: alert-generator
Save this configuration to complex-stream.yaml and create the stream:
curl -X POST http://localhost:42069/streams/yaml \
-H "Content-Type: text/yaml" \
--data-binary @complex-stream.yaml
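Because later steps reference the stream by ID, it can help to capture the generated ID directly from the creation response. Assuming the response carries it in an id field (as sketched in Step 1), jq can extract it:

STREAM_ID=$(curl -s -X POST http://localhost:42069/streams/yaml \
  -H "Content-Type: text/yaml" \
  --data-binary @complex-stream.yaml | jq -r '.id')
echo "Created stream: $STREAM_ID"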
Step 4: Adding Nodes and Pipes with YAML
You can also add nodes and pipes to existing streams using YAML:
Adding Nodes
curl -X POST http://localhost:42069/streams/stream-123/nodes/yaml \
  -H "Content-Type: text/yaml" \
  --data-binary @- << EOF
# New visualization node
- id: visualizer
  executable: data-visualizer
  config:
    port: 8080
    refreshRate: 1000
    charts:
      - type: line
        title: Temperature Over Time
        xField: timestamp
        yField: value
        filter: type == 'temperature'
      - type: gauge
        title: Current Pressure
        valueField: value
        filter: type == 'pressure'
EOF
Adding Pipes
curl -X POST http://localhost:42069/streams/stream-123/pipes/yaml \
  -H "Content-Type: text/yaml" \
  --data-binary @- << EOF
# Connect aggregator to visualizer
- id: aggregator-to-visualizer
  source: aggregator
  target: visualizer
  config:
    bufferSize: 100
EOF
Step 5: Exporting a Stream as YAML
You can export an existing stream as YAML for editing or backup:
curl -X GET http://localhost:42069/streams/stream-123/yaml \
-H "Accept: text/yaml" \
--output stream-export.yaml
This is particularly useful for:
- Cloning and modifying streams
- Version-controlling your stream definitions
- Sharing configurations between environments
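A typical round trip for cloning, assuming that posting an exported definition creates a new stream with a fresh ID:

# Export, edit, and re-submit as a new stream
curl -s http://localhost:42069/streams/stream-123/yaml \
  -H "Accept: text/yaml" > clone.yaml
# ... edit clone.yaml (e.g. change filenames or thresholds) ...
curl -X POST http://localhost:42069/streams/yaml \
  -H "Content-Type: text/yaml" \
  --data-binary @clone.yaml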
Best Practices for YAML Stream Configuration
- Use Comments: Add descriptive comments to explain complex parts of your configuration
- Organize Logically: Group related nodes together
- Use Anchors for Common Configs: Define reusable configuration blocks with anchors
- Validate Before Submitting: Use a YAML validator to check syntax (a quick local check is sketched after this list)
- Environment Variables: Use environment variables for credentials and environment-specific settings
- Version Control: Store YAML configurations in a version control system
- Consistent Formatting: Use consistent indentation (2 spaces is common)
- Descriptive IDs: Use clear, descriptive IDs for nodes and pipes
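For the validation point above, any strict YAML parser works as a pre-flight syntax check. A minimal sketch using Python with PyYAML (stream.yaml is the file from Step 1):

python -c 'import sys, yaml; yaml.safe_load(sys.stdin)' < stream.yaml \
  && echo "YAML OK" || echo "YAML syntax error"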
Converting Between YAML and JSON
If you have existing JSON configurations, you can convert them to YAML:
# Using Python (requires PyYAML: pip install pyyaml)
python -c 'import sys, yaml, json; print(yaml.dump(json.load(sys.stdin), default_flow_style=False))' < stream.json > stream.yaml
# Using online converters
# Many online tools are available for converting between formats
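The reverse direction (YAML to JSON) works the same way with PyYAML:

python -c 'import sys, yaml, json; print(json.dumps(yaml.safe_load(sys.stdin), indent=2))' < stream.yaml > stream.json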
Troubleshooting YAML Configuration
Common YAML syntax issues to watch for:
- Indentation: YAML is sensitive to indentation, which must be consistent
- Quotes for Special Characters: Use quotes for strings containing special characters (see the example after this list)
- Proper List Formatting: Ensure consistent list item formatting with hyphens
- Escaping Characters: Properly escape special characters in strings
- Anchor Names: Ensure anchor names are unique
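For example, a value beginning with a YAML-significant character such as *, &, or { will be misparsed unless quoted (the tag key below is purely illustrative):

# Broken: the parser reads *important as an alias to an anchor named 'important'
tag: *important
# Fixed: quoting turns it into a plain string
tag: '*important'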
Next Steps
- Explore Complex Workflows to build advanced processing pipelines
- Learn about YAML Import API for all available YAML operations
- Understand DAGs for designing advanced graph structures