nodes
Nodes are the fundamental processing units in the Streams Engine. Each node performs a specific operation on data as it flows through the system.
what is a node?
A node is an independent processing component that:
- Receives data from one or more input pipes
- Performs specific operations on that data
- Sends the results to one or more output pipes
Nodes are the building blocks of stream processing pipelines, with each node responsible for a discrete step in the overall workflow.
node types
The Streams Engine supports several types of nodes:
source nodes
Source nodes bring data into the stream, either by generating it or by reading it from an external source. They have no input pipes, only output pipes. Examples include:
- File Readers: Read data from files
- API Connectors: Retrieve data from external APIs
- Database Readers: Query and fetch data from databases
- Generators: Create synthetic data based on patterns or rules
- Device Connectors: Connect to hardware devices (like EEG headsets)
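As a rough sketch, a generator-style source node might be defined as shown below; the "data-generator" executable name and its config fields are hypothetical and only illustrate the general shape (the id/executable/config structure is covered under "node structure" below):

{
  "id": "sine-source",
  "executable": "data-generator",
  "config": {
    "pattern": "sine",
    "sampleRate": 256
  }
}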
processing nodes
Processing nodes transform data as it passes through. They have both input and output pipes. Examples include:
- Filters: Remove unwanted data points
- Transformers: Change data format or structure
- Aggregators: Combine multiple data points
- Analyzers: Perform calculations or analysis on data
- Splitters: Divide data into multiple streams
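A processing node follows the same definition structure; the sketch below assumes a hypothetical transformer configuration, and the actual config fields depend on the executable:

{
  "id": "normalize",
  "executable": "data-transformer",
  "config": {
    "operation": "normalize",
    "outputFormat": "json"
  }
}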
sink nodes
Sink nodes consume data from the stream. They have input pipes but no output pipes. Examples include:
- File Writers: Save data to files
- Database Writers: Store data in databases
- Visualizers: Display data in charts or graphs
- API Senders: Send data to external services
- Notification Systems: Alert users based on data conditions
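Sink nodes are defined the same way, they simply have no downstream outputs; in this sketch the "database-writer" config fields (connection string and table name) are assumptions for illustration:

{
  "id": "results-store",
  "executable": "database-writer",
  "config": {
    "connection": "postgres://localhost:5432/streams",
    "table": "results"
  }
}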
node structure
Each node in the Streams Engine has the following components:
identifier
A string that identifies the node; node IDs must be unique within a single stream.
executable
The name of the processing module that the node runs. The executable defines the node's actual functionality; examples include "file-reader", "data-transformer", and "database-writer".
configuration
A JSON object containing parameters that control the node's behavior. The structure of this object depends on the node's executable. For example:
{
  "id": "csv-reader",
  "executable": "file-reader",
  "config": {
    "filename": "data.csv",
    "format": "csv",
    "delimiter": ",",
    "skipHeader": true
  }
}
ports
Interfaces where data enters or exits the node:
- Input Ports: Where data arrives from other nodes
- Output Ports: Where processed data is sent to other nodes
Most nodes have default "input" and "output" ports, but complex nodes may have multiple named ports for different data flows.
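The sketch below illustrates the idea of addressing a named port when wiring a pipe between two nodes; the splitter's port names and the pipe definition format shown here are assumptions, so refer to the Pipes page for the actual syntax:

{
  "from": { "node": "splitter", "port": "high-priority" },
  "to": { "node": "alert-writer", "port": "input" }
}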
node lifecycle
Nodes go through several states during operation:
- Created: The node is defined but not yet initialized
- Initialized: The node has loaded its configuration and prepared resources
- Running: The node is actively processing data
- Paused: The node has temporarily halted processing
- Error: The node encountered an error during processing
- Terminated: The node has released resources and stopped processing
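One way to observe these states is to query a node through the HTTP API; the GET endpoint and response shape below are assumptions modeled on the POST endpoint shown in the next section:

# Hypothetical status check – the GET endpoint and response fields are assumptions
curl http://localhost:42069/streams/stream-123/nodes/filter
# Example response: { "id": "filter", "state": "Running" }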
creating nodes
Nodes are created as part of a stream definition or added to an existing stream:
# Adding a node to an existing stream
curl -X POST http://localhost:42069/streams/stream-123/nodes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "filter",
      "executable": "data-filter",
      "config": {
        "condition": "value > 10"
      }
    }
  ]'
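Nodes can also be defined inline when the stream itself is created; the sketch below assumes a POST /streams endpoint and a payload with name, nodes, and pipes fields, which may differ from the actual stream definition format (see the Streams page):

# Sketch: defining nodes as part of a new stream (payload shape is an assumption)
curl -X POST http://localhost:42069/streams \
  -H "Content-Type: application/json" \
  -d '{
    "name": "example-stream",
    "nodes": [
      {
        "id": "csv-reader",
        "executable": "file-reader",
        "config": { "filename": "data.csv", "format": "csv" }
      },
      {
        "id": "filter",
        "executable": "data-filter",
        "config": { "condition": "value > 10" }
      }
    ],
    "pipes": []
  }'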
custom nodes
The Streams Engine allows developers to create custom nodes for specialized processing needs:
- Standard Interface: Custom nodes must implement the node interface
- Input/Output Handling: Define how data is received and transmitted
- Configuration Schema: Specify the expected configuration structure
- Error Handling: Implement proper error reporting and recovery
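One way to make a custom node's expected configuration explicit is to publish a schema for it; the JSON Schema sketch below is purely illustrative, and how (or whether) such schemas are registered with the engine is an assumption:

{
  "type": "object",
  "properties": {
    "threshold": { "type": "number" },
    "mode": { "type": "string", "enum": ["strict", "lenient"] }
  },
  "required": ["threshold"]
}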
performance considerations
When designing and configuring nodes:
- Resource Usage: Different node types have varying CPU and memory requirements
- Throughput: Processing time affects overall stream performance
- Parallelism: Some nodes can process multiple items simultaneously
- Buffering: Configure appropriate buffer sizes for expected data rates
- Error Handling: Decide how to handle processing failures
best practices
When working with nodes:
- Single Responsibility: Each node should perform one specific task
- Descriptive IDs: Use clear, descriptive IDs for nodes
- Appropriate Configuration: Configure nodes for expected data volume and patterns
- Monitoring: Track node performance metrics during execution
- Documentation: Document custom node functionality and configuration options
next steps
- Learn about Streams
- Understand Pipes
- Explore DAGs
- Follow our Basic Pipeline Tutorial