Creating Sink Nodes for Data Output

In this tutorial, we'll build a sink Node that consumes processed data and makes it available for external visualization, storage, or further analysis. Specifically, we'll create a TCP sink Node, built on a ZeroMQ publisher socket, that streams processed EEG data to external applications.

Understanding Sink Nodes

Sink Nodes are the endpoints of data processing pipelines. They:

  • Receive processed data from upstream Nodes
  • Have inputs but typically no outputs within the pipeline
  • Interface with external systems or provide final outputs
  • May perform visualization, storage, streaming, or other output functions

For EEG researchers, sink Nodes are crucial for:

  • Real-time visualization of brain activity
  • Data export for offline analysis
  • Integration with external tools and platforms
  • Streaming data to custom analysis applications

Creating a TCP Sink Node

Let's create a sink Node that streams processed EEG data over TCP using ZeroMQ. This allows researchers to connect custom visualization or analysis tools to the Instinct pipeline.

// src/tcp-sink.ts
import { Node } from '@nexstem/wisdom-js';
import { Publisher } from 'zeromq';

interface TCPSinkConfig {
  host: string; // Host to bind to (e.g., '127.0.0.1')
  port: number; // Port to bind to (e.g., 5556)
  topic?: string; // Topic to publish on (optional)
  formatOutput?: boolean; // Whether to format the output as JSON
}

class TCPSinkNode extends Node<TCPSinkConfig> {
  private isRunning = false;
  private publisher: Publisher | null = null;
  private topic: string;

  constructor(params: any) {
    super({
      inputs: { min: 1 }, // Require at least one input
      outputs: { exact: 0 }, // No outputs since this is a sink node
      ...params,
    });

    this.topic = this.config.topic || 'eeg-data';
  }

  // Create the publisher socket; it is not bound until onStart
  async onBuild(): Promise<void> {
    try {
      this.publisher = new Publisher();
      console.log('TCP Sink built successfully');
    } catch (error) {
      console.error('Error during TCP Sink build:', error);
      throw error;
    }
  }

  // Bind the publisher to the configured TCP endpoint
  async onStart(): Promise<void> {
    try {
      if (!this.publisher) {
        throw new Error('Publisher not initialized');
      }

      const endpoint = `tcp://${this.config.host}:${this.config.port}`;
      await this.publisher.bind(endpoint);
      this.isRunning = true;
      console.log(`TCP Sink started and bound to ${endpoint}`);
    } catch (error) {
      console.error('Error starting TCP Sink:', error);
      throw error;
    }
  }

  onData(inletNumber: number, packet: any[]): void {
    if (!this.isRunning || !this.publisher) {
      return;
    }

    try {
      // Format the packet if configured to do so
      const dataToSend = this.config.formatOutput
        ? this.formatPacket(packet)
        : packet;

      // Send data over ZeroMQ with the configured topic as the first frame
      this.publisher.send([this.topic, ...dataToSend]).catch((error) => {
        console.error('Error sending data via ZeroMQ:', error);
      });
    } catch (error) {
      console.error('Error processing data in TCP Sink:', error);
    }
  }

  private formatPacket(packet: any[]): any[] {
    try {
      // Attempt to parse binary data as JSON if needed
      const formattedPacket = packet.map((item) => {
        if (Buffer.isBuffer(item)) {
          try {
            // Try to parse as JSON
            return JSON.parse(item.toString());
          } catch {
            // If parsing fails, return as-is
            return item;
          }
        }
        return item;
      });

      // Convert the whole packet to a single JSON-encoded frame
      return [Buffer.from(JSON.stringify(formattedPacket))];
    } catch (error) {
      console.error('Error formatting packet:', error);
      return packet;
    }
  }

  // Unbind from the endpoint; the socket itself stays open until shutdown
  async onStop(): Promise<void> {
    try {
      if (this.publisher && this.isRunning) {
        await this.publisher.unbind(
          `tcp://${this.config.host}:${this.config.port}`
        );
        this.isRunning = false;
        console.log('TCP Sink stopped');
      }
    } catch (error) {
      console.error('Error stopping TCP Sink:', error);
      throw error;
    }
  }

  async onShutdown(): Promise<void> {
    try {
      if (this.publisher) {
        if (this.isRunning) {
          await this.onStop();
        }
        // Close the socket to release its resources before dropping the reference
        this.publisher.close();
        this.publisher = null;
      }
      console.log('TCP Sink shutdown completed');
    } catch (error) {
      console.error('Error during TCP Sink shutdown:', error);
      throw error;
    }
  }

  onSignal(signal: string, packet: any[]): void {
    console.log(`TCP Sink received signal: ${signal}`, packet);
  }
}

export default TCPSinkNode;

Understanding the TCP Sink Node

Let's examine the key components of this sink Node:

Configuration

interface TCPSinkConfig {
  host: string; // Host to bind to (e.g., '127.0.0.1')
  port: number; // Port to bind to (e.g., 5556)
  topic?: string; // Topic to publish on (optional)
  formatOutput?: boolean; // Whether to format the output as JSON
}

This configuration allows researchers to specify:

  • The host and port to bind to
  • An optional topic for data categorization
  • Whether to format the output as JSON for easier consumption by external tools
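
These settings can be checked before the Node ever binds. The following is a minimal sketch, not part of the wisdom-js API: normalizeConfig and toEndpoint are hypothetical helper names, and the default topic mirrors the 'eeg-data' fallback used in the constructor.

```typescript
// Hypothetical helpers for illustration; not part of @nexstem/wisdom-js.
interface TCPSinkConfig {
  host: string;
  port: number;
  topic?: string;
  formatOutput?: boolean;
}

// Fill in optional fields and reject values that cannot form a valid endpoint.
function normalizeConfig(raw: TCPSinkConfig): Required<TCPSinkConfig> {
  const host = raw.host.trim();
  if (host === '') {
    throw new Error('host must not be empty');
  }
  if (!Number.isInteger(raw.port) || raw.port < 1 || raw.port > 65535) {
    throw new Error(`port must be an integer in 1..65535, got ${raw.port}`);
  }
  return {
    host,
    port: raw.port,
    topic: raw.topic || 'eeg-data', // same fallback as the constructor
    formatOutput: raw.formatOutput ?? false,
  };
}

// Build the endpoint string that onStart passes to bind().
function toEndpoint(config: { host: string; port: number }): string {
  return `tcp://${config.host}:${config.port}`;
}

const config = normalizeConfig({ host: '127.0.0.1', port: 5556 });
console.log(toEndpoint(config), config.topic);
```

Binding to 127.0.0.1 accepts connections from the local machine only. To reach the sink from another computer, bind to an address that is reachable on the network, such as 0.0.0.0 for all interfaces.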

ZeroMQ Publisher Setup

async onBuild(): Promise<void> {
  try {
    this.publisher = new Publisher();
    console.log('TCP Sink built successfully');
  } catch (error) {
    console.error('Error during TCP Sink build:', error);
    throw error;
  }
}

async onStart(): Promise<void> {
  try {
    if (!this.publisher) {
      throw new Error('Publisher not initialized');
    }

    const endpoint = `tcp://${this.config.host}:${this.config.port}`;
    await this.publisher.bind(endpoint);
    this.isRunning = true;
    console.log(`TCP Sink started and bound to ${endpoint}`);
  } catch (error) {
    console.error('Error starting TCP Sink:', error);
    throw error;
  }
}

In the onBuild method, we initialize the ZeroMQ publisher, and in the onStart method, we bind it to the specified TCP endpoint. This creates a socket that external applications can connect to.

Data Handling

onData(inletNumber: number, packet: any[]): void {
  if (!this.isRunning || !this.publisher) {
    return;
  }

  try {
    // Format the packet if configured to do so
    const dataToSend = this.config.formatOutput
      ? this.formatPacket(packet)
      : packet;

    // Send data over ZeroMQ with the configured topic
    this.publisher.send([this.topic, ...dataToSend]).catch((error) => {
      console.error('Error sending data via ZeroMQ:', error);
    });
  } catch (error) {
    console.error('Error processing data in TCP Sink:', error);
  }
}

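The onData method receives processed data from upstream Nodes and forwards it over the ZeroMQ publisher, with the topic as the first frame. If formatOutput is set, the packet is first serialized into a single JSON frame, which is easier for external applications to consume; otherwise each packet item is sent as its own frame.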
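
Because onData is synchronous while send returns a promise, several sends can be pending at once when packets arrive quickly. If the socket accepts only one pending send at a time (an assumption here; check the zeromq.js documentation for your version), chaining the sends keeps them ordered and non-overlapping. The sketch below uses a hypothetical SerialSender class, with publisher.send replaced by an injected stand-in function so the example runs on its own.

```typescript
// Hypothetical helper: serializes async sends so only one is pending at a time.
type SendFn<T> = (message: T) => Promise<void>;

class SerialSender<T> {
  private tail: Promise<void> = Promise.resolve();

  constructor(
    private readonly sendFn: SendFn<T>,
    private readonly onError: (error: unknown) => void = console.error
  ) {}

  // Queue a message; it is sent after all previously queued messages settle.
  enqueue(message: T): Promise<void> {
    this.tail = this.tail
      .then(() => this.sendFn(message))
      .catch((error) => this.onError(error)); // keep the chain alive after a failure
    return this.tail;
  }
}

// Stand-in for publisher.send: records messages and tracks overlapping calls.
const sent: string[] = [];
let inFlight = 0;
let maxInFlight = 0;
const fakeSend: SendFn<string[]> = async (frames) => {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, 1));
  sent.push(frames.join('|'));
  inFlight -= 1;
};

const sender = new SerialSender(fakeSend);
const done = Promise.all([
  sender.enqueue(['eeg-data', 'packet-1']),
  sender.enqueue(['eeg-data', 'packet-2']),
  sender.enqueue(['eeg-data', 'packet-3']),
]);
```

Inside the Node, onData would call enqueue with [this.topic, ...dataToSend], and the injected function would wrap this.publisher.send. The queue in this sketch is unbounded, so a real-time pipeline would likely want to cap it or drop stale packets.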

Connecting to the TCP Sink from External Applications

Researchers can connect to this TCP sink from various programming environments. The examples below assume that formatOutput is enabled, so each message consists of two frames: the topic and a JSON payload.

Python

import zmq
import json

# Create ZeroMQ subscriber
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, "eeg-data")

print("Connected to EEG data stream. Waiting for data...")

while True:
    # Receive one message: a topic frame followed by a JSON payload frame
    topic, data = socket.recv_multipart()

    # Parse the data
    try:
        eeg_data = json.loads(data.decode('utf-8'))
        # Assumes each packet item carries a 'channels' list, as in the
        # MATLAB and JavaScript examples; adjust to your packet schema
        print(f"Received EEG data with {len(eeg_data[0]['channels'])} channels")

        # Process the data here
        # For example, extract band powers or plot the signal
    except Exception as e:
        print(f"Error processing data: {e}")

MATLAB

% Create ZeroMQ subscriber
% Requires a ZeroMQ Java binding (for example, JeroMQ) on MATLAB's Java class path
import org.zeromq.*
context = ZMQ.context(1);
socket = context.socket(ZMQ.SUB);
socket.connect("tcp://127.0.0.1:5556");
socket.subscribe("eeg-data");

disp('Connected to EEG data stream. Waiting for data...');

% Create a figure for real-time plotting
figure('Name', 'Real-time EEG Data', 'NumberTitle', 'off');
hold on;

while true
    % Receive message
    try
        % Read the topic frame first, then the payload frame
        topic = socket.recvStr();
        data = socket.recvStr();

        % Parse the JSON data
        eeg_data = jsondecode(data);

        % For example, plot the first channel
        if ~isempty(eeg_data) && isfield(eeg_data, 'channels')
            % Extract values from the first channel
            channel_data = eeg_data.channels(1).values;

            % Plot the data
            cla;
            plot(channel_data);
            title('EEG Channel 1');
            xlabel('Sample');
            ylabel('Amplitude');
            drawnow;
        end
    catch e
        disp(['Error processing data: ' e.message]);
    end
end

JavaScript (Node.js)

const zmq = require('zeromq');

async function run() {
  const socket = new zmq.Subscriber();
  socket.connect('tcp://127.0.0.1:5556');
  socket.subscribe('eeg-data');

  console.log('Connected to EEG data stream. Waiting for data...');

  for await (const [topic, data] of socket) {
    try {
      // Parse the data
      const eegData = JSON.parse(data.toString());
      console.log(
        `Received EEG data with ${eegData[0].channels.length} channels`
      );

      // Process the data here
      // For example, extract specific frequency bands or visualize
    } catch (error) {
      console.error('Error processing data:', error);
    }
  }
}

run();
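
All three clients unpack the same two-frame message: the topic followed by one JSON payload. A small sketch of that decoding step, written as a pure function so it can be tested without a socket. decodeMessage and the Frame type are hypothetical names, and the payload is left as unknown because the packet schema depends on the upstream Nodes.

```typescript
// Anything with toString() works here: Node.js Buffers as well as plain strings.
type Frame = { toString(): string };

interface DecodedMessage {
  topic: string;
  payload: unknown; // schema depends on the upstream Nodes
}

// Decode a [topic, json] message as produced when formatOutput is enabled.
function decodeMessage(frames: Frame[]): DecodedMessage {
  if (frames.length !== 2) {
    throw new Error(`expected 2 frames (topic, payload), got ${frames.length}`);
  }
  const [topicFrame, payloadFrame] = frames;
  return {
    topic: topicFrame.toString(),
    payload: JSON.parse(payloadFrame.toString()),
  };
}

const message = decodeMessage(['eeg-data', '[{"channels":[1,2,3]}]']);
console.log(message.topic, message.payload);
```

With formatOutput disabled, the sink sends one frame per packet item after the topic, so a client would need to handle a variable number of frames instead of rejecting them as this sketch does.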

Applications in EEG Research

This TCP sink Node enables several important capabilities for EEG researchers:

Real-time Visualization

Researchers can create custom visualizations tailored to their specific experimental needs:

  • Topographic maps of brain activity
  • Time-frequency plots
  • Source localization visualizations
  • Custom dashboards combining multiple data streams

Integration with Analysis Tools

The TCP sink allows integration with specialized analysis tools:

  • MATLAB, for example with the FieldTrip toolbox, for advanced signal processing
  • Python with MNE for EEG analysis
  • R for statistical analysis
  • Custom analysis pipelines

Data Fusion

Researchers can combine EEG data with other data sources:

  • Behavioral measurements
  • Eye-tracking data
  • Physiological measurements (heart rate, GSR)
  • Environmental sensors

Closed-loop Systems

The TCP sink enables building closed-loop systems where EEG data influences stimuli:

  • Neurofeedback applications
  • Adaptive brain-computer interfaces
  • Closed-loop brain stimulation

Testing the TCP Sink Node

To test your TCP sink Node:

  1. Create a simple pipeline in the Instinct ecosystem that generates EEG data and sends it to your TCP sink.
  2. Run one of the example client scripts provided above to connect to the TCP sink.
  3. Verify that data is being correctly transmitted and received.
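
Before wiring up a full pipeline, the sink's send path can also be exercised in isolation with a stand-in publisher. This sketch uses neither wisdom-js nor zeromq: FakePublisher and SinkCore are hypothetical names, and SinkCore only mirrors the bind, send, and unbind calls that TCPSinkNode makes.

```typescript
// Minimal interface covering the publisher calls that the sink makes.
interface PublisherLike {
  bind(endpoint: string): Promise<void>;
  unbind(endpoint: string): Promise<void>;
  send(frames: string[]): Promise<void>;
}

// Test double that records calls instead of opening a socket.
class FakePublisher implements PublisherLike {
  readonly calls: string[] = [];
  async bind(endpoint: string): Promise<void> {
    this.calls.push(`bind ${endpoint}`);
  }
  async unbind(endpoint: string): Promise<void> {
    this.calls.push(`unbind ${endpoint}`);
  }
  async send(frames: string[]): Promise<void> {
    this.calls.push(`send ${frames.join('|')}`);
  }
}

// Stripped-down version of the sink's start/data/stop logic.
class SinkCore {
  private isRunning = false;

  constructor(
    private readonly publisher: PublisherLike,
    private readonly endpoint: string,
    private readonly topic: string
  ) {}

  async start(): Promise<void> {
    await this.publisher.bind(this.endpoint);
    this.isRunning = true;
  }

  async data(packet: string[]): Promise<void> {
    if (!this.isRunning) return; // packets before start are dropped, as in onData
    await this.publisher.send([this.topic, ...packet]);
  }

  async stop(): Promise<void> {
    if (!this.isRunning) return;
    this.isRunning = false;
    await this.publisher.unbind(this.endpoint);
  }
}

// Drive the lifecycle in order and return the recorded publisher calls.
async function runScenario(): Promise<string[]> {
  const fake = new FakePublisher();
  const sink = new SinkCore(fake, 'tcp://127.0.0.1:5556', 'eeg-data');
  await sink.data(['too-early']); // ignored: not started yet
  await sink.start();
  await sink.data(['sample-1']);
  await sink.stop();
  return fake.calls;
}

const scenario = runScenario();
```

A check like this covers ordering and the isRunning guard only; it does not replace an end-to-end run with a real subscriber.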

Extending the TCP Sink Node

You can extend this Node in several ways:

  1. Authentication: Add authentication mechanisms to secure the data stream.
  2. Compression: Implement data compression to reduce bandwidth usage.
  3. Buffering: Add buffering capabilities to handle temporary connection issues.
  4. Data Filtering: Add the ability to filter which data is transmitted based on certain criteria.
  5. Multiple Endpoints: Support publishing to multiple endpoints simultaneously.
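
As one illustration of the data filtering idea, the sketch below decides per packet whether to forward it, using an inlet allow-list and simple packet-level decimation (forward every Nth accepted packet). PacketFilter and its options are hypothetical and not part of wisdom-js; onData would consult the filter before calling send.

```typescript
interface FilterOptions {
  allowedInlets?: number[]; // forward only these inlets; undefined allows all
  keepEvery?: number; // forward every Nth accepted packet; default 1 forwards all
}

class PacketFilter {
  private readonly allowed: Set<number> | null;
  private readonly keepEvery: number;
  private counter = 0;

  constructor(options: FilterOptions = {}) {
    this.allowed = options.allowedInlets ? new Set(options.allowedInlets) : null;
    this.keepEvery = Math.max(1, Math.floor(options.keepEvery ?? 1));
  }

  // Returns true when the packet from this inlet should be transmitted.
  shouldSend(inletNumber: number): boolean {
    if (this.allowed && !this.allowed.has(inletNumber)) {
      return false; // rejected inlets do not advance the decimation counter
    }
    const keep = this.counter % this.keepEvery === 0;
    this.counter += 1;
    return keep;
  }
}

const filter = new PacketFilter({ allowedInlets: [0], keepEvery: 2 });
const decisions = [0, 1, 0, 0, 0].map((inlet) => filter.shouldSend(inlet));
console.log(decisions);
```

Dropping whole packets reduces bandwidth but is not the same as resampling the signal, so it suits monitoring displays better than analyses that need every sample.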

Conclusion

In this tutorial, you've learned how to create a TCP sink Node that streams processed EEG data to external applications using ZeroMQ. This Node serves as a bridge between the Instinct ecosystem and external research tools, enabling sophisticated analysis and visualization workflows.

By developing custom sink Nodes, EEG researchers can integrate the Instinct platform with their existing workflows and tools, creating powerful and flexible data processing pipelines.