Creating a Basic Processing Node
This tutorial walks you through creating a basic processing Node for the Instinct ecosystem. We'll develop a simple high-pass filter Node that removes slow drift and DC offset from EEG data.
Understanding the Node Class
The Node base class from the @nexstem/wisdom-js package provides the foundation for all custom Nodes. Let's examine its core structure:
abstract class Node<T = any> {
// Lifecycle methods you must implement
abstract onBuild(): Promise<void>;
abstract onData(index: number, packet: any[]): void;
abstract onSignal(signal: string, packet: any[]): void;
abstract onShutdown(): Promise<any>;
abstract onStart(): Promise<any> | void;
abstract onStop(): Promise<any> | void;
// Methods for sending data
public async sendPacketToAllOutputs(packet: any[]): Promise<void>;
public async sendPacketToOutput(index: number, packet: any[]): Promise<void>;
}
Creating a Basic Filter Node
Let's create a simple filter Node that applies a basic high-pass filter to EEG data. This is useful for removing slow drift and DC offset from EEG signals.
// src/highpass-filter.ts
import { Node } from '@nexstem/wisdom-js';
import { TimeSeriesPacket } from './types/time-series-packet'; // You would need to define this type
interface HighPassFilterConfig {
cutoffFrequency: number; // Cutoff frequency in Hz
sampleRate: number; // Sample rate in Hz
channels: number; // Number of EEG channels
}
class HighPassFilterNode extends Node<HighPassFilterConfig> {
private isRunning = false;
private filterCoefficient = 0;
private prevSamples: number[][] = [];
private prevFiltered: number[][] = [];
constructor(params: any) {
super({
inputs: { min: 1, max: 1 }, // Requires exactly one input
outputs: { min: 1 }, // At least one output
...params,
});
}
async onBuild(): Promise<void> {
// Calculate the filter coefficient (alpha)
// RC = 1/(2*PI*fc) where fc is the cutoff frequency
const RC = 1.0 / (2.0 * Math.PI * this.config.cutoffFrequency);
const dt = 1.0 / this.config.sampleRate;
this.filterCoefficient = RC / (RC + dt);
// Initialize previous samples and filtered values arrays
this.prevSamples = Array(this.config.channels)
.fill(0)
.map(() => [0]);
this.prevFiltered = Array(this.config.channels)
.fill(0)
.map(() => [0]);
console.log(
`HighPass Filter built with cutoff: ${this.config.cutoffFrequency}Hz`
);
}
onData(inletNumber: number, packet: any[]): void {
if (!this.isRunning) return;
try {
// Assume the packet contains a TimeSeriesPacket
const timeSeriesData = TimeSeriesPacket.decode(packet[0]);
// Apply the high-pass filter to each channel
const filteredChannels = timeSeriesData.channels.map((channel, idx) => {
// Apply the filter to all values in the channel
const filteredValues = channel.values.map((value, sampleIdx) => {
// Simple high-pass filter: y[n] = alpha * (y[n-1] + x[n] - x[n-1])
const prevSample = this.prevSamples[idx][0];
const prevFiltered = this.prevFiltered[idx][0];
const filtered =
this.filterCoefficient * (prevFiltered + value - prevSample);
// Store the current values for the next iteration
this.prevSamples[idx][0] = value;
this.prevFiltered[idx][0] = filtered;
return filtered;
});
// Return the filtered channel data
return {
...channel,
values: filteredValues,
};
});
// Create a new TimeSeriesPacket with the filtered data
const filteredPacket = {
...timeSeriesData,
channels: filteredChannels,
};
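// Send the filtered data to all outputs.
// sendPacketToAllOutputs returns a Promise, so rejections are handled here;
// the surrounding try/catch only sees synchronous errors.
this.sendPacketToAllOutputs([
TimeSeriesPacket.encode(filteredPacket).finish(),
]).catch((error) => console.error('Error sending data:', error));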
} catch (error) {
console.error('Error processing data:', error);
}
}
async onStart(): Promise<void> {
this.isRunning = true;
console.log('HighPass Filter started');
}
async onStop(): Promise<void> {
this.isRunning = false;
console.log('HighPass Filter stopped');
}
async onShutdown(): Promise<void> {
// Clean up any resources
console.log('HighPass Filter shutdown');
}
onSignal(signal: string, packet: any[]): void {
// Handle any custom signals
console.log(`Signal received: ${signal}`, packet);
}
}
// Export the Node class
export default HighPassFilterNode;
Understanding the Code
Let's break down the key components of our filter Node:
Configuration Interface
interface HighPassFilterConfig {
cutoffFrequency: number; // Cutoff frequency in Hz
sampleRate: number; // Sample rate in Hz
channels: number; // Number of EEG channels
}
This interface defines the configuration parameters our Node accepts. For EEG researchers, parameters like cutoff frequency and sample rate are familiar concepts in signal processing.
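Because the filter coefficient is derived from these values, it can help to reject unusable configurations before the Node is built. The sketch below is one possible approach and is not part of the wisdom-js API: validateHighPassConfig is a hypothetical helper, and the requirement that the cutoff lie below the Nyquist frequency (half the sample rate) is a general signal-processing constraint, not something the framework is known to enforce.

```typescript
interface HighPassFilterConfig {
  cutoffFrequency: number; // Cutoff frequency in Hz
  sampleRate: number; // Sample rate in Hz
  channels: number; // Number of EEG channels
}

// Hypothetical helper (not provided by wisdom-js): returns a list of
// problems, which is empty when the configuration looks usable.
function validateHighPassConfig(config: HighPassFilterConfig): string[] {
  const problems: string[] = [];

  if (!(config.sampleRate > 0)) {
    problems.push('sampleRate must be a positive number');
  }

  if (!(config.cutoffFrequency > 0)) {
    problems.push('cutoffFrequency must be a positive number');
  } else if (
    config.sampleRate > 0 &&
    config.cutoffFrequency >= config.sampleRate / 2
  ) {
    problems.push('cutoffFrequency must be below the Nyquist frequency (sampleRate / 2)');
  }

  if (!Number.isInteger(config.channels) || config.channels < 1) {
    problems.push('channels must be a positive integer');
  }

  return problems;
}

// A typical EEG setup passes; a cutoff above Nyquist is reported.
const okProblems = validateHighPassConfig({ cutoffFrequency: 0.5, sampleRate: 250, channels: 8 });
const badProblems = validateHighPassConfig({ cutoffFrequency: 200, sampleRate: 250, channels: 8 });
console.log(okProblems.length, badProblems);
```

A check like this could run at the top of onBuild, before the coefficient is calculated.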
Constructor
constructor(params: any) {
super({
inputs: { min: 1, max: 1 }, // Requires exactly one input
outputs: { min: 1 }, // At least one output
...params
});
}
The constructor specifies that this Node requires exactly one input stream (the raw EEG data) and produces at least one output stream (the filtered data).
onBuild Method
async onBuild(): Promise<void> {
// Calculate the filter coefficient (alpha)
// RC = 1/(2*PI*fc) where fc is the cutoff frequency
const RC = 1.0 / (2.0 * Math.PI * this.config.cutoffFrequency);
const dt = 1.0 / this.config.sampleRate;
this.filterCoefficient = RC / (RC + dt);
// Initialize previous samples and filtered values arrays
this.prevSamples = Array(this.config.channels).fill(0).map(() => [0]);
this.prevFiltered = Array(this.config.channels).fill(0).map(() => [0]);
console.log(`HighPass Filter built with cutoff: ${this.config.cutoffFrequency}Hz`);
}
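This method is called during Node initialization. It derives the filter coefficient from the cutoff frequency and sample rate, and it resets the per-channel filter state (the previous input and previous output) to zero. Because the state starts at zero, the first samples after start-up carry a short transient while the filter settles.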
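To make the coefficient calculation concrete, the same arithmetic can be pulled into a standalone function and evaluated for a typical EEG setup. This is a minimal sketch: highPassAlpha is a hypothetical name used only for illustration, and the 0.5 Hz / 250 Hz values match the test configuration used later in this tutorial.

```typescript
// Hypothetical helper mirroring the arithmetic in onBuild.
function highPassAlpha(cutoffHz: number, sampleRateHz: number): number {
  const rc = 1 / (2 * Math.PI * cutoffHz); // RC time constant in seconds
  const dt = 1 / sampleRateHz; // time between samples in seconds
  return rc / (rc + dt);
}

const alpha = highPassAlpha(0.5, 250);
console.log(alpha.toFixed(4)); // prints "0.9876"

// Raising the cutoff lowers alpha, so the filter forgets its history faster.
const alphaHigherCutoff = highPassAlpha(5, 250);
console.log(alphaHigherCutoff < alpha); // prints "true"
```

An alpha close to 1 means the filter passes almost all fast changes and only slowly removes the baseline, which is the intent of a low cutoff such as 0.5 Hz.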
onData Method
onData(inletNumber: number, packet: any[]): void {
if (!this.isRunning) return;
try {
// Assume the packet contains a TimeSeriesPacket
const timeSeriesData = TimeSeriesPacket.decode(packet[0]);
// Apply the high-pass filter to each channel
const filteredChannels = timeSeriesData.channels.map((channel, idx) => {
// Apply the filter to all values in the channel
const filteredValues = channel.values.map((value, sampleIdx) => {
// Simple high-pass filter: y[n] = alpha * (y[n-1] + x[n] - x[n-1])
const prevSample = this.prevSamples[idx][0];
const prevFiltered = this.prevFiltered[idx][0];
const filtered = this.filterCoefficient * (prevFiltered + value - prevSample);
// Store the current values for the next iteration
this.prevSamples[idx][0] = value;
this.prevFiltered[idx][0] = filtered;
return filtered;
});
// Return the filtered channel data
return {
...channel,
values: filteredValues
};
});
// Create a new TimeSeriesPacket with the filtered data
const filteredPacket = {
...timeSeriesData,
channels: filteredChannels
};
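// Send the filtered data to all outputs.
// sendPacketToAllOutputs returns a Promise, so rejections are handled here;
// the surrounding try/catch only sees synchronous errors.
this.sendPacketToAllOutputs([TimeSeriesPacket.encode(filteredPacket).finish()])
.catch((error) => console.error('Error sending data:', error));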
} catch (error) {
console.error('Error processing data:', error);
}
}
This method is the heart of our Node. It:
- Receives EEG data on the input
- Applies a first-order high-pass filter to each channel
- Sends the filtered data to all outputs
The filter implementation uses a simple IIR (Infinite Impulse Response) high-pass filter algorithm: y[n] = α * (y[n-1] + x[n] - x[n-1]), where:
- y[n] is the current filtered value
- y[n-1] is the previous filtered value
- x[n] is the current input value
- x[n-1] is the previous input value
- α is the filter coefficient calculated from the cutoff frequency
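The recurrence above can be sketched for a single channel, independent of the Node class. The example is illustrative only: OnePoleHighPass is a hypothetical class, and the alpha value of 0.9876 assumes a 0.5 Hz cutoff at a 250 Hz sample rate. It shows two properties that matter for streaming data: a constant (DC) input decays toward zero, and keeping the previous input and output between calls makes chunked processing match processing the whole signal at once.

```typescript
// Hypothetical single-channel version of the recurrence used in onData.
class OnePoleHighPass {
  private alpha: number;
  private prevInput = 0; // x[n-1]
  private prevOutput = 0; // y[n-1]

  constructor(alpha: number) {
    this.alpha = alpha;
  }

  // Filters one chunk; state carries over to the next call.
  process(chunk: number[]): number[] {
    return chunk.map((x) => {
      const y = this.alpha * (this.prevOutput + x - this.prevInput);
      this.prevInput = x;
      this.prevOutput = y;
      return y;
    });
  }
}

// A constant offset of 100 units, 1000 samples long.
const dc: number[] = new Array<number>(1000).fill(100);

// Process everything in one call...
const whole = new OnePoleHighPass(0.9876).process(dc);

// ...and again as two packets through one filter instance.
const streaming = new OnePoleHighPass(0.9876);
const parts = streaming.process(dc.slice(0, 400)).concat(streaming.process(dc.slice(400)));

console.log(whole[0], whole[whole.length - 1]); // large start-up value, then close to zero
console.log(parts.every((v, i) => v === whole[i])); // prints "true"
```

This is why the Node stores prevSamples and prevFiltered as instance fields: resetting them on every packet would reintroduce the start-up transient at each packet boundary.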
Time Series Packet Type
The example assumes a TimeSeriesPacket type, which might look something like this:
// src/types/time-series-packet.ts
export interface Channel {
channelId: number;
values: number[];
timestamp: number[];
}
export interface TimeSeriesPacket {
channels: Channel[];
}
// This is a simplified example. In a real implementation,
// you would likely use Protocol Buffers or similar for serialization.
export const TimeSeriesPacket = {
encode: (packet: TimeSeriesPacket) => ({
finish: () => Buffer.from(JSON.stringify(packet)),
}),
decode: (buffer: Buffer) => JSON.parse(buffer.toString()) as TimeSeriesPacket,
};
Testing the Node
To test your filter Node, you can create a simple test script:
// src/test-filter.ts
import HighPassFilterNode from './highpass-filter';
// Create a test instance with appropriate configuration
const filterNode = new HighPassFilterNode({
config: {
cutoffFrequency: 0.5, // 0.5 Hz high-pass filter (removes slow drift)
sampleRate: 250, // 250 Hz sample rate (common for EEG)
channels: 8, // 8 EEG channels
},
});
// In a real application, the Instinct framework would manage the lifecycle
// and data flow for you.
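Before the Node runs inside the framework, one way to gain confidence in the filter math is to run it on a synthetic signal. The sketch below does not use wisdom-js at all: it reimplements the recurrence from onData in a hypothetical highPass function and feeds it a 10 Hz sine wave riding on a constant offset of 50 units. The signal parameters are arbitrary choices for illustration. After the start-up transient, the offset should be gone while the 10 Hz component remains nearly unchanged.

```typescript
// Hypothetical standalone version of the filter, for testing the math only.
function highPass(samples: number[], cutoffHz: number, sampleRateHz: number): number[] {
  const rc = 1 / (2 * Math.PI * cutoffHz);
  const dt = 1 / sampleRateHz;
  const alpha = rc / (rc + dt);
  let prevIn = 0;
  let prevOut = 0;
  return samples.map((x) => {
    const y = alpha * (prevOut + x - prevIn);
    prevIn = x;
    prevOut = y;
    return y;
  });
}

// Build 10 seconds of test data: 10 Hz sine (amplitude 10) plus a DC offset of 50.
const sampleRate = 250;
const seconds = 10;
const raw: number[] = [];
for (let n = 0; n < sampleRate * seconds; n++) {
  raw.push(50 + 10 * Math.sin((2 * Math.PI * 10 * n) / sampleRate));
}

const filtered = highPass(raw, 0.5, sampleRate);

// Inspect only the final second, after the start-up transient has decayed.
const tail = filtered.slice(-sampleRate);
const tailMean = tail.reduce((sum, v) => sum + v, 0) / tail.length;
const tailPeak = tail.reduce((max, v) => Math.max(max, v), -Infinity);

console.log(`mean of last second: ${tailMean.toFixed(3)}`);
console.log(`peak of last second: ${tailPeak.toFixed(2)}`);
```

A mean close to zero indicates the offset was removed, and a peak close to the original amplitude of 10 indicates the 10 Hz activity passed through.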
Next Steps
In this tutorial, you've learned how to create a basic processing Node that applies a high-pass filter to EEG data. This type of filter is commonly used in EEG preprocessing to remove slow drifts in the signal.
In the next tutorial, we'll develop more advanced EEG processing Nodes, including feature extraction methods like band power calculation.