Stream Operations API

The Streams Engine exposes HTTP endpoints for creating streams and for sending signals that start, stop, or reconcile them. This page documents the request and response format of each operation.

Create Stream

Creates a new stream processing pipeline with specified nodes and pipes.

Endpoint

POST /stream/streams

Request Body

{
  "id": "cdc566b5-0221-4018-a99f-23265af1e04d",
  "meta": {
    "name": "Streams - Realtime stream - cdc566b5-0221-4018-a99f-23265af1e04d"
  },
  "nodes": [
    {
      "id": "94f92dc4-5a84-45c7-8543-9c18ba4df95c",
      "type": "EXECUTABLE",
      "meta": {
        "type": "SOURCE"
      },
      "executable": "node /usr/share/engines/worker-scripts/SignalGenerator.js",
      "config": {
        "sampleRate": 1000,
        "amplitudes": [1, 2, 3, 4, 5, 6, 7, 8, 9],
        "frequencies": [2, 3, 5, 6, 11, 23, 30, 50, 75]
      }
    },
    {
      "id": "a297523e-ab4b-41d9-b29c-c434eb4c9723",
      "type": "EXECUTABLE",
      "meta": {
        "type": "OUTPUT"
      },
      "executable": "node /usr/share/engines/worker-scripts/WebSocket.js",
      "config": {
        "port": 36006
      }
    }
  ],
  "pipes": [
    {
      "id": "50198da6-2426-4276-9ba4-d30f396f3d54",
      "source": "94f92dc4-5a84-45c7-8543-9c18ba4df95c",
      "destination": "a297523e-ab4b-41d9-b29c-c434eb4c9723"
    }
  ]
}
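
In the request body, each pipe names its endpoints by node id. The page does not say whether the engine rejects a pipe that points at an unknown node, so a client may want to check this before sending. The following is a minimal sketch of such a check; `find_dangling_pipes` is a hypothetical helper, not part of the Streams Engine.

```python
from typing import Any


def find_dangling_pipes(stream: dict[str, Any]) -> list[str]:
    """Return the ids of pipes whose source or destination is not a node id."""
    node_ids = {node["id"] for node in stream.get("nodes", [])}
    return [
        pipe["id"]
        for pipe in stream.get("pipes", [])
        if pipe["source"] not in node_ids or pipe["destination"] not in node_ids
    ]


# Trimmed copy of the request body (meta, executable and config omitted).
stream = {
    "id": "cdc566b5-0221-4018-a99f-23265af1e04d",
    "nodes": [
        {"id": "94f92dc4-5a84-45c7-8543-9c18ba4df95c", "type": "EXECUTABLE"},
        {"id": "a297523e-ab4b-41d9-b29c-c434eb4c9723", "type": "EXECUTABLE"},
    ],
    "pipes": [
        {
            "id": "50198da6-2426-4276-9ba4-d30f396f3d54",
            "source": "94f92dc4-5a84-45c7-8543-9c18ba4df95c",
            "destination": "a297523e-ab4b-41d9-b29c-c434eb4c9723",
        }
    ],
}

print(find_dangling_pipes(stream))  # prints [] because both endpoints exist
```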

Response

{
  "id": "cdc566b5-0221-4018-a99f-23265af1e04d",
  "success": true
}
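
As an illustration, the create call could be issued from Python roughly as follows. This is a sketch under assumptions: the host and port in `BASE_URL` are placeholders (the page gives only the path), the `application/json` content type is assumed rather than documented, and `build_create_stream_request` and `is_created` are hypothetical helper names.

```python
import json
import urllib.request

# Assumption: placeholder host and port; substitute the engine's real address.
BASE_URL = "http://localhost:8080"


def build_create_stream_request(
    stream: dict, base_url: str = BASE_URL
) -> urllib.request.Request:
    """Build (but do not send) the POST /stream/streams request."""
    return urllib.request.Request(
        url=f"{base_url}/stream/streams",
        data=json.dumps(stream).encode("utf-8"),
        # Assumption: the engine expects a JSON content type.
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def is_created(response_body: str, expected_id: str) -> bool:
    """Check a response of the documented shape: {"id": ..., "success": true}."""
    payload = json.loads(response_body)
    return payload.get("success") is True and payload.get("id") == expected_id


# Nodes and pipes are omitted here for brevity; a real request carries them.
stream = {
    "id": "cdc566b5-0221-4018-a99f-23265af1e04d",
    "meta": {"name": "Streams - Realtime stream"},
}
request = build_create_stream_request(stream)

# To actually send it against a running engine:
# with urllib.request.urlopen(request) as response:
#     print(is_created(response.read().decode("utf-8"), stream["id"]))
```

Building the request separately from sending it keeps the example inspectable without a running engine.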

Create Stream from YAML

Creates a new stream processing pipeline from a YAML definition.

Endpoint

POST /stream/streams/yaml

Request Body

The request body is a YAML document with a structure similar to the JSON request body above:

---
id: e795dc27-8da2-42e8-8d3b-50e5e4ff9943
meta:
  name: Testing creation of a stream from API
nodes:
  - id: 64cb7ca2-ebca-4502-8fc4-208fa90545f1
    type: EXECUTABLE_PRODUCER
    executable: node /usr/share/engines/worker-scripts/SignalGenerator.ts
    config:
      sampleRate: 1000
      amplitudes:
        - 5
        - 5
        - 5
      frequencies:
        - 1
        - 60
        - 100
  - id: 64cb7ca2-ebca-4502-8fc4-208fa90545f3
    type: EXECUTABLE_CONSUMER
    executable: node /usr/share/engines/worker-scripts/WebSocket.ts
    config:
      port: 8088

pipes:
  - id: 67a7a083-28b0-4dd5-b33d-e60e339bb93a
    source: 64cb7ca2-ebca-4502-8fc4-208fa90545f1
    destination: 64cb7ca2-ebca-4502-8fc4-208fa90545f3

Response

{
  "id": "e795dc27-8da2-42e8-8d3b-50e5e4ff9943",
  "success": true
}
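
A sketch of sending a YAML definition as the raw request body is shown here. The page does not state which content type the endpoint expects, so `application/yaml` is an assumption, as are the placeholder `BASE_URL` and the helper name `build_create_stream_yaml_request`. The lists are written in YAML flow style (`[5, 5, 5]`) to keep the example short; that is equivalent to the block style used in the request body.

```python
import urllib.request

# Assumption: placeholder host and port; substitute the engine's real address.
BASE_URL = "http://localhost:8080"

# YAML nesting is expressed by indentation, so it must be preserved exactly.
STREAM_YAML = """\
---
id: e795dc27-8da2-42e8-8d3b-50e5e4ff9943
meta:
  name: Testing creation of a stream from API
nodes:
  - id: 64cb7ca2-ebca-4502-8fc4-208fa90545f1
    type: EXECUTABLE_PRODUCER
    executable: node /usr/share/engines/worker-scripts/SignalGenerator.ts
    config:
      sampleRate: 1000
      amplitudes: [5, 5, 5]
      frequencies: [1, 60, 100]
  - id: 64cb7ca2-ebca-4502-8fc4-208fa90545f3
    type: EXECUTABLE_CONSUMER
    executable: node /usr/share/engines/worker-scripts/WebSocket.ts
    config:
      port: 8088
pipes:
  - id: 67a7a083-28b0-4dd5-b33d-e60e339bb93a
    source: 64cb7ca2-ebca-4502-8fc4-208fa90545f1
    destination: 64cb7ca2-ebca-4502-8fc4-208fa90545f3
"""


def build_create_stream_yaml_request(
    yaml_text: str, base_url: str = BASE_URL
) -> urllib.request.Request:
    """Build (but do not send) the POST /stream/streams/yaml request."""
    return urllib.request.Request(
        url=f"{base_url}/stream/streams/yaml",
        data=yaml_text.encode("utf-8"),
        # Assumption: the content type is not documented on this page.
        headers={"Content-Type": "application/yaml"},
        method="POST",
    )


request = build_create_stream_yaml_request(STREAM_YAML)
# To send: urllib.request.urlopen(request)
```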

Start Stream

Starts execution of a stream that has been created.

Endpoint

POST /stream/streams/{id}/signal

Path Parameters

Parameter   Type     Description
id          String   Unique identifier of the stream to start

Request Body

signal=start

Response

{
  "id": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
  "success": true
}

Stop Stream

Stops execution of a running stream.

Endpoint

POST /stream/streams/{id}/signal

Path Parameters

Parameter   Type     Description
id          String   Unique identifier of the stream to stop

Request Body

signal=stop

Response

{
  "id": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
  "success": true
}

Reconcile Stream

Restarts a stream (stops and then starts it).

Endpoint

POST /stream/streams/{id}/signal

Path Parameters

Parameter   Type     Description
id          String   Unique identifier of the stream to reconcile

Request Body

signal=reconcile

Response

{
  "id": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
  "success": true
}
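
Start, stop, and reconcile share one endpoint and differ only in the `signal` value, so a single helper can cover all three. The sketch below assumes the `signal=...` body is form-encoded (`application/x-www-form-urlencoded`), which the page implies but does not state; `BASE_URL` is a placeholder and `build_stream_signal_request` is a hypothetical helper name.

```python
import urllib.parse
import urllib.request

# Assumption: placeholder host and port; substitute the engine's real address.
BASE_URL = "http://localhost:8080"

# The stream-level signal values documented on this page.
STREAM_SIGNALS = ("start", "stop", "reconcile")


def build_stream_signal_request(
    stream_id: str, signal: str, base_url: str = BASE_URL
) -> urllib.request.Request:
    """Build (but do not send) POST /stream/streams/{id}/signal."""
    if signal not in STREAM_SIGNALS:
        raise ValueError(f"unsupported stream signal: {signal!r}")
    # Percent-encode the id so it is always a single, safe path segment.
    path_id = urllib.parse.quote(stream_id, safe="")
    return urllib.request.Request(
        url=f"{base_url}/stream/streams/{path_id}/signal",
        # Produces the body "signal=<value>".
        data=urllib.parse.urlencode({"signal": signal}).encode("ascii"),
        # Assumption: the body is sent as a URL-encoded form.
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


request = build_stream_signal_request(
    "425ce7ec-a7af-497a-8d67-6371a32ad8bb", "reconcile"
)
# To send: urllib.request.urlopen(request)
```

Rejecting unknown signal values on the client side is a design choice of this sketch; the engine may accept values that this page does not list.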

Send Signal to Node

Sends a signal to a specific node within a stream.

Endpoint

POST /stream/streams/{id}/nodes/{nodeId}/signal

Path Parameters

Parameter   Type     Description
id          String   Unique identifier of the stream
nodeId      String   Unique identifier of the node

Request Body

signal=start

Response

{
  "id": "64cb7ca2-ebca-4502-8fc4-208fa90545f1",
  "streamId": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
  "signal": "start",
  "success": true
}
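
The node-level response carries more fields than the stream-level one: `id` is the node id and `streamId` identifies the owning stream. A minimal sketch of building the request and reading the response into a typed record follows. The form-encoded body, the placeholder `BASE_URL`, and the names `NodeSignalResult`, `build_node_signal_request`, and `parse_node_signal_response` are assumptions made for illustration.

```python
import json
import urllib.parse
import urllib.request
from dataclasses import dataclass

# Assumption: placeholder host and port; substitute the engine's real address.
BASE_URL = "http://localhost:8080"


@dataclass(frozen=True)
class NodeSignalResult:
    node_id: str
    stream_id: str
    signal: str
    success: bool


def build_node_signal_request(
    stream_id: str, node_id: str, signal: str, base_url: str = BASE_URL
) -> urllib.request.Request:
    """Build (but do not send) POST /stream/streams/{id}/nodes/{nodeId}/signal."""
    stream_part = urllib.parse.quote(stream_id, safe="")
    node_part = urllib.parse.quote(node_id, safe="")
    return urllib.request.Request(
        url=f"{base_url}/stream/streams/{stream_part}/nodes/{node_part}/signal",
        # This page only shows "start" for nodes, so the value is not validated.
        data=urllib.parse.urlencode({"signal": signal}).encode("ascii"),
        # Assumption: the body is sent as a URL-encoded form.
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def parse_node_signal_response(body: str) -> NodeSignalResult:
    """Map the documented response fields onto a typed record."""
    payload = json.loads(body)
    return NodeSignalResult(
        node_id=payload["id"],
        stream_id=payload["streamId"],
        signal=payload["signal"],
        success=payload["success"],
    )


request = build_node_signal_request(
    "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
    "64cb7ca2-ebca-4502-8fc4-208fa90545f1",
    "start",
)
result = parse_node_signal_response(
    '{"id": "64cb7ca2-ebca-4502-8fc4-208fa90545f1",'
    ' "streamId": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",'
    ' "signal": "start", "success": true}'
)
```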