pipes

Pipes are the connectors between nodes in the Streams Engine; they define how data flows through a stream pipeline.

what is a pipe?

A pipe is a directed connection that:

  • Establishes a one-way data flow between nodes
  • Defines the path that data takes through the processing pipeline
  • Connects an output port of one node to an input port of another node
  • Carries data of specific types between processing units

Pipes are essential components of the Directed Acyclic Graph (DAG) that forms a stream.
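Because a stream must remain acyclic, a set of pipe definitions can be checked for cycles before it is accepted. A minimal sketch using Kahn's algorithm (the function name and the pipe-dict shape are illustrative, mirroring the pipe definition format shown later, not part of the engine API):

```python
from collections import defaultdict, deque

def is_acyclic(pipes):
    """Return True if the pipe list forms a DAG (no cycles).

    Each pipe is a dict with hypothetical "source" and "target" keys.
    """
    indegree = defaultdict(int)
    successors = defaultdict(list)
    nodes = set()
    for pipe in pipes:
        src, dst = pipe["source"], pipe["target"]
        successors[src].append(dst)
        indegree[dst] += 1
        nodes.update((src, dst))

    # Kahn's algorithm: repeatedly remove nodes with no incoming pipes.
    ready = deque(n for n in nodes if indegree[n] == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    # If a cycle exists, its nodes never reach indegree 0 and stay unvisited.
    return visited == len(nodes)
```

For example, `a → b → c` passes the check, while `a → b → a` fails it.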

pipe structure

Each pipe in the Streams Engine has the following components:

identifier

A string that identifies the pipe. Pipe IDs must be unique within a single stream.

source

Specifies the source node and optionally the specific output port from which data originates.

destination

Specifies the destination node and optionally the specific input port to which data is delivered. In pipe definitions this appears as the target field.

pipe definition

Pipes are typically defined as part of a stream configuration in JSON or YAML format:

{
  "id": "data-pipe-1",
  "source": "source-node",
  "target": "destination-node"
}

For nodes with multiple ports, you can specify the ports explicitly:

{
  "id": "specialized-pipe",
  "source": "multi-output-node",
  "sourcePort": "filtered-data",
  "target": "processing-node",
  "targetPort": "primary-input"
}
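Since pipe IDs must be unique and both endpoints are required, definitions like the ones above lend themselves to a simple pre-flight check. A sketch (function and field names follow the JSON examples; this is not an engine API):

```python
def validate_pipes(pipes):
    """Check pipe definitions for duplicate IDs and missing endpoints.

    Returns a list of human-readable problems; an empty list means valid.
    """
    problems = []
    seen = set()
    for pipe in pipes:
        pid = pipe.get("id")
        if pid in seen:
            problems.append(f"duplicate pipe id: {pid}")
        seen.add(pid)
        # Both endpoints are mandatory; ports are optional.
        for required in ("source", "target"):
            if not pipe.get(required):
                problems.append(f"pipe {pid!r} is missing {required!r}")
    return problems
```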

creating and managing pipes

adding pipes to a stream

Pipes can be added to a stream when it is created, or later via the API:

# Adding pipes to an existing stream
curl -X POST http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "new-pipe",
      "source": "data-generator",
      "target": "data-processor"
    }
  ]'

yaml-based pipe creation

For more complex setups, YAML format can be used:

curl -X POST http://localhost:42069/streams/stream-123/pipes/yaml \
  -H "Content-Type: text/yaml" \
  -d '---
- id: pipe-a
  source: node-1
  target: node-2
- id: pipe-b
  source: node-2
  target: node-3
  sourcePort: processed
  targetPort: input'

removing pipes

Pipes can be removed from a stream:

curl -X DELETE http://localhost:42069/streams/stream-123/pipes \
  -H "Content-Type: application/json" \
  -d '["pipe-a", "pipe-b"]'

data flow in pipes

Pipes transport data packets between nodes. Each packet typically contains:

  • The actual data payload
  • Metadata about the data (timestamps, origins, etc.)
  • Flow control information
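One plausible shape for such a packet, sketched as a Python dataclass (the field names are illustrative, not the engine's actual wire format):

```python
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass
class Packet:
    """Hypothetical data packet traveling through a pipe."""
    payload: Any                                          # the actual data
    timestamp: float = field(default_factory=time.time)   # metadata: when it was emitted
    origin: str = ""                                      # metadata: id of the emitting node
    end_of_stream: bool = False                           # flow control marker
```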

pipe types

While the basic pipe provides point-to-point connections, specialized pipes can offer additional functionality:

basic pipes

Direct connections between two nodes with minimal processing overhead.

buffered pipes

Include internal buffers to handle temporary mismatches in processing rates between source and destination.

transforming pipes

Apply lightweight transformations to data as it passes through (data type conversion, simple filtering).

broadcasting pipes

Send the same data to multiple destination nodes.
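A broadcasting pipe can be pictured as one input fanning out to per-destination queues, so a slow consumer does not steal packets from the others. A minimal sketch (class and method names are illustrative, not the engine's API):

```python
import queue

class BroadcastPipe:
    """Sketch of a broadcasting pipe: one source, several destinations."""

    def __init__(self, n_destinations, maxsize=0):
        # One queue per destination; maxsize > 0 makes the pipe buffered.
        self.outputs = [queue.Queue(maxsize) for _ in range(n_destinations)]

    def send(self, packet):
        # Deliver the same packet to every destination.
        for q in self.outputs:
            q.put(packet)  # blocks if a bounded queue is full
```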

performance considerations

When designing pipe configurations:

  1. Buffer Sizes: Configure appropriate buffer sizes for expected data rates
  2. Backpressure: Consider how pipes handle situations where destination nodes process data slower than source nodes
  3. Data Volume: Be aware of the volume of data flowing through pipes, especially in high-throughput sections of the pipeline
  4. Serialization: If pipes connect nodes across process boundaries, consider serialization overhead
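The interaction of buffer size and backpressure can be demonstrated with a bounded queue standing in for a buffered pipe: once the buffer is full, the source is throttled instead of memory growing without bound. The sizes and names here are illustrative:

```python
import queue

# A bounded queue models a buffered pipe with room for 2 packets.
pipe = queue.Queue(maxsize=2)

pipe.put("packet-1")
pipe.put("packet-2")  # buffer is now full

try:
    # The source must wait; with a timeout, the attempt fails fast.
    pipe.put("packet-3", timeout=0.1)
    delivered = True
except queue.Full:
    delivered = False  # backpressure kicked in
```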

best practices

When working with pipes:

  1. Descriptive IDs: Use clear, descriptive IDs for pipes that indicate their function
  2. Minimize Data: Only pass necessary data through pipes to reduce overhead
  3. Error Handling: Implement strategies for handling pipe disconnections or blockages
  4. Documentation: Document pipe connections and expected data formats
  5. Validation: Ensure source and destination ports are compatible regarding data types

next steps