streams

A Stream is the core concept in the Streams Engine API. It represents a complete data processing pipeline implemented as a Directed Acyclic Graph (DAG).

what is a stream?

A Stream is a collection of processing nodes connected by pipes that define how data flows through the system. It provides a way to:

  • Define data sources and how they generate data
  • Specify processing steps to transform the data
  • Configure data sinks that consume or store the processed data
  • Control the execution of the entire pipeline

stream components

nodes

Nodes are the building blocks of a Stream. Each node performs a specific processing function, such as:

  • Source nodes: Generate data (e.g., reading from a file, generating random values)
  • Processing nodes: Transform data (e.g., filtering, mapping, aggregating)
  • Sink nodes: Consume data (e.g., writing to a file, displaying on screen)

Learn more about nodes in the Nodes concept guide.

pipes

Pipes are connections between nodes that define the flow of data. They establish the relationships and dependencies between processing steps.

Learn more about pipes in the Pipes concept guide.

stream lifecycle

A Stream goes through several states during its lifecycle:

  1. Created: The Stream is defined but not yet running
  2. Initialized: The Stream is preparing for execution
  3. Running: The Stream is actively processing data
  4. Paused: The Stream has temporarily suspended processing
  5. Stopped: The Stream has ceased processing but can be restarted
  6. Killed: The Stream has been terminated and cannot be restarted
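
You can inspect a Stream's current state at any time. A minimal sketch, assuming the engine follows REST conventions and exposes the Stream resource at GET /streams/{stream-id} with its execution status in the response:

# Check a stream's current state (endpoint shape assumed, not documented here)
curl http://localhost:42069/streams/{stream-id}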

stream representation

Streams are typically defined using JSON or YAML. Here's an example of a Stream definition in JSON:

{
  "nodes": [
    {
      "id": "source",
      "executable": "file-reader",
      "config": {
        "filename": "input.csv",
        "format": "csv"
      }
    },
    {
      "id": "processor",
      "executable": "data-transformer",
      "config": {
        "operations": [
          {
            "type": "multiply",
            "field": "value",
            "factor": 2
          }
        ]
      }
    },
    {
      "id": "sink",
      "executable": "file-writer",
      "config": {
        "filename": "output.csv",
        "format": "csv"
      }
    }
  ],
  "pipes": [
    {
      "id": "pipe1",
      "from": "source",
      "to": "processor"
    },
    {
      "id": "pipe2",
      "from": "processor",
      "to": "sink"
    }
  ]
}

The same Stream in YAML format:

nodes:
  - id: source
    executable: file-reader
    config:
      filename: input.csv
      format: csv
  - id: processor
    executable: data-transformer
    config:
      operations:
        - type: multiply
          field: value
          factor: 2
  - id: sink
    executable: file-writer
    config:
      filename: output.csv
      format: csv
pipes:
  - id: pipe1
    from: source
    to: processor
  - id: pipe2
    from: processor
    to: sink

stream creation and control

creating a stream

To create a Stream, you define the nodes and pipes in a request to the Streams Engine API:

curl -X POST http://localhost:42069/streams \
  -H "Content-Type: application/json" \
  -d '{
    "nodes": [...],
    "pipes": [...]
  }'
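
If the definition shown earlier is saved to a file, the same request can be expressed more compactly; for example, assuming a file named stream.json containing the JSON above:

# Create the example stream from a definition file (stream.json holds the JSON shown earlier)
curl -X POST http://localhost:42069/streams \
  -H "Content-Type: application/json" \
  -d @stream.json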

controlling a stream

Once created, you can control a Stream using signals:

# Start a stream
curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "start"
  }'

# Pause a stream
curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "pause"
  }'

# Stop a stream
curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{
    "signal": "stop"
  }'
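
Because a stopped Stream can be restarted, signals can be chained to manage its lifecycle; for example, to restart processing, send stop followed by start:

# Restart a stream: stop it, then start it again
curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{"signal": "stop"}'

curl -X POST http://localhost:42069/streams/{stream-id}/signal \
  -H "Content-Type: application/json" \
  -d '{"signal": "start"}'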

modifying a stream

Streams can be modified dynamically, even while running:

# Add nodes to a stream
curl -X POST http://localhost:42069/streams/{stream-id}/nodes \
  -H "Content-Type: application/json" \
  -d '[...]'

# Add pipes to a stream
curl -X POST http://localhost:42069/streams/{stream-id}/pipes \
  -H "Content-Type: application/json" \
  -d '[...]'

# Remove nodes from a stream
curl -X DELETE http://localhost:42069/streams/{stream-id}/nodes \
  -H "Content-Type: application/json" \
  -d '[...]'

# Remove pipes from a stream
curl -X DELETE http://localhost:42069/streams/{stream-id}/pipes \
  -H "Content-Type: application/json" \
  -d '[...]'
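
For example, to attach an additional sink to the running example Stream, you could add a node and then a pipe connecting it. Both payloads follow the node and pipe schema shown earlier; the screen-writer executable here is illustrative, not a documented component:

# Add a new sink node (executable name is illustrative)
curl -X POST http://localhost:42069/streams/{stream-id}/nodes \
  -H "Content-Type: application/json" \
  -d '[{"id": "screen-sink", "executable": "screen-writer", "config": {}}]'

# Connect the processor to the new sink
curl -X POST http://localhost:42069/streams/{stream-id}/pipes \
  -H "Content-Type: application/json" \
  -d '[{"id": "pipe3", "from": "processor", "to": "screen-sink"}]'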

stream properties

Streams have several important properties:

identifier

Each Stream has a unique identifier, either specified during creation or automatically generated by the system.

metadata

Streams can carry associated metadata describing their purpose, owner, creation time, and similar attributes.
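
A sketch of how an identifier and metadata might be attached to a definition, assuming top-level id and metadata fields (the field names are assumptions, not confirmed schema):

# Assumed top-level fields; the exact schema may differ
id: csv-doubler
metadata:
  purpose: Double the value column of a CSV file
  owner: data-team
nodes:
  # ... node definitions as shown earlier
pipes:
  # ... pipe definitions as shown earlier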

execution status

The current state of the Stream (created, running, paused, and so on).

performance metrics

Streams maintain metrics such as throughput and processing time.
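
If the engine exposes these metrics over HTTP, they might be fetched per Stream; the path below is an assumption for illustration, not a documented route:

# Hypothetical metrics endpoint (not confirmed by this guide)
curl http://localhost:42069/streams/{stream-id}/metrics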

use cases

Streams are versatile and can be used for various data processing scenarios:

  • Data Transformation: Convert data between formats
  • Data Filtering: Remove unwanted data points
  • Data Aggregation: Combine multiple data points
  • Real-time Processing: Process data as it arrives
  • Data Visualization: Transform data for display
  • Machine Learning: Preprocess data for algorithms

best practices

When working with Streams:

  1. Keep Streams focused: Design each Stream with a clear purpose
  2. Use descriptive IDs: Make node and pipe IDs descriptive of their function
  3. Validate Stream structure: Ensure the DAG is valid before execution
  4. Monitor Stream performance: Check throughput and resource usage
  5. Implement error handling: Configure how to handle processing failures

next steps