# pipes
Pipes are the connectors between nodes in the Streams Engine that define how data flows through a stream pipeline.
## what is a pipe?
A pipe is a directed connection that:
- Establishes a one-way data flow between nodes
- Defines the path that data takes through the processing pipeline
- Connects an output port of one node to an input port of another node
- Carries data of specific types between processing units
Pipes are essential components of the Directed Acyclic Graph (DAG) that forms a stream.
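Because pipes form the edges of that DAG, one useful sanity check is that the connections contain no cycles. A minimal sketch of such a check (the node names are illustrative, not part of the engine's API):

```python
from collections import defaultdict

def has_cycle(pipes):
    """Detect a cycle in the directed graph formed by (source, target) pipes."""
    graph = defaultdict(list)
    for source, target in pipes:
        graph[source].append(target)

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current DFS path / finished
    color = defaultdict(int)

    def visit(node):
        color[node] = GRAY
        for nxt in graph[node]:
            if color[nxt] == GRAY:          # back edge: cycle found
                return True
            if color[nxt] == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in list(graph))

# A valid stream topology has no cycles:
print(has_cycle([("source", "filter"), ("filter", "sink")]))   # False
print(has_cycle([("a", "b"), ("b", "c"), ("c", "a")]))         # True
```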
## pipe structure
Each pipe in the Streams Engine has the following components:
### identifier

A string that uniquely identifies the pipe; pipe IDs must be unique within a single stream.
### source

Specifies the source node (the `source` field) and, optionally, the output port from which data originates (`sourcePort`).

### destination

Specifies the destination node (the `target` field) and, optionally, the input port to which data is delivered (`targetPort`).
## pipe definition
Pipes are typically defined as part of a stream configuration in JSON or YAML format:
```json
{
  "id": "data-pipe-1",
  "source": "source-node",
  "target": "destination-node"
}
```
For nodes with multiple ports, you can specify the ports explicitly:
```json
{
  "id": "specialized-pipe",
  "source": "multi-output-node",
  "sourcePort": "filtered-data",
  "target": "processing-node",
  "targetPort": "primary-input"
}
```
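A stream loader would typically validate definitions like these before wiring nodes together. A hypothetical sketch of such a check (the field names follow the examples above; any validation rule beyond ID uniqueness is an assumption):

```python
def validate_pipes(pipes):
    """Check pipe definitions for required fields and duplicate IDs."""
    errors = []
    seen_ids = set()
    for i, pipe in enumerate(pipes):
        # "id", "source", and "target" appear in every example above
        for field in ("id", "source", "target"):
            if field not in pipe:
                errors.append(f"pipe #{i}: missing required field '{field}'")
        pid = pipe.get("id")
        if pid in seen_ids:
            errors.append(f"pipe #{i}: duplicate id '{pid}'")
        seen_ids.add(pid)
    return errors

print(validate_pipes([
    {"id": "data-pipe-1", "source": "source-node", "target": "destination-node"},
    {"id": "data-pipe-1", "source": "a", "target": "b"},  # duplicate id
]))
# → ["pipe #1: duplicate id 'data-pipe-1'"]
```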
## creating and managing pipes

### adding pipes to a stream
Pipes can be added to a stream when creating it or later via API:
```bash
# Adding pipes to an existing stream
curl -X POST http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "new-pipe",
      "source": "data-generator",
      "target": "data-processor"
    }
  ]'
```
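The same request can be issued from code. A sketch using Python's standard library (the endpoint, port, and payload mirror the curl example above and are assumed to match your deployment):

```python
import json
import urllib.request

def add_pipes_request(base_url, stream_id, pipes):
    """Build a POST request that adds pipes to an existing stream."""
    body = json.dumps(pipes).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/streams/{stream_id}/pipes",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = add_pipes_request(
    "http://localhost:42069", "stream-123",
    [{"id": "new-pipe", "source": "data-generator", "target": "data-processor"}],
)
# urllib.request.urlopen(req) would send it; here we only inspect the request:
print(req.full_url)      # http://localhost:42069/streams/stream-123/pipes
print(req.get_method())  # POST
```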
### yaml-based pipe creation
For more complex setups, YAML format can be used:
```bash
curl -X POST http://localhost:42069/streams/stream-123/pipes/yaml \
  -H "Content-Type: text/yaml" \
  -d '---
- id: pipe-a
  source: node-1
  target: node-2
- id: pipe-b
  source: node-2
  target: node-3
  sourcePort: processed
  targetPort: input'
```
### removing pipes
Pipes can be removed from a stream:
```bash
curl -X DELETE http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '["pipe-a", "pipe-b"]'
```
## data flow in pipes
Pipes transport data packets between nodes, where each packet typically contains:
- The actual data payload
- Metadata about the data (timestamps, origins, etc.)
- Flow control information
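The exact packet layout is engine-specific; purely as an illustration, a packet carrying those three parts might be modeled like this (all field names here are hypothetical):

```python
import time
from dataclasses import dataclass, field

@dataclass
class Packet:
    payload: object                               # the actual data payload
    metadata: dict = field(default_factory=dict)  # timestamps, origin, etc.
    sequence: int = 0                             # flow control: ordering info

pkt = Packet(
    payload={"reading": 21.5},
    metadata={"origin": "sensor-node", "ts": time.time()},
    sequence=7,
)
print(pkt.sequence)  # 7
```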
## pipe types
While the basic pipe provides point-to-point connections, specialized pipes can offer additional functionality:
### basic pipes
Direct connections between two nodes with minimal processing overhead.
### buffered pipes
Include internal buffers to handle temporary mismatches in processing rates between source and destination.
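Conceptually, a buffered pipe is a bounded queue between producer and consumer. A minimal sketch (not the engine's implementation; capacity and blocking behavior are illustrative):

```python
import queue

class BufferedPipe:
    """A bounded in-memory pipe: put() blocks when the buffer is full,
    smoothing short-lived rate mismatches between producer and consumer."""

    def __init__(self, capacity=1024):
        self._q = queue.Queue(maxsize=capacity)

    def put(self, item, timeout=None):
        self._q.put(item, timeout=timeout)   # blocks while the buffer is full

    def get(self, timeout=None):
        return self._q.get(timeout=timeout)  # blocks while the buffer is empty

pipe = BufferedPipe(capacity=2)
pipe.put("a")
pipe.put("b")          # buffer is now full; a third put() would block
print(pipe.get())      # a
```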
### transforming pipes
Apply lightweight transformations to data as it passes through (data type conversion, simple filtering).
### broadcasting pipes
Send the same data to multiple destination nodes.
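A broadcasting pipe fans each item out to every subscriber. A simplified sketch (a real engine would give each subscriber its own bounded buffer rather than a plain list):

```python
class BroadcastPipe:
    """Deliver every item sent to all subscribed destinations."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self):
        out = []                      # illustration only; see note above
        self._subscribers.append(out)
        return out

    def send(self, item):
        for sub in self._subscribers:
            sub.append(item)          # same item goes to every destination

bus = BroadcastPipe()
a, b = bus.subscribe(), bus.subscribe()
bus.send("event-1")
print(a, b)  # ['event-1'] ['event-1']
```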
## performance considerations
When designing pipe configurations:
- Buffer Sizes: Configure appropriate buffer sizes for expected data rates
- Backpressure: Consider how pipes handle situations where destination nodes process data slower than source nodes
- Data Volume: Be aware of the volume of data flowing through pipes, especially in high-throughput sections of the pipeline
- Serialization: If pipes connect nodes across process boundaries, consider serialization overhead
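Backpressure in particular is easy to picture with a bounded buffer: a producer that outruns its consumer must eventually block, drop, or fail fast. A sketch of the fail-fast variant (buffer size and policy are illustrative, not the engine's defaults):

```python
import queue

buf = queue.Queue(maxsize=2)   # deliberately small buffer
buf.put_nowait("x")
buf.put_nowait("y")            # buffer is now full
try:
    buf.put_nowait("z")        # consumer hasn't drained the buffer yet
except queue.Full:
    print("backpressure: producer must wait, drop, or slow down")
```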
## best practices
When working with pipes:
- Descriptive IDs: Use clear, descriptive IDs for pipes that indicate their function
- Minimize Data: Only pass necessary data through pipes to reduce overhead
- Error Handling: Implement strategies for handling pipe disconnections or blockages
- Documentation: Document pipe connections and expected data formats
- Validation: Ensure source and destination ports are compatible regarding data types
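The last point can be enforced mechanically if each port declares the data type it emits or accepts. A hypothetical sketch (port type declarations are not part of the pipe format shown above; the mappings here are assumptions):

```python
def check_port_types(pipe, output_ports, input_ports):
    """Verify that a pipe's source port emits the type its target port accepts.

    output_ports / input_ports map (node, port) -> type name.
    """
    out_type = output_ports[(pipe["source"], pipe.get("sourcePort", "default"))]
    in_type = input_ports[(pipe["target"], pipe.get("targetPort", "default"))]
    if out_type != in_type:
        raise TypeError(
            f"pipe '{pipe['id']}': {out_type} output wired to {in_type} input")

outputs = {("multi-output-node", "filtered-data"): "record"}
inputs = {("processing-node", "primary-input"): "record"}
check_port_types(
    {"id": "specialized-pipe", "source": "multi-output-node",
     "sourcePort": "filtered-data", "target": "processing-node",
     "targetPort": "primary-input"},
    outputs, inputs,
)  # compatible types: no error raised
```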
## next steps
- Learn about Streams
- Understand Nodes
- Explore DAGs
- Follow our Basic Pipeline Tutorial