Stream Operations API
The Streams Engine exposes HTTP endpoints for creating streams (from JSON or YAML) and for controlling them by sending signals to a whole stream or to an individual node. This page documents the request and response format of each operation.
Create Stream
Creates a new stream processing pipeline from the specified nodes and the pipes that connect them. Each pipe links a source node to a destination node by ID.
Endpoint
POST /stream/streams
Request Body
{
  "id": "cdc566b5-0221-4018-a99f-23265af1e04d",
  "meta": {
    "name": "Streams - Realtime stream - cdc566b5-0221-4018-a99f-23265af1e04d"
  },
  "nodes": [
    {
      "id": "94f92dc4-5a84-45c7-8543-9c18ba4df95c",
      "type": "EXECUTABLE",
      "meta": {
        "type": "SOURCE"
      },
      "executable": "node /usr/share/engines/worker-scripts/SignalGenerator.js",
      "config": {
        "sampleRate": 1000,
        "amplitudes": [1, 2, 3, 4, 5, 6, 7, 8, 9],
        "frequencies": [2, 3, 5, 6, 11, 23, 30, 50, 75]
      }
    },
    {
      "id": "a297523e-ab4b-41d9-b29c-c434eb4c9723",
      "type": "EXECUTABLE",
      "meta": {
        "type": "OUTPUT"
      },
      "executable": "node /usr/share/engines/worker-scripts/WebSocket.js",
      "config": {
        "port": 36006
      }
    }
  ],
  "pipes": [
    {
      "id": "50198da6-2426-4276-9ba4-d30f396f3d54",
      "source": "94f92dc4-5a84-45c7-8543-9c18ba4df95c",
      "destination": "a297523e-ab4b-41d9-b29c-c434eb4c9723"
    }
  ]
}
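Because each pipe refers to nodes by ID, a definition can be checked on the client before it is posted. The following is a minimal sketch, not the engine's own validation (which is not documented here): the function name `validate_stream_definition` and the rules it applies (unique node IDs, pipes that point at declared nodes) are assumptions made for illustration.

```python
def validate_stream_definition(stream: dict) -> list[str]:
    """Return a list of problems found in a stream definition (empty if none).

    Assumed rules: node ids are unique, and every pipe's source and
    destination name a node declared in the same definition.
    """
    errors = []
    node_ids = set()
    for node in stream.get("nodes", []):
        node_id = node.get("id")
        if node_id in node_ids:
            errors.append(f"duplicate node id: {node_id}")
        node_ids.add(node_id)
    for pipe in stream.get("pipes", []):
        for end in ("source", "destination"):
            if pipe.get(end) not in node_ids:
                errors.append(f"pipe {pipe.get('id')}: unknown {end} {pipe.get(end)}")
    return errors


# The request body above, trimmed to the fields this check reads.
example = {
    "id": "cdc566b5-0221-4018-a99f-23265af1e04d",
    "nodes": [
        {"id": "94f92dc4-5a84-45c7-8543-9c18ba4df95c"},
        {"id": "a297523e-ab4b-41d9-b29c-c434eb4c9723"},
    ],
    "pipes": [
        {
            "id": "50198da6-2426-4276-9ba4-d30f396f3d54",
            "source": "94f92dc4-5a84-45c7-8543-9c18ba4df95c",
            "destination": "a297523e-ab4b-41d9-b29c-c434eb4c9723",
        }
    ],
}

problems = validate_stream_definition(example)
```

An empty list means the definition passed these client-side checks only; the engine may still reject it for other reasons.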
Response
{
  "id": "cdc566b5-0221-4018-a99f-23265af1e04d",
  "success": true
}
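As a rough illustration of the round trip, the sketch below builds the POST request with Python's standard library and reads the `id` and `success` fields from the response. The base URL, the `Content-Type: application/json` header, and the helper names are assumptions; this page does not state the host, the required headers, or what a failed response looks like beyond the `success` flag.

```python
import json
import urllib.request

# Assumption: host and port depend on the deployment.
BASE_URL = "http://localhost:8080"


def build_create_stream_request(stream: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) the POST /stream/streams request."""
    return urllib.request.Request(
        url=f"{base_url}/stream/streams",
        data=json.dumps(stream).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed header
        method="POST",
    )


def parse_result(raw: bytes) -> str:
    """Return the id from a response body, or raise if success is not true."""
    result = json.loads(raw)
    if result.get("success") is not True:
        raise RuntimeError(f"engine reported failure: {result}")
    return result["id"]


# Sending requires a reachable engine, so it is left commented out:
# with urllib.request.urlopen(build_create_stream_request(stream)) as resp:
#     stream_id = parse_result(resp.read())
```

Keeping request construction separate from sending makes the payload easy to inspect or log before any network call is made.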
Create Stream from YAML
Creates a new stream processing pipeline from a YAML definition.
Endpoint
POST /stream/streams/yaml
Request Body
The request body is a YAML document with the same top-level fields (id, meta, nodes, pipes) as the JSON format above:
---
id: e795dc27-8da2-42e8-8d3b-50e5e4ff9943
meta:
  name: Testing creation of a stream from API
nodes:
  - id: 64cb7ca2-ebca-4502-8fc4-208fa90545f1
    type: EXECUTABLE_PRODUCER
    executable: node /usr/share/engines/worker-scripts/SignalGenerator.ts
    config:
      sampleRate: 1000
      amplitudes:
        - 5
        - 5
        - 5
      frequencies:
        - 1
        - 60
        - 100
  - id: 64cb7ca2-ebca-4502-8fc4-208fa90545f3
    type: EXECUTABLE_CONSUMER
    executable: node /usr/share/engines/worker-scripts/WebSocket.ts
    config:
      port: 8088
pipes:
  - id: 67a7a083-28b0-4dd5-b33d-e60e339bb93a
    source: 64cb7ca2-ebca-4502-8fc4-208fa90545f1
    destination: 64cb7ca2-ebca-4502-8fc4-208fa90545f3
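Since the body is sent as YAML text rather than JSON, the client posts the document as-is without parsing it. A minimal sketch under assumptions: the base URL, the `application/x-yaml` content type, the helper name, and the `stream.yaml` file name are all hypothetical, as this page does not say which headers the endpoint expects.

```python
import urllib.request


def build_create_stream_yaml_request(
    yaml_text: str, base_url: str = "http://localhost:8080"  # assumed base URL
) -> urllib.request.Request:
    """Build (but do not send) the POST /stream/streams/yaml request."""
    return urllib.request.Request(
        url=f"{base_url}/stream/streams/yaml",
        data=yaml_text.encode("utf-8"),  # the YAML document is sent unmodified
        headers={"Content-Type": "application/x-yaml"},  # assumed header
        method="POST",
    )


# Usage, with the definition stored in a hypothetical file:
# from pathlib import Path
# request = build_create_stream_yaml_request(Path("stream.yaml").read_text())
# with urllib.request.urlopen(request) as resp:
#     print(resp.read().decode("utf-8"))
```

Indentation is significant in YAML, so the text should be passed through untouched rather than reflowed or stripped line by line.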
Response
{
  "id": "e795dc27-8da2-42e8-8d3b-50e5e4ff9943",
  "success": true
}
Start Stream
Starts execution of a previously created stream.
Endpoint
POST /stream/streams/{id}/signal
Path Parameters
Parameter | Type | Description |
---|---|---|
id | String | Unique identifier of the stream to start |
Request Body
signal=start
Response
{
  "id": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
  "success": true
}
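The `signal=start` body has the shape of a URL-encoded form field, so one way to build the request is with `urllib.parse.urlencode`. This is a sketch under assumptions: the base URL, the `application/x-www-form-urlencoded` content type, and the helper name are not confirmed by this page. The set of accepted values is taken from the stream-level signals documented here (start, stop, reconcile).

```python
import urllib.request
from urllib.parse import quote, urlencode

# Stream-level signals documented on this page.
STREAM_SIGNALS = {"start", "stop", "reconcile"}


def build_stream_signal_request(
    stream_id: str, signal: str, base_url: str = "http://localhost:8080"  # assumed base URL
) -> urllib.request.Request:
    """Build (but do not send) the POST /stream/streams/{id}/signal request."""
    if signal not in STREAM_SIGNALS:
        raise ValueError(f"unsupported stream signal: {signal!r}")
    return urllib.request.Request(
        # quote() escapes any characters in the id that are unsafe in a URL path
        url=f"{base_url}/stream/streams/{quote(stream_id, safe='')}/signal",
        data=urlencode({"signal": signal}).encode("ascii"),  # e.g. b"signal=start"
        headers={"Content-Type": "application/x-www-form-urlencoded"},  # assumed header
        method="POST",
    )


request = build_stream_signal_request("425ce7ec-a7af-497a-8d67-6371a32ad8bb", "start")
```

Rejecting unknown signal names on the client keeps typos from reaching the engine, whose behavior for unrecognized values is not described here.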
Stop Stream
Stops execution of a running stream.
Endpoint
POST /stream/streams/{id}/signal
Path Parameters
Parameter | Type | Description |
---|---|---|
id | String | Unique identifier of the stream to stop |
Request Body
signal=stop
Response
{
  "id": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
  "success": true
}
Reconcile Stream
Restarts a stream (stops and then starts it).
Endpoint
POST /stream/streams/{id}/signal
Path Parameters
Parameter | Type | Description |
---|---|---|
id | String | Unique identifier of the stream to reconcile |
Request Body
signal=reconcile
Response
{
  "id": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
  "success": true
}
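To make the relationship between the three stream signals concrete, the sketch below contrasts a single reconcile request with the manual stop-then-start sequence it is described as performing. The `send` callable is a hypothetical transport injected by the caller, so the example runs without a server. Whether the engine treats the two approaches identically (for example, regarding failure handling between the two steps) is not stated on this page.

```python
from typing import Callable

# send(path, body) performs the POST; hypothetical, supplied by the caller.
Send = Callable[[str, str], None]


def reconcile(send: Send, stream_id: str) -> None:
    """One request: ask the engine to restart the stream."""
    send(f"/stream/streams/{stream_id}/signal", "signal=reconcile")


def stop_then_start(send: Send, stream_id: str) -> None:
    """Two requests: the manual sequence that reconcile is described as performing."""
    path = f"/stream/streams/{stream_id}/signal"
    send(path, "signal=stop")
    send(path, "signal=start")


# Record the requests instead of sending them.
sent: list[tuple[str, str]] = []


def record(path: str, body: str) -> None:
    sent.append((path, body))


stop_then_start(record, "425ce7ec-a7af-497a-8d67-6371a32ad8bb")
```

All three signals go to the same endpoint; only the request body differs.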
Send Signal to Node
Sends a signal to a specific node within a stream.
Endpoint
POST /stream/streams/{id}/nodes/{nodeId}/signal
Path Parameters
Parameter | Type | Description |
---|---|---|
id | String | Unique identifier of the stream |
nodeId | String | Unique identifier of the node |
Request Body
signal=start
Response
{
  "id": "64cb7ca2-ebca-4502-8fc4-208fa90545f1",
  "streamId": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",
  "signal": "start",
  "success": true
}
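Unlike the stream-level responses, this response carries four fields. A small sketch of building the node path and parsing the response into a typed record follows; the class and function names are hypothetical, and reading `id` as the node's identifier is an inference from the example rather than something this page states.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeSignalResult:
    node_id: str    # "id" in the response; assumed to be the node's id
    stream_id: str  # "streamId" in the response
    signal: str
    success: bool


def node_signal_path(stream_id: str, node_id: str) -> str:
    """Path for POST /stream/streams/{id}/nodes/{nodeId}/signal."""
    return f"/stream/streams/{stream_id}/nodes/{node_id}/signal"


def parse_node_signal_result(raw: bytes) -> NodeSignalResult:
    """Parse the JSON response body shown above."""
    data = json.loads(raw)
    return NodeSignalResult(
        node_id=data["id"],
        stream_id=data["streamId"],
        signal=data["signal"],
        success=bool(data["success"]),
    )


result = parse_node_signal_result(
    b'{"id": "64cb7ca2-ebca-4502-8fc4-208fa90545f1",'
    b' "streamId": "425ce7ec-a7af-497a-8d67-6371a32ad8bb",'
    b' "signal": "start", "success": true}'
)
```

Echoing the signal and both identifiers in the response lets a client match each result to the request that produced it.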