
nodes

Nodes are the fundamental processing units in the Streams Engine. Each node performs a specific operation on data as it flows through the system.

what is a node?

A node is an independent processing component that:

  • Receives data from one or more input pipes
  • Performs specific operations on that data
  • Sends the results to one or more output pipes

Nodes are the building blocks of stream processing pipelines, with each node responsible for a discrete step in the overall workflow.

node types

The Streams Engine supports several types of nodes:

source nodes

Source nodes generate data for the stream. They have no input pipes, only output pipes. Examples include:

  • File Readers: Read data from files
  • API Connectors: Retrieve data from external APIs
  • Database Readers: Query and fetch data from databases
  • Generators: Create synthetic data based on patterns or rules
  • Device Connectors: Connect to hardware devices (like EEG headsets)
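
For illustration, a source node that polls an external API might be defined as follows. The "api-connector" executable name echoes the category above, but both it and the configuration keys shown are hypothetical:

{
  "id": "weather-api",
  "executable": "api-connector",
  "config": {
    "url": "https://api.example.com/v1/readings",
    "pollIntervalMs": 5000
  }
}

Note that a source node like this has no input ports; data originates here and flows out through its output pipes.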

processing nodes

Processing nodes transform data as it passes through. They have both input and output pipes. Examples include:

  • Filters: Remove unwanted data points
  • Transformers: Change data format or structure
  • Aggregators: Combine multiple data points
  • Analyzers: Perform calculations or analysis on data
  • Splitters: Divide data into multiple streams
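
For instance, a filter node built on the "data-filter" executable (which also appears in the node-creation example later on this page) could drop values below a threshold:

{
  "id": "threshold-filter",
  "executable": "data-filter",
  "config": {
    "condition": "value > 10"
  }
}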

sink nodes

Sink nodes consume data from the stream. They have input pipes but no output pipes. Examples include:

  • File Writers: Save data to files
  • Database Writers: Store data in databases
  • Visualizers: Display data in charts or graphs
  • API Senders: Send data to external services
  • Notification Systems: Alert users based on data conditions
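
A sink node based on the "database-writer" executable mentioned below might look like this; the configuration keys are illustrative, and the actual schema depends on the executable:

{
  "id": "results-store",
  "executable": "database-writer",
  "config": {
    "connection": "postgres://localhost:5432/streams",
    "table": "results"
  }
}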

node structure

Each node in the Streams Engine has the following components:

identifier

A string that identifies the node; IDs must be unique within a single stream.

executable

The name of the processing module that the node will run. Executables define the actual functionality of the node, such as "file-reader", "data-transformer", or "database-writer".

configuration

A JSON object containing parameters that control the node's behavior. The structure of this object depends on the node's executable. For example:

{
  "id": "csv-reader",
  "executable": "file-reader",
  "config": {
    "filename": "data.csv",
    "format": "csv",
    "delimiter": ",",
    "skipHeader": true
  }
}

ports

Interfaces where data enters or exits the node:

  • Input Ports: Where data arrives from other nodes
  • Output Ports: Where processed data is sent to other nodes

Most nodes have default "input" and "output" ports, but complex nodes may have multiple named ports for different data flows.
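
The syntax for declaring named ports is not shown in the example above, so the following is only a sketch of how a splitter with two named output ports might be configured (the "data-splitter" executable and the "outputs" key are assumptions, not documented schema):

{
  "id": "splitter",
  "executable": "data-splitter",
  "config": {
    "outputs": ["high-priority", "low-priority"]
  }
}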

node lifecycle

Nodes go through several states during operation:

  1. Created: The node is defined but not yet initialized
  2. Initialized: The node has loaded its configuration and prepared resources
  3. Running: The node is actively processing data
  4. Paused: The node has temporarily halted processing
  5. Error: The node encountered an error during processing
  6. Terminated: The node has released resources and stopped processing
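
Assuming the HTTP API also exposes node state for reading (the endpoint below is an extrapolation from the node-creation endpoint later on this page, not a documented route), the current lifecycle state could be inspected with something like:

# Hypothetical endpoint for checking a node's lifecycle state
curl http://localhost:42069/streams/stream-123/nodes/filter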

creating nodes

Nodes are created as part of a stream definition or added to an existing stream:

# Adding a node to an existing stream
curl -X POST http://localhost:42069/streams/stream-123/nodes \
  -H "Content-Type: application/json" \
  -d '[
        {
          "id": "filter",
          "executable": "data-filter",
          "config": {
            "condition": "value > 10"
          }
        }
      ]'
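
Nodes can also be declared inline when the stream itself is created. The sketch below assumes a POST /streams endpoint that accepts a node list, mirroring the node-addition call above; the exact route and payload shape may differ:

# Creating a stream with a node defined inline (hypothetical endpoint)
curl -X POST http://localhost:42069/streams \
  -H "Content-Type: application/json" \
  -d '{
        "id": "stream-123",
        "nodes": [
          {
            "id": "csv-reader",
            "executable": "file-reader",
            "config": {
              "filename": "data.csv",
              "format": "csv"
            }
          }
        ]
      }'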

custom nodes

The Streams Engine allows developers to create custom nodes for specialized processing needs:

  1. Standard Interface: Custom nodes must implement the node interface
  2. Input/Output Handling: Define how data is received and transmitted
  3. Configuration Schema: Specify the expected configuration structure (see the sketch after this list)
  4. Error Handling: Implement proper error reporting and recovery
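
As an example of point 3, a custom node's expected configuration could be described with a schema like the one below. This uses standard JSON Schema keywords; whether the Streams Engine consumes JSON Schema specifically is an assumption:

{
  "type": "object",
  "properties": {
    "condition": {
      "type": "string",
      "description": "Boolean expression evaluated against each data point"
    }
  },
  "required": ["condition"]
}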

performance considerations

When designing and configuring nodes:

  1. Resource Usage: Different node types have varying CPU and memory requirements
  2. Throughput: Processing time affects overall stream performance
  3. Parallelism: Some nodes can process multiple items simultaneously
  4. Buffering: Configure appropriate buffer sizes for expected data rates (see the sketch after this list)
  5. Error Handling: Decide how to handle processing failures
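
For point 4, buffer sizing would typically live in a node's config object. The "data-aggregator" executable and the "bufferSize" key below are hypothetical and only illustrate where such a setting would go:

{
  "id": "aggregator",
  "executable": "data-aggregator",
  "config": {
    "bufferSize": 1024
  }
}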

best practices

When working with nodes:

  1. Single Responsibility: Each node should perform one specific task
  2. Descriptive IDs: Use clear, descriptive IDs for nodes
  3. Appropriate Configuration: Configure nodes for expected data volume and patterns
  4. Monitoring: Track node performance metrics during execution
  5. Documentation: Document custom node functionality and configuration options

next steps