building a basic pipeline
This tutorial guides you through creating a basic data processing pipeline using the Instinct API's Streams Engine.
prerequisites
Before you begin, ensure you have:
- An Instinct headset connected to your local network
- Basic knowledge of HTTP requests (using curl or a similar tool)
- Understanding of the Streams and Nodes concepts
step 1: plan your pipeline
Before creating a stream, plan the data flow and the processing steps you need. For this tutorial, we'll build a simple pipeline that:
- Generates a synthetic multi-channel signal (source)
- Applies a bandpass filter to the signal (processing)
- Records the filtered data to a CSV file (sink)
- Streams the filtered data over a WebSocket for real-time visualization (sink)
Each step in this pipeline is a node, and we'll connect the nodes with pipes to form a stream, which is a directed acyclic graph (DAG).
Here's a visual representation of our planned pipeline:
[Signal Generator] → [Bandpass Filter] → [CSV Recorder]
                                       ↘ [WebSocket Output]
step 2: create the stream
First, let's create a new stream with our pipeline:
curl -X POST http://localhost:42069/stream/streams \
-H "Content-Type: application/json" \
-d '{
"id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
"meta": {
"name": "Basic Processing Pipeline",
"description": "Simple pipeline to demonstrate stream processing"
},
"nodes": [
{
"id": "5c3a1d8b-c2f6-4c8a-bcf3-1af7e51b1e06",
"type": "EXECUTABLE",
"meta": {
"type": "SOURCE"
},
"executable": "/usr/share/instinct/signal-generator",
"config": {
"sampleRate": 250,
"channels": 4,
"amplitudes": [1, 2, 3, 4],
"frequencies": [10, 20, 30, 40]
}
},
{
"id": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292",
"type": "EXECUTABLE",
"meta": {
"type": "FILTER"
},
"executable": "/usr/share/instinct/signal-filter",
"config": {
"highpass": 1.0,
"lowpass": 35,
"order": 4
}
},
{
"id": "a27463d5-ccba-4581-95d1-68426c210e55",
"type": "EXECUTABLE",
"meta": {
"type": "RECORDER"
},
"executable": "/usr/share/instinct/csv-recorder",
"config": {
"filename": "/data/recordings/basic-pipeline.csv",
"includeTimestamps": true,
"format": "csv"
}
},
{
"id": "eac48e37-fdf9-4bb4-a2ba-b938fc1a781c",
"type": "EXECUTABLE",
"meta": {
"type": "OUTPUT"
},
"executable": "/usr/share/instinct/websocket-output",
"config": {
"port": 36006
}
}
],
"pipes": [
{
"id": "e93b59c0-4dbf-48c5-9db8-19a312fd4aed",
"source": "5c3a1d8b-c2f6-4c8a-bcf3-1af7e51b1e06",
"destination": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292"
},
{
"id": "c68f741b-1b9e-4f82-b77c-ce14d3d29831",
"source": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292",
"destination": "a27463d5-ccba-4581-95d1-68426c210e55"
},
{
"id": "b72e9a84-f8c3-42d6-bf85-a15c4f249fdb",
"source": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292",
"destination": "eac48e37-fdf9-4bb4-a2ba-b938fc1a781c"
}
]
}'
The API will respond with the created stream ID:
{
"id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
"success": true
}
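Tip: for a definition this long, you can save the JSON body to a file (for example, basic-pipeline.json; the file name is just for illustration) and pass it to curl with the @ syntax:
curl -X POST http://localhost:42069/stream/streams \
-H "Content-Type: application/json" \
-d @basic-pipeline.json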
step 3: verify the stream
Let's check that our stream was created correctly:
curl -X GET http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2
You should see the details of your stream, including all nodes and pipes.
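The exact response shape may vary, but it should echo back the definition you posted, roughly along these lines (abridged here for readability):
{
"id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
"meta": {
"name": "Basic Processing Pipeline",
"description": "Simple pipeline to demonstrate stream processing"
},
"nodes": [ ... ],
"pipes": [ ... ]
}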
step 4: start the stream
Now, let's start the stream to begin processing:
curl -X POST http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/signal \
-H "Content-Type: application/x-www-form-urlencoded" \
-d 'signal=start'
The API will respond with:
{
"id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
"success": true
}
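Once the stream is running, a quick way to confirm that data is flowing is to watch the recorder's output file grow. The path below is the one configured for the csv-recorder node in step 2; run this on the device (or wherever its /data/recordings directory is accessible), and note that the directory must already exist and be writable:
# Watch the CSV recording grow as samples arrive
tail -f /data/recordings/basic-pipeline.csv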
step 5: connect to the websocket stream
To visualize the data in real time, you can connect to the WebSocket output node (port 36006, as configured in step 2):
// Example code for a web application
const socket = new WebSocket('ws://localhost:36006');
socket.onopen = function (e) {
console.log('Connection established');
};
socket.onmessage = function (event) {
const data = JSON.parse(event.data);
console.log('Data received:', data);
// Process or visualize the data here
};
socket.onclose = function (event) {
console.log('Connection closed');
};
socket.onerror = function (error) {
console.log('Error:', error);
};
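If you just want a quick sanity check from the command line instead of a browser, a generic WebSocket client such as wscat (installed separately, for example with npm install -g wscat) can connect to the same port and print incoming messages:
# Connect to the websocket-output node and print messages as they arrive
wscat -c ws://localhost:36006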
step 6: add a node (optional)
Let's say we want to add a transformation node that scales the data before recording:
curl -X POST http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/nodes \
-H "Content-Type: application/json" \
-d '[
{
"id": "f5d92a31-6c7b-484e-bd59-0c8e624382d9",
"type": "EXECUTABLE",
"meta": {
"type": "PROCESSOR"
},
"executable": "/usr/share/instinct/data-transformer",
"config": {
"operations": [
{
"type": "scale",
"factor": 2.0
}
]
}
}
]'
Now, let's connect it to the pipeline by adding two new pipes that route data from the filter through the transformer to the recorder:
curl -X POST http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/pipes \
-H "Content-Type: application/json" \
-d '[
{
"id": "25a48f9d-eba3-4e2a-aa19-bf77c23b491e",
"source": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292",
"destination": "f5d92a31-6c7b-484e-bd59-0c8e624382d9"
},
{
"id": "dc7e5f86-3bbd-461f-b863-45cb4583ef1c",
"source": "f5d92a31-6c7b-484e-bd59-0c8e624382d9",
"destination": "a27463d5-ccba-4581-95d1-68426c210e55"
}
]'
And let's remove the old direct connection between the filter and recorder:
curl -X DELETE http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/pipes \
-H "Content-Type: application/json" \
-d '["c68f741b-1b9e-4f82-b77c-ce14d3d29831"]'
step 7: stop the stream
When you're done, stop the stream:
curl -X POST http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/signal \
-H "Content-Type: application/x-www-form-urlencoded" \
-d 'signal=stop'
The API will respond with:
{
"id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
"success": true
}
step 8: delete the stream (optional)
If you no longer need the stream, you can delete it:
curl -X DELETE http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2
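Deleting the stream removes the pipeline definition, but it most likely does not touch the files your nodes wrote (this is an assumption; check the device documentation if in doubt). If you no longer need the recording from this tutorial, remove it yourself:
# Remove the CSV file written by the recorder node (assumes shell access to the device)
rm /data/recordings/basic-pipeline.csv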
next steps
Now that you've created your first pipeline, you can:
- Create more complex pipelines with additional processing nodes
- Experiment with different signal sources
- Tune node configuration values, such as the filter settings, to suit your data
- Learn about advanced stream features like dynamic nodes and error handling
Check out these additional resources: