# Streams
A Stream is the core concept in the Streams Engine API. It represents a complete data processing pipeline implemented as a Directed Acyclic Graph (DAG).
## What Is a Stream?
A Stream is a collection of processing nodes connected by pipes that define how data flows through the system. It provides a way to:
- Define data sources and how they generate data
- Specify processing steps to transform the data
- Configure data sinks that consume or store the processed data
- Control the execution of the entire pipeline
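Concretely, every Stream definition has the same two-part shape: an array of nodes and an array of pipes that connect them (a complete example appears under Stream Representation below):

```json
{
  "nodes": [ ],
  "pipes": [ ]
}
```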
## Stream Components

### Nodes
Nodes are the building blocks of a Stream. Each node performs a specific processing function, such as:
- Source nodes: Generate data (e.g., reading from a file, generating random values)
- Processing nodes: Transform data (e.g., filtering, mapping, aggregating)
- Sink nodes: Consume data (e.g., writing to a file, displaying on screen)
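Regardless of its role, every node is declared the same way: an identifier, the executable that implements it, and an executable-specific configuration. This source-node snippet is taken from the full example under Stream Representation below:

```json
{
  "id": "source",
  "executable": "file-reader",
  "config": {
    "filename": "input.csv",
    "format": "csv"
  }
}
```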
Learn more about nodes in the Nodes concept guide.
### Pipes
Pipes are connections between nodes that define the flow of data. They establish the relationships and dependencies between processing steps.
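A pipe declaration simply names the two endpoints it connects; its `from` and `to` fields must match node IDs in the same Stream, as in this snippet from the full example below:

```json
{
  "id": "pipe1",
  "from": "source",
  "to": "processor"
}
```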
Learn more about pipes in the Pipes concept guide.
## Stream Lifecycle
A Stream goes through several states during its lifecycle:
- Created: The Stream is defined but not yet running
- Initialized: The Stream is preparing for execution
- Running: The Stream is actively processing data
- Paused: The Stream has temporarily suspended processing
- Stopped: The Stream has ceased processing but can be restarted
- Killed: The Stream has been terminated and cannot be restarted
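As a hedged sketch of how you might observe these states: assuming the engine exposes a GET endpoint per stream (not documented in this guide, so verify against your deployment), polling the current state could look like:

```bash
# Hypothetical status check: assumes GET /streams/{stream-id}
# returns the Stream's definition along with its execution status
curl http://localhost:42069/streams/{stream-id}
```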
## Stream Representation
Streams are typically defined using JSON or YAML. Here's an example of a Stream definition in JSON:
```json
{
  "nodes": [
    {
      "id": "source",
      "executable": "file-reader",
      "config": {
        "filename": "input.csv",
        "format": "csv"
      }
    },
    {
      "id": "processor",
      "executable": "data-transformer",
      "config": {
        "operations": [
          {
            "type": "multiply",
            "field": "value",
            "factor": 2
          }
        ]
      }
    },
    {
      "id": "sink",
      "executable": "file-writer",
      "config": {
        "filename": "output.csv",
        "format": "csv"
      }
    }
  ],
  "pipes": [
    {
      "id": "pipe1",
      "from": "source",
      "to": "processor"
    },
    {
      "id": "pipe2",
      "from": "processor",
      "to": "sink"
    }
  ]
}
```
The same Stream in YAML format:
```yaml
nodes:
  - id: source
    executable: file-reader
    config:
      filename: input.csv
      format: csv
  - id: processor
    executable: data-transformer
    config:
      operations:
        - type: multiply
          field: value
          factor: 2
  - id: sink
    executable: file-writer
    config:
      filename: output.csv
      format: csv
pipes:
  - id: pipe1
    from: source
    to: processor
  - id: pipe2
    from: processor
    to: sink
```
## Stream Creation and Control

### Creating a Stream
To create a Stream, you define the nodes and pipes in a request to the Streams Engine API:
```bash
curl -X POST http://localhost:42069/streams \
  -H "Content-Type: application/json" \
  -d '{
    "nodes": [...],
    "pipes": [...]
  }'
```
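The `nodes` and `pipes` arrays use the same shape as the definitions shown under Stream Representation. As a compact sketch reusing the `file-reader` and `file-writer` executables from that example:

```bash
curl -X POST http://localhost:42069/streams \
  -H "Content-Type: application/json" \
  -d '{
    "nodes": [
      {"id": "source", "executable": "file-reader",
       "config": {"filename": "input.csv", "format": "csv"}},
      {"id": "sink", "executable": "file-writer",
       "config": {"filename": "output.csv", "format": "csv"}}
    ],
    "pipes": [
      {"id": "pipe1", "from": "source", "to": "sink"}
    ]
  }'
```

If no identifier is supplied in the request, the system generates one, as described under Stream Properties below.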
### Controlling a Stream
Once created, you can control a Stream using signals:
```bash
# Start a stream
curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "start"
  }'

# Pause a stream
curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "pause"
  }'

# Stop a stream
curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "stop"
  }'
```
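The lifecycle above also includes a Killed state. Assuming the engine accepts a matching `kill` signal (this guide only shows `start`, `pause`, and `stop`, so treat the signal name as a hypothetical), permanently terminating a Stream would follow the same pattern:

```bash
# Hypothetical: assumes "kill" is a valid signal name,
# mirroring the Killed lifecycle state
curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "kill"
  }'
```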
### Modifying a Stream
Streams can be modified dynamically, even while running:
```bash
# Add nodes to a stream
curl -X POST http://localhost:42069/streams/{stream-id}/nodes \
  -H "Content-Type: application/json" \
  -d '[...]'

# Add pipes to a stream
curl -X POST http://localhost:42069/streams/{stream-id}/pipes \
  -H "Content-Type: application/json" \
  -d '[...]'

# Remove nodes from a stream
curl -X DELETE http://localhost:42069/streams/{stream-id}/nodes \
  -H "Content-Type: application/json" \
  -d '[...]'

# Remove pipes from a stream
curl -X DELETE http://localhost:42069/streams/{stream-id}/pipes \
  -H "Content-Type: application/json" \
  -d '[...]'
```
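Each request body is a JSON array of node or pipe definitions in the same shape used at creation time. As a sketch, attaching a hypothetical `backup-sink` node (reusing the `file-writer` executable from the earlier example) and piping the processor's output into it might look like:

```bash
# Add a second sink node (the node ID and filename are illustrative)
curl -X POST http://localhost:42069/streams/{stream-id}/nodes \
  -H "Content-Type: application/json" \
  -d '[
    {"id": "backup-sink", "executable": "file-writer",
     "config": {"filename": "backup.csv", "format": "csv"}}
  ]'

# Connect the existing processor node to the new sink
curl -X POST http://localhost:42069/streams/{stream-id}/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {"id": "pipe3", "from": "processor", "to": "backup-sink"}
  ]'
```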
## Stream Properties
Streams have several important properties:
- Identifier: Each Stream has a unique identifier, either specified during creation or automatically generated by the system.
- Metadata: Streams can carry metadata describing their purpose, owner, creation time, and so on.
- Execution Status: The current state of the Stream (created, running, paused, etc.).
- Performance Metrics: Streams maintain metrics such as throughput and processing time.
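Purely as an illustration of how these properties fit together (none of the field names below are specified by this guide; treat them all as hypothetical), a Stream resource might be represented as:

```json
{
  "id": "stream-42",
  "metadata": {
    "purpose": "csv-doubling-pipeline",
    "owner": "data-team",
    "created": "2024-01-01T00:00:00Z"
  },
  "status": "running",
  "metrics": {
    "throughput_per_sec": 1500,
    "avg_processing_ms": 2.3
  }
}
```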
## Use Cases
Streams are versatile and can be used for various data processing scenarios:
- Data Transformation: Convert data between formats
- Data Filtering: Remove unwanted data points
- Data Aggregation: Combine multiple data points
- Real-time Processing: Process data as it arrives
- Data Visualization: Transform data for display
- Machine Learning: Preprocess data for algorithms
## Best Practices
When working with Streams:
- Keep Streams focused: Design each Stream with a clear purpose
- Use descriptive IDs: Make node and pipe IDs descriptive of their function
- Validate Stream structure: Ensure the DAG is valid before execution (see the validation sketch after this list)
- Monitor Stream performance: Check throughput and resource usage
- Implement error handling: Configure how to handle processing failures
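For the validation point above, a client-side check can catch dangling pipe endpoints and cycles before a definition is ever submitted. A minimal sketch in Python using Kahn's algorithm (illustrative tooling, not part of the Streams Engine API):

```python
from collections import defaultdict, deque

def validate_stream(definition: dict) -> None:
    """Raise ValueError if pipes reference unknown nodes or form a cycle."""
    node_ids = {node["id"] for node in definition["nodes"]}
    outgoing = defaultdict(list)
    indegree = {node_id: 0 for node_id in node_ids}

    for pipe in definition["pipes"]:
        src, dst = pipe["from"], pipe["to"]
        if src not in node_ids or dst not in node_ids:
            raise ValueError(f"pipe {pipe['id']!r} references an unknown node")
        outgoing[src].append(dst)
        indegree[dst] += 1

    # Kahn's algorithm: repeatedly remove nodes with no incoming pipes.
    # If any node is never removed, the remaining edges form a cycle.
    ready = deque(n for n, d in indegree.items() if d == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in outgoing[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if visited != len(node_ids):
        raise ValueError("stream definition contains a cycle; it must be a DAG")

# Example: a two-node definition in the shape used throughout this guide
validate_stream({
    "nodes": [{"id": "source"}, {"id": "sink"}],
    "pipes": [{"id": "pipe1", "from": "source", "to": "sink"}],
})
```

Because a Stream definition is plain JSON, the same check can also run in CI against stored definitions before they are deployed.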
## Next Steps
- Learn about Nodes
- Explore Pipes
- Understand DAGs
- Follow our Basic Pipeline Tutorial