
Building a Basic Pipeline

This tutorial walks you through creating a simple data processing pipeline using the Streams Engine API. By the end, you'll have a functional stream that reads data from a source, processes it, and outputs the results.

Prerequisites

Before you begin, make sure you have:

  • Access to the Streams Engine API (running on localhost:42069 for this tutorial)
  • Basic understanding of HTTP requests (we'll use curl in examples)
  • Familiarity with JSON and API concepts

Step 1: Plan Your Pipeline

Let's build a pipeline that:

  1. Generates synthetic signal data
  2. Applies a band-pass filter to the data
  3. Records the filtered data to a CSV file
  4. Streams the filtered data in real time over a WebSocket for display

Our pipeline will look like this:

[Signal Generator] → [Filter] → [CSV Recorder]
                        └→ [WebSocket Output]
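If you prefer to generate the request body rather than hand-write UUIDs, the plan above maps directly onto a list of `nodes` and a list of `pipes`. The sketch below assumes Node.js 16 or later (for `crypto.randomUUID`); `buildStreamDefinition` and `executable` are hypothetical helpers, and the node bodies simply mirror the ones used in the Step 2 request.

```javascript
const { randomUUID } = require('node:crypto');

// Build a request body for POST /stream/streams from a compact description.
// `nodeSpecs` maps a local alias to a node body (without `id`), and `edges`
// lists [sourceAlias, destinationAlias] pairs. Fresh UUIDs are generated for
// the stream, every node, and every pipe (hypothetical helper).
function buildStreamDefinition(meta, nodeSpecs, edges) {
  const ids = new Map();
  const nodes = Object.entries(nodeSpecs).map(([alias, spec]) => {
    const id = randomUUID();
    ids.set(alias, id);
    return { id, ...spec };
  });

  const pipes = edges.map(([from, to]) => {
    if (!ids.has(from) || !ids.has(to)) {
      throw new Error(`Pipe ${from} -> ${to} references an unknown node`);
    }
    return { id: randomUUID(), source: ids.get(from), destination: ids.get(to) };
  });

  return { definition: { id: randomUUID(), meta, nodes, pipes }, ids: Object.fromEntries(ids) };
}

// Shorthand for the EXECUTABLE node bodies used in this tutorial.
const executable = (kind, executablePath, config) => ({
  type: 'EXECUTABLE',
  meta: { type: kind },
  executable: executablePath,
  config,
});

const { definition, ids } = buildStreamDefinition(
  { name: 'Basic Processing Pipeline', description: 'Simple pipeline to demonstrate stream processing' },
  {
    generator: executable('SOURCE', '/usr/share/instinct/signal-generator', {
      sampleRate: 250, channels: 4, amplitudes: [1, 2, 3, 4], frequencies: [10, 20, 30, 40],
    }),
    filter: executable('FILTER', '/usr/share/instinct/signal-filter', { highpass: 1.0, lowpass: 35, order: 4 }),
    recorder: executable('RECORDER', '/usr/share/instinct/csv-recorder', {
      filename: '/data/recordings/basic-pipeline.csv', includeTimestamps: true, format: 'csv',
    }),
    websocket: executable('OUTPUT', '/usr/share/instinct/websocket-output', { port: 36006 }),
  },
  // The filter fans out to both the recorder and the WebSocket output.
  [['generator', 'filter'], ['filter', 'recorder'], ['filter', 'websocket']],
);

console.log(JSON.stringify(definition, null, 2));
```

Validating pipe endpoints on the client side catches a mistyped node reference before the request is sent, instead of relying on whatever error the API returns.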

Step 2: Create the Stream

First, let's create a new stream with our pipeline:

curl -X POST http://localhost:42069/stream/streams \
  -H "Content-Type: application/json" \
  -d '{
    "id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
    "meta": {
      "name": "Basic Processing Pipeline",
      "description": "Simple pipeline to demonstrate stream processing"
    },
    "nodes": [
      {
        "id": "5c3a1d8b-c2f6-4c8a-bcf3-1af7e51b1e06",
        "type": "EXECUTABLE",
        "meta": {
          "type": "SOURCE"
        },
        "executable": "/usr/share/instinct/signal-generator",
        "config": {
          "sampleRate": 250,
          "channels": 4,
          "amplitudes": [1, 2, 3, 4],
          "frequencies": [10, 20, 30, 40]
        }
      },
      {
        "id": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292",
        "type": "EXECUTABLE",
        "meta": {
          "type": "FILTER"
        },
        "executable": "/usr/share/instinct/signal-filter",
        "config": {
          "highpass": 1.0,
          "lowpass": 35,
          "order": 4
        }
      },
      {
        "id": "a27463d5-ccba-4581-95d1-68426c210e55",
        "type": "EXECUTABLE",
        "meta": {
          "type": "RECORDER"
        },
        "executable": "/usr/share/instinct/csv-recorder",
        "config": {
          "filename": "/data/recordings/basic-pipeline.csv",
          "includeTimestamps": true,
          "format": "csv"
        }
      },
      {
        "id": "eac48e37-fdf9-4bb4-a2ba-b938fc1a781c",
        "type": "EXECUTABLE",
        "meta": {
          "type": "OUTPUT"
        },
        "executable": "/usr/share/instinct/websocket-output",
        "config": {
          "port": 36006
        }
      }
    ],
    "pipes": [
      {
        "id": "e93b59c0-4dbf-48c5-9db8-19a312fd4aed",
        "source": "5c3a1d8b-c2f6-4c8a-bcf3-1af7e51b1e06",
        "destination": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292"
      },
      {
        "id": "c68f741b-1b9e-4f82-b77c-ce14d3d29831",
        "source": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292",
        "destination": "a27463d5-ccba-4581-95d1-68426c210e55"
      },
      {
        "id": "b72e9a84-f8c3-42d6-bf85-a15c4f249fdb",
        "source": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292",
        "destination": "eac48e37-fdf9-4bb4-a2ba-b938fc1a781c"
      }
    ]
  }'

The API will respond with the created stream ID:

{
  "id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
  "success": true
}
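Before starting the stream, it can help to predict what the filter stage will do to each generated channel. The sketch below assumes that the `signal-filter` node behaves like a Butterworth band-pass (a high-pass and a low-pass section of the configured `order`), that `highpass` and `lowpass` are cutoff frequencies in Hz, and that the generator emits pure sine waves at the configured `frequencies` and `amplitudes`. None of this is stated by the API, and the ideal analog formula ignores digital-filter effects at the 250 Hz sample rate, so treat the numbers as a rough expectation only.

```javascript
// Magnitude response of an ideal analog Butterworth low-pass / high-pass section.
// ASSUMPTION: signal-filter approximates a cascaded Butterworth band-pass;
// the real node may use a different design.
function butterworthLowpassGain(f, cutoff, order) {
  return 1 / Math.sqrt(1 + Math.pow(f / cutoff, 2 * order));
}

function butterworthHighpassGain(f, cutoff, order) {
  return 1 / Math.sqrt(1 + Math.pow(cutoff / f, 2 * order));
}

// Band-pass gain as the product of the two cascaded sections.
function bandpassGain(f, { highpass, lowpass, order }) {
  return butterworthHighpassGain(f, highpass, order) * butterworthLowpassGain(f, lowpass, order);
}

// Values copied from the Step 2 request body.
const generator = { amplitudes: [1, 2, 3, 4], frequencies: [10, 20, 30, 40] };
const filter = { highpass: 1.0, lowpass: 35, order: 4 };

const estimates = generator.frequencies.map((f, i) => {
  const gain = bandpassGain(f, filter);
  return { channel: i + 1, frequencyHz: f, gain, expectedAmplitude: generator.amplitudes[i] * gain };
});

console.table(estimates);
```

Under these assumptions, channels 1 and 2 pass almost unchanged, channel 3 (30 Hz) is slightly reduced, and channel 4 (40 Hz), which sits above the 35 Hz low-pass cutoff, keeps only about half of its amplitude.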

Step 3: Verify the Stream

Let's check that our stream was created correctly:

curl -X GET http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2

You should see the details of your stream, including all nodes and pipes.
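If you script this check, you can compare the IDs you sent with the IDs the API returns. The exact shape of the GET response is not shown in this tutorial; the sketch below assumes it contains `nodes` and `pipes` arrays whose entries carry an `id`, like the request body. `findMissingIds` and `verifyStream` are hypothetical helpers, and Node.js 18 or later is assumed for the global `fetch`.

```javascript
// ASSUMPTION: the GET /stream/streams/{id} response includes `nodes` and `pipes`
// arrays whose entries have an `id`, mirroring the POST request body.
function findMissingIds(expected, actual) {
  const actualNodeIds = new Set((actual.nodes ?? []).map((n) => n.id));
  const actualPipeIds = new Set((actual.pipes ?? []).map((p) => p.id));
  return {
    nodes: expected.nodes.map((n) => n.id).filter((id) => !actualNodeIds.has(id)),
    pipes: expected.pipes.map((p) => p.id).filter((id) => !actualPipeIds.has(id)),
  };
}

// Fetch the stream and report any node or pipe IDs that did not come back.
// `fetchImpl` is injectable so the helper can be exercised without a live API.
async function verifyStream(baseUrl, expected, fetchImpl = fetch) {
  const res = await fetchImpl(`${baseUrl}/stream/streams/${expected.id}`);
  if (!res.ok) {
    throw new Error(`GET failed with HTTP ${res.status}`);
  }
  const missing = findMissingIds(expected, await res.json());
  const ok = missing.nodes.length === 0 && missing.pipes.length === 0;
  return { ok, missing };
}

// Example against the local API, where `definition` is the body you POSTed:
// verifyStream('http://localhost:42069', definition).then(console.log);
```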

Step 4: Start the Stream

Now, let's start the stream to begin processing:

curl -X POST http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/signal \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'signal=start'

The API will respond with:

{
  "id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
  "success": true
}
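The same signal request can be sent from JavaScript. The sketch below assumes Node.js 18 or later (for the global `fetch`); `sendSignal` is a hypothetical helper, and it treats any response without `success: true` as a failure, based only on the response shape shown above.

```javascript
// Send a control signal ("start" or "stop") to a stream, mirroring the curl call:
// POST /stream/streams/{id}/signal with a form-encoded body.
// `fetchImpl` is injectable so the helper can be exercised without a live API.
async function sendSignal(baseUrl, streamId, signal, fetchImpl = fetch) {
  const res = await fetchImpl(`${baseUrl}/stream/streams/${streamId}/signal`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: new URLSearchParams({ signal }).toString(), // e.g. "signal=start"
  });
  if (!res.ok) {
    throw new Error(`Signal "${signal}" failed with HTTP ${res.status}`);
  }
  const payload = await res.json();
  // The tutorial shows responses of the form { id, success: true }.
  if (payload.success !== true) {
    throw new Error(`Signal "${signal}" was not acknowledged: ${JSON.stringify(payload)}`);
  }
  return payload;
}

// Example against the local API:
// sendSignal('http://localhost:42069', '7348dd95-7835-4a8a-9ec5-2e4cee1e72e2', 'start')
//   .then(console.log);
```

Passing `'stop'` instead of `'start'` covers Step 7 with the same helper.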

Step 5: Connect to the WebSocket Stream

To visualize the data in real time, connect to the WebSocket output node on the port set in its config (36006):

// Example code for a web application
const socket = new WebSocket('ws://localhost:36006');

socket.onopen = function (e) {
  console.log('Connection established');
};

socket.onmessage = function (event) {
  const data = JSON.parse(event.data);
  console.log('Data received:', data);
  // Process or visualize the data here
};

socket.onclose = function (event) {
  console.log('Connection closed');
};

socket.onerror = function (error) {
  console.log('Error:', error);
};
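For a real visualization you usually want to keep only a bounded window of recent messages and to survive a malformed frame instead of throwing inside `onmessage`. The sketch below is a hypothetical helper, `createMessageBuffer`, that can stand in for the `onmessage` function above. Like the example, it assumes each message is JSON, but it makes no assumption about the fields inside, since the message format is not documented here.

```javascript
// Bounded, tolerant message store for the WebSocket output (hypothetical helper).
// ASSUMPTION: each message body is a JSON document; its fields are not interpreted.
function createMessageBuffer(capacity = 500) {
  const items = [];
  let dropped = 0; // number of messages that were not valid JSON

  return {
    // Uses closures rather than `this`, so it can be assigned directly:
    // socket.onmessage = buffer.handle;
    handle(event) {
      let data;
      try {
        data = JSON.parse(event.data);
      } catch {
        dropped += 1;
        return;
      }
      items.push(data);
      if (items.length > capacity) items.shift(); // keep only the newest `capacity` items
    },
    // Return a copy so callers cannot mutate the internal buffer.
    latest() {
      return items.slice();
    },
    get dropped() {
      return dropped;
    },
  };
}
```

In the browser code above, this becomes `const buffer = createMessageBuffer(1000); socket.onmessage = buffer.handle;`, and your rendering loop reads `buffer.latest()`. `Array.prototype.shift` is linear in the buffer size, which is fine for a few hundred items; an index-based ring buffer avoids that cost for larger windows.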

Step 6: Add a Node (Optional)

Let's say we want to add a transformation node that scales the data before recording:

curl -X POST http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/nodes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "f5d92a31-6c7b-484e-bd59-0c8e624382d9",
      "type": "EXECUTABLE",
      "meta": {
        "type": "PROCESSOR"
      },
      "executable": "/usr/share/instinct/data-transformer",
      "config": {
        "operations": [
          {
            "type": "scale",
            "factor": 2.0
          }
        ]
      }
    }
  ]'
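The internals of the `data-transformer` node are not described in this tutorial. As a mental model only, the sketch below assumes that the `operations` list is applied in order to every sample value and that `scale` multiplies each value by `factor`. `applyOperations` is a hypothetical helper, not part of the Streams Engine API.

```javascript
// Hypothetical model of an ordered list of transformer operations.
// Only "scale" is modeled, because it is the only operation used in this tutorial.
const handlers = new Map([
  ['scale', (value, op) => value * op.factor],
]);

function applyOperations(samples, operations) {
  return samples.map((value) =>
    operations.reduce((acc, op) => {
      const handler = handlers.get(op.type);
      if (!handler) {
        throw new Error(`Unsupported operation: ${op.type}`);
      }
      return handler(acc, op);
    }, value)
  );
}

const operations = [{ type: 'scale', factor: 2.0 }]; // from the node config above
console.log(applyOperations([0.5, -1.25, 3], operations)); // [ 1, -2.5, 6 ]
```

Keeping operations in a lookup table makes it easy to see where further operation types would plug in, and rejecting unknown types surfaces a typo instead of silently passing data through.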

Next, wire the new node into the pipeline. First, add two pipes: one from the filter to the transformer, and one from the transformer to the recorder:

curl -X POST http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/pipes \
  -H "Content-Type: application/json" \
  -d '[
    {
      "id": "25a48f9d-eba3-4e2a-aa19-bf77c23b491e",
      "source": "9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292",
      "destination": "f5d92a31-6c7b-484e-bd59-0c8e624382d9"
    },
    {
      "id": "dc7e5f86-3bbd-461f-b863-45cb4583ef1c",
      "source": "f5d92a31-6c7b-484e-bd59-0c8e624382d9",
      "destination": "a27463d5-ccba-4581-95d1-68426c210e55"
    }
  ]'

And let's remove the old direct connection between the filter and recorder:

curl -X DELETE http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/pipes \
  -H "Content-Type: application/json" \
  -d '["c68f741b-1b9e-4f82-b77c-ce14d3d29831"]'
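Splicing a node into an existing connection always follows the same pattern: find the pipe being replaced, add one pipe into the new node and one out of it, then delete the old pipe. The sketch below computes both request bodies from the stream's pipe list. `planInsertion` is a hypothetical helper, and Node.js 16 or later is assumed for `crypto.randomUUID`.

```javascript
const { randomUUID } = require('node:crypto');

// Compute the pipe changes needed to splice `newNodeId` into an existing
// `sourceId` -> `destinationId` connection: two pipes to add, one to remove.
// `add` is the body for POST .../pipes; `remove` is the body for DELETE .../pipes.
function planInsertion(pipes, sourceId, destinationId, newNodeId, newId = randomUUID) {
  const existing = pipes.find((p) => p.source === sourceId && p.destination === destinationId);
  if (!existing) {
    throw new Error(`No pipe from ${sourceId} to ${destinationId}`);
  }
  return {
    add: [
      { id: newId(), source: sourceId, destination: newNodeId },
      { id: newId(), source: newNodeId, destination: destinationId },
    ],
    remove: [existing.id],
  };
}

// Node IDs and pipes from Steps 2 and 6 of this tutorial.
const FILTER = '9fb2e8d4-57a0-4e5c-a5b1-84b8f43c3292';
const RECORDER = 'a27463d5-ccba-4581-95d1-68426c210e55';
const WEBSOCKET = 'eac48e37-fdf9-4bb4-a2ba-b938fc1a781c';
const TRANSFORMER = 'f5d92a31-6c7b-484e-bd59-0c8e624382d9';
const pipes = [
  { id: 'e93b59c0-4dbf-48c5-9db8-19a312fd4aed', source: '5c3a1d8b-c2f6-4c8a-bcf3-1af7e51b1e06', destination: FILTER },
  { id: 'c68f741b-1b9e-4f82-b77c-ce14d3d29831', source: FILTER, destination: RECORDER },
  { id: 'b72e9a84-f8c3-42d6-bf85-a15c4f249fdb', source: FILTER, destination: WEBSOCKET },
];

const plan = planInsertion(pipes, FILTER, RECORDER, TRANSFORMER);
console.log(JSON.stringify(plan.add, null, 2)); // body for POST .../pipes
console.log(JSON.stringify(plan.remove)); // body for DELETE .../pipes
```

Adding the new pipes before deleting the old one, as this step does, means the recorder always has at least one incoming pipe; whether it briefly receives data over both paths depends on how the engine applies pipe changes to a running stream.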

Step 7: Stop the Stream

When you're done, stop the stream:

curl -X POST http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2/signal \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d 'signal=stop'

The API will respond with:

{
  "id": "7348dd95-7835-4a8a-9ec5-2e4cee1e72e2",
  "success": true
}

Step 8: Delete the Stream (Optional)

If you no longer need the stream, you can delete it:

curl -X DELETE http://localhost:42069/stream/streams/7348dd95-7835-4a8a-9ec5-2e4cee1e72e2
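Steps 7 and 8 can be combined into one teardown routine. The sketch assumes Node.js 18 or later for the global `fetch`, and `teardownStream` is a hypothetical helper. It stops the stream before deleting it because that is the order this tutorial uses; the tutorial does not say whether the API accepts deleting a running stream.

```javascript
// Stop a stream, then delete it (Steps 7 and 8).
// ASSUMPTION: stop-then-delete is the safe order; if the stop request fails,
// the delete is skipped so the stream is not removed in an unknown state.
async function teardownStream(baseUrl, streamId, fetchImpl = fetch) {
  const streamUrl = `${baseUrl}/stream/streams/${streamId}`;

  const stop = await fetchImpl(`${streamUrl}/signal`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: new URLSearchParams({ signal: 'stop' }).toString(),
  });
  if (!stop.ok) {
    throw new Error(`Stop failed with HTTP ${stop.status}`);
  }

  const del = await fetchImpl(streamUrl, { method: 'DELETE' });
  if (!del.ok) {
    throw new Error(`Delete failed with HTTP ${del.status}`);
  }
}

// Example against the local API:
// teardownStream('http://localhost:42069', '7348dd95-7835-4a8a-9ec5-2e4cee1e72e2')
//   .then(() => console.log('Stream stopped and deleted'));
```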

Next Steps

Now that you've created your first pipeline, you can:

  • Create more complex pipelines with additional processing nodes
  • Experiment with different signal sources
  • Modify the parameters to optimize processing
  • Learn about advanced stream features like dynamic nodes and error handling

Check out these additional resources: