Instinct Python SDK Reference

Overview

The Instinct Python SDK (instinct-py) provides a comprehensive set of tools for discovering, connecting to, and controlling Nexstem Instinct headsets from Python applications. This SDK enables developers to:

  • Discover Instinct headsets on the local network
  • Monitor headset state (battery, CPU, RAM, storage)
  • Configure electrodes and sensors
  • Create and manage data streams
  • Build custom signal processing pipelines
  • Store and retrieve device configuration values

Installation

# Using pip
pip install instinct-py

# Using poetry
poetry add instinct-py

Requirements

  • Python 3.9 or later
  • An Instinct headset on the same network

Getting Started

Importing the SDK

from instinct_py import Headset

Discovering Headsets

The SDK provides methods to discover Instinct headsets on your local network:

from instinct_py import Headset

def discover_headsets():
    try:
        # Discover all Instinct headsets on the network
        headsets = Headset.discover()
        print(f"Found {len(headsets)} headsets")

        if len(headsets) > 0:
            # Connect to the first headset found
            headset = headsets[0]
            print(f"Connected to headset: {headset.get_name()}")
            return headset
        else:
            print("No headsets found")
            return None
    except Exception as error:
        print(f"Error discovering headsets: {error}")
        return None

Connecting to a Known Headset

If you know the IP address of your headset, you can connect directly:

from instinct_py import Headset

# Connect to a headset at a specific IP address
headset = Headset("192.168.1.100")

# Enable debug mode for additional logging
headset_with_debug = Headset("192.168.1.100", debug=True)

API Reference

Headset Class

The Headset class is the main entry point for interacting with Instinct devices.

Static Methods

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| discover(timeout=3000, discovery_port=48010, debug=False) | Discovers Instinct headsets on the network | timeout: int, discovery_port: int, debug: bool | List[Headset] |

Instance Properties

| Property | Type | Description |
| --- | --- | --- |
| streams_manager | HeadsetStreamsManager | Manager for creating and controlling data streams |
| electrode_manager | HeadsetElectrodesManager | Manager for electrode configurations |
| sensor_manager | HeadsetSensorsManager | Manager for sensor data |
| device_config_manager | DeviceConfigManager | Manager for device configuration storage |
| host_address | str | IP address of the headset |

Instance Methods

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| get_state() | Gets the current state of the headset | None | HeadsetState |
| get_name() | Gets the current name of the headset | None | str |
| set_name(name) | Sets a new name for the headset | name: str | None |
| send_debug_command(command) | Sends a debug command to the headset (debug mode only) | command: str | Any |

HeadsetState Interface

class HeadsetState:
    status: str # 'connected', 'disconnected', or 'error'
    battery: BatteryInfo
    cpu: CpuInfo
    memory: MemoryInfo
    storage: StorageInfo

class BatteryInfo:
    percent: float
    charging: bool

class CpuInfo:
    load: float
    temperature: float

class MemoryInfo:
    used: int
    total: int

class StorageInfo:
    used: int
    total: int
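
As a rough illustration of how these fields might be consumed, the sketch below mirrors MemoryInfo with a stand-in dataclass (not the SDK's own class) and derives a usage percentage. The helper name `usage_percent` is hypothetical, and the code assumes `used` and `total` are reported in the same unit.

```python
from dataclasses import dataclass


@dataclass
class MemoryInfo:
    """Stand-in for the SDK's MemoryInfo; field names follow the interface above."""
    used: int
    total: int


def usage_percent(info) -> float:
    """Return used/total as a percentage, or 0.0 when total is not positive."""
    if info.total <= 0:
        return 0.0
    return 100.0 * info.used / info.total


memory = MemoryInfo(used=512, total=2048)
print(f"Memory usage: {usage_percent(memory):.1f}%")
```

The same helper would work for StorageInfo, since both expose `used` and `total`.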

Stream Management

The SDK provides classes for creating and managing data processing streams:

HeadsetStreamsManager Class

Manages the lifecycle of data processing streams.

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| create_stream(config) | Creates a new stream with the provided configuration | config: dict | Stream |
| get_stream(id) | Gets a stream by ID | id: str | Stream |
| list_streams() | Lists all streams | None | List[Stream] |
| delete_stream(id) | Deletes a stream | id: str | None |

Stream Class

Represents a data processing stream with nodes and pipes.

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| create() | Creates the stream on the headset | None | None |
| start() | Starts data processing | None | None |
| stop() | Stops data processing | None | None |
| send_signal(signal, parameters=None) | Sends a signal to the stream | signal: str, parameters: dict | None |
| add_nodes(nodes) | Adds new nodes to the stream | nodes: List[dict] | None |
| delete_node(node_id) | Deletes a node from the stream | node_id: str | None |
| add_pipes(pipes) | Adds new pipes to the stream | pipes: List[dict] | None |
| delete_pipe(pipe_id) | Deletes a pipe from the stream | pipe_id: str | None |
| reconcile() | Reconciles the stream configuration | None | None |
| get_pipes() | Gets all pipes in the stream | None | List[Pipe] |

Node Class

Represents a processing node in a stream.

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| create() | Creates the node on the headset | None | None |
| send_signal(signal, parameters) | Sends a signal to the node | signal: str, parameters: dict | None |
| delete() | Deletes the node from the stream | None | None |

Pipe Class

Represents a connection between nodes.

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| create() | Creates the pipe on the headset | None | None |
| delete() | Deletes the pipe from the stream | None | None |

Electrode Management

The SDK provides classes for controlling and configuring headset electrodes:

HeadsetElectrodesManager Class

Manages the electrodes on the headset.

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| get_electrodes() | Gets information about all electrodes | None | List[ElectrodeInfo] |
| get_electrode(id) | Gets information about a specific electrode | id: str | ElectrodeInfo |
| set_electrode_config(id, config) | Configures a specific electrode | id: str, config: ElectrodeConfig | None |
| check_impedance(electrode_ids=None) | Checks impedance for specified electrodes | electrode_ids: List[str] | List[ImpedanceResult] |

Device Configuration Management

The SDK provides DeviceConfigManager for storing and retrieving persistent configuration values on the headset.

DeviceConfigManager Class

Manages device configuration storage.

| Method | Description | Parameters | Returns |
| --- | --- | --- | --- |
| create_config(config_data) | Creates a new configuration entry | config_data: dict | str |
| get_config(key) | Retrieves a configuration by key | key: str | dict |
| update_config(key, config_data) | Updates an existing configuration | key: str, config_data: dict | None |
| delete_config(key) | Deletes a configuration entry | key: str | None |

Configuration Data Types

class ConfigData:
    key: str # Unique identifier for the configuration
    value: Any # Value to store (serialized to string)
    expires_in: str # Optional time-to-live (e.g., "1h", "2d", "30m")
    id: str # Optional unique ID (auto-generated if not provided)
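
The `expires_in` strings above follow a number-plus-unit pattern. As a minimal sketch for checking such a value on the client before sending it, the hypothetical helper below converts it to seconds. The unit table (s, m, h, d, w) is an assumption inferred from the examples in this document; the SDK's own parsing may accept other units (the Troubleshooting section also lists "1M", which is left out here because its meaning is not stated).

```python
import re

# Assumed unit table, inferred from the examples in this document.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}
_TTL_PATTERN = re.compile(r"(\d+)([smhdw])")


def ttl_to_seconds(expires_in: str) -> int:
    """Convert a TTL string such as "30m" into seconds; raise ValueError if malformed."""
    match = _TTL_PATTERN.fullmatch(expires_in)
    if match is None:
        raise ValueError(f"Unrecognized expires_in value: {expires_in!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_SECONDS[unit]


print(ttl_to_seconds("30m"))  # 1800
print(ttl_to_seconds("2d"))   # 172800
```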

Tutorials

Basic Stream Creation

This tutorial demonstrates how to create a simple EEG stream:

from instinct_py import Headset
import asyncio
import uuid

async def create_basic_stream():
    try:
        # Discover and connect to headset
        headsets = Headset.discover()
        if len(headsets) == 0:
            print("No headsets found.")
            return

        headset = headsets[0]
        print(f"Connected to headset: {headset.get_name()}")

        # Generate UUIDs for nodes
        source_id = str(uuid.uuid4())
        ssvep_id = str(uuid.uuid4())

        # Create a stream
        stream = headset.streams_manager.create_stream({
            "id": str(uuid.uuid4()),
            "nodes": [
                {
                    "executable": "eeg_source",
                    "config": {
                        "sampleRate": 1000,
                        "gain": 1,
                    },
                    "id": source_id,
                },
                {
                    "executable": "ssvep_algo",
                    "config": {},
                    "id": ssvep_id,
                },
            ],
            "pipes": [
                {
                    "source": source_id,
                    "destination": ssvep_id,
                    "id": str(uuid.uuid4()),
                },
            ],
        })

        # Create the stream on the headset
        await stream.create()
        print("Stream created successfully")

        # Start the stream
        await stream.start()
        print("Stream started successfully")

        # Let it run for a while (10 seconds) without blocking the event loop
        await asyncio.sleep(10)

        # Stop the stream
        await stream.stop()
        print("Stream stopped successfully")

    except Exception as error:
        print(f"Error: {error}")

# Run the async function
asyncio.run(create_basic_stream())

Alpha Rhythm Analysis Stream

This tutorial demonstrates how to create a more complex stream for analyzing alpha rhythms:

from instinct_py import Headset
import uuid
import asyncio

async def create_alpha_rhythm_stream():
    try:
        # Connect to a headset at a specific IP address
        headset = Headset("192.168.1.100")

        # Generate UUIDs for nodes
        source_id = str(uuid.uuid4())
        bandpass_id = str(uuid.uuid4())
        fft_id = str(uuid.uuid4())
        websocket_id = str(uuid.uuid4())

        # Create a stream with custom metadata
        stream = headset.streams_manager.create_stream({
            "id": str(uuid.uuid4()),
            "meta": {
                "name": "Alpha Rhythm Analysis",
                "description": "Extracts and analyzes alpha rhythms from occipital electrodes",
                "version": "1.0.0",
            },
            "nodes": [
                # EEG data source
                {
                    "executable": "eeg_source",
                    "config": {
                        "sampleRate": 250,
                        "channels": ["O1", "O2", "PZ"],
                    },
                    "id": source_id,
                },
                # Bandpass filter for alpha band (8-12 Hz)
                {
                    "executable": "bandpass_filter",
                    "config": {
                        "cutoff": 10,
                        "bandwidth": 4,
                        "order": 4,
                    },
                    "id": bandpass_id,
                },
                # Spectral analysis node
                {
                    "executable": "fft_analyzer",
                    "config": {
                        "windowSize": 512,
                        "overlap": 0.5,
                    },
                    "id": fft_id,
                },
                # Data output node
                {
                    "executable": "websocket_output",
                    "config": {
                        "port": 36006,
                    },
                    "id": websocket_id,
                },
            ],
            "pipes": [
                # Connect the nodes in sequence
                {
                    "id": str(uuid.uuid4()),
                    "source": source_id,
                    "destination": bandpass_id,
                },
                {
                    "id": str(uuid.uuid4()),
                    "source": bandpass_id,
                    "destination": fft_id,
                },
                {
                    "id": str(uuid.uuid4()),
                    "source": fft_id,
                    "destination": websocket_id,
                },
            ],
        })

        # Create and start the stream
        await stream.create()
        print("Alpha rhythm analysis stream created")

        await stream.start()
        print("Alpha rhythm analysis stream started")

        # The WebSocket output node runs on the headset, so use the headset's address
        print(f"Stream is running. Connect to WebSocket at ws://{headset.host_address}:36006 to view data")
        print("Press Ctrl+C to stop the stream")

        # Keep the stream running until manually stopped
        try:
            # Wait indefinitely
            await asyncio.Future()
        finally:
            # Ctrl+C cancels this task under asyncio.run(), so stop the stream during cleanup
            await stream.stop()
            print("Stream stopped")

    except Exception as error:
        print(f"Error: {error}")

# Run the async function
asyncio.run(create_alpha_rhythm_stream())
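
Both tutorials above write out every node ID and pipe by hand. For pipelines that are a simple chain, the wiring could be generated instead. The sketch below is one possible approach; `build_linear_stream` is a hypothetical helper, not part of the SDK, and it only produces a dict in the shape passed to `create_stream` in the examples above.

```python
import uuid


def build_linear_stream(executables, meta=None):
    """Build a stream config dict whose nodes are piped together in list order.

    `executables` is a list of (executable_name, config_dict) tuples.
    """
    nodes = [
        {"id": str(uuid.uuid4()), "executable": name, "config": dict(config)}
        for name, config in executables
    ]
    # Connect each node to the one that follows it.
    pipes = [
        {"id": str(uuid.uuid4()), "source": src["id"], "destination": dst["id"]}
        for src, dst in zip(nodes, nodes[1:])
    ]
    stream_config = {"id": str(uuid.uuid4()), "nodes": nodes, "pipes": pipes}
    if meta is not None:
        stream_config["meta"] = dict(meta)
    return stream_config


config = build_linear_stream([
    ("eeg_source", {"sampleRate": 250, "channels": ["O1", "O2", "PZ"]}),
    ("bandpass_filter", {"cutoff": 10, "bandwidth": 4, "order": 4}),
    ("fft_analyzer", {"windowSize": 512, "overlap": 0.5}),
    ("websocket_output", {"port": 36006}),
])
print(len(config["nodes"]), "nodes,", len(config["pipes"]), "pipes")
```

The resulting dict could then be handed to `headset.streams_manager.create_stream(config)`. Branching pipelines would still need their pipes listed explicitly.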

Extending a Stream with Custom Nodes

This tutorial shows how to add custom nodes to an existing stream:

from instinct_py import Headset
import uuid
import asyncio

async def extend_stream_with_custom_node(stream_id, fft_node_id, websocket_node_id):
    try:
        # Connect to headset
        headset = Headset("192.168.1.100")

        # Get existing stream
        stream = await headset.streams_manager.get_stream(stream_id)

        # First, stop the running stream
        await stream.stop()
        print("Stream stopped for modification")

        # Add the new custom node
        custom_node_id = str(uuid.uuid4())
        await stream.add_nodes([
            {
                "executable": "custom_feature_extractor",
                "config": {
                    "threshold": 0.75,
                    "windowSize": 256,
                },
                "meta": {
                    "author": "Nexstem",
                    "version": "1.1.0",
                },
                "id": custom_node_id,
            },
        ])
        print("Custom node added")

        # Add new pipes to connect the custom node
        new_pipes = [
            {
                "id": str(uuid.uuid4()),
                "source": fft_node_id,
                "destination": custom_node_id,
            },
            {
                "id": str(uuid.uuid4()),
                "source": custom_node_id,
                "destination": websocket_node_id,
            },
        ]

        await stream.add_pipes(new_pipes)
        print("New pipes added")

        # Find and delete the pipe which is no longer relevant
        # (This would be the direct connection from FFT to WebSocket)
        pipes = stream.get_pipes()
        pipe_to_delete = next(
            (pipe for pipe in pipes if pipe.source == fft_node_id and pipe.destination == websocket_node_id),
            None
        )

        if pipe_to_delete:
            # delete_pipe takes a single pipe ID (see the Stream class reference)
            await stream.delete_pipe(pipe_to_delete.id)
            print("Old pipe deleted")

        # Reconcile the stream so the headset adopts the new configuration
        await stream.reconcile()
        print("Stream reconciled")

        # Start the stream again
        await stream.start()
        print("Stream restarted with custom node")

    except Exception as error:
        print(f"Error: {error}")
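
The rewiring step in this tutorial (add two pipes, remove the direct one) can be planned separately from the SDK calls, which makes it easier to check before touching the headset. The following is only a sketch: `plan_node_insertion` is a hypothetical helper, and it assumes pipes are represented as plain dicts with "id", "source", and "destination" keys, as in the stream configurations above.

```python
import uuid


def plan_node_insertion(pipes, upstream_id, downstream_id, new_node_id):
    """Return (pipes_to_add, pipe_ids_to_delete) for splicing a node between two others.

    `pipes` is a list of dicts with "id", "source", and "destination" keys.
    """
    pipes_to_add = [
        {"id": str(uuid.uuid4()), "source": upstream_id, "destination": new_node_id},
        {"id": str(uuid.uuid4()), "source": new_node_id, "destination": downstream_id},
    ]
    # Any direct upstream -> downstream pipe becomes redundant after the splice.
    pipe_ids_to_delete = [
        pipe["id"]
        for pipe in pipes
        if pipe["source"] == upstream_id and pipe["destination"] == downstream_id
    ]
    return pipes_to_add, pipe_ids_to_delete


existing = [
    {"id": "p1", "source": "source", "destination": "fft"},
    {"id": "p2", "source": "fft", "destination": "websocket"},
]
to_add, to_delete = plan_node_insertion(existing, "fft", "websocket", "custom")
print(to_delete)  # ['p2']
```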

Managing Device Configuration

This tutorial shows how to store and retrieve configuration values on the headset:

from instinct_py import Headset
import asyncio

async def manage_device_config():
    try:
        # Connect to headset
        headset = Headset("192.168.1.100")

        # Store a configuration value
        config_key = "user_preferences"
        config_value = {
            "channels": ["Fp1", "Fp2", "F7", "F8"],
            "sampleRate": 500,
            "notchFilter": True,
            "theme": "dark",
        }

        # Create a configuration that expires in 30 days
        config_id = await headset.device_config_manager.create_config({
            "key": config_key,
            "value": config_value,
            "expires_in": "30d",
        })

        print(f"Configuration created with ID: {config_id}")

        # Retrieve the configuration
        retrieved_config = await headset.device_config_manager.get_config(config_key)
        print("Retrieved configuration:", retrieved_config)

        # Update the configuration
        config_value["theme"] = "light"
        await headset.device_config_manager.update_config(config_key, {
            "value": config_value,
            "expires_in": "60d", # Extended expiration
        })

        print("Configuration updated")

        # Retrieve the updated configuration
        updated_config = await headset.device_config_manager.get_config(config_key)
        print("Updated configuration:", updated_config)

    except Exception as error:
        print(f"Error: {error}")

# Run the async function
asyncio.run(manage_device_config())

Checking Electrode Impedance

This tutorial demonstrates how to check and optimize electrode impedance:

from instinct_py import Headset
import asyncio

async def check_electrode_impedance():
    try:
        # Connect to headset
        headset = Headset("192.168.1.100")

        # Get all electrodes
        electrodes = await headset.electrode_manager.get_electrodes()
        print(f"Found {len(electrodes)} electrodes")

        # Select specific electrodes to check
        electrode_ids = ["Fp1", "Fp2", "O1", "O2"]

        # Check impedance for those electrodes
        impedance_results = await headset.electrode_manager.check_impedance(electrode_ids)

        # Display results
        print("\nInitial Impedance Results:")
        for result in impedance_results:
            status = "Good" if result.value < 10.0 else "High"
            print(f"Electrode {result.id}: {result.value} kΩ - {status}")

        # Give user time to adjust electrodes with high impedance
        print("\nPlease adjust any electrodes with high impedance...")
        await asyncio.sleep(10)  # Non-blocking wait inside the coroutine

        # Check again
        impedance_results = await headset.electrode_manager.check_impedance(electrode_ids)

        # Display updated results
        print("\nUpdated Impedance Results:")
        for result in impedance_results:
            status = "Good" if result.value < 10.0 else "High"
            print(f"Electrode {result.id}: {result.value} kΩ - {status}")

    except Exception as error:
        print(f"Error: {error}")

# Run the async function
asyncio.run(check_electrode_impedance())
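
The tutorial repeats the same "Good"/"High" check in two loops. A small helper could factor that out and make the threshold explicit. This is a sketch only: `classify_impedance` is a hypothetical name, it takes a plain mapping of electrode ID to impedance rather than the SDK's result objects, and the 10 kΩ default simply mirrors the value used in the tutorial.

```python
def classify_impedance(readings, threshold_kohm=10.0):
    """Split {electrode_id: impedance in kΩ} into (good, high) lists of electrode IDs."""
    good = sorted(eid for eid, value in readings.items() if value < threshold_kohm)
    high = sorted(eid for eid, value in readings.items() if value >= threshold_kohm)
    return good, high


good, high = classify_impedance({"Fp1": 4.2, "Fp2": 12.8, "O1": 9.9, "O2": 10.0})
print("Good:", good)  # Good: ['Fp1', 'O1']
print("High:", high)  # High: ['Fp2', 'O2']
```

With the SDK's results, the mapping might be built as `{r.id: r.value for r in impedance_results}`, using the `id` and `value` attributes shown in the tutorial.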

Best Practices

Error Handling

Always implement proper error handling when working with the SDK:

# Wrapped in a coroutine so that `return` and `await` are valid
async def rename_first_headset():
    try:
        headsets = Headset.discover()
        if len(headsets) == 0:
            print("No headsets found.")
            return

        headset = headsets[0]
        await headset.set_name("My Headset")
    # Handle specific error types first, then fall back to a generic handler
    except ConnectionError:
        print("Connection to headset was lost. Attempting to reconnect...")
        # Implement reconnection logic
    except ValueError as error:
        print("Invalid configuration provided:", error)
        # Fix configuration issues
    except Exception as error:
        print(f"Error: {error}")

Resource Management

Properly manage streams to conserve device resources:

# Using context manager (if supported)
async with stream:
    # Stream is automatically started and stopped
    await perform_operations()

# Or manual cleanup
try:
    await stream.start()
    # Use the stream...
finally:
    # Ensure the stream is stopped even if an error occurs
    if stream:
        await stream.stop()

Connection Optimization

For optimal performance, implement the following practices:

  1. Use debug mode only when necessary

    # Enable only during development
    headset = Headset("192.168.1.100", debug=True)

  2. Optimize stream configurations

    # Use appropriate sample rates
    {
        "executable": "eeg_source",
        "config": {
            # Only use the sample rate you actually need
            "sampleRate": 250, # Instead of 1000 if 250 Hz is sufficient
            # Only include channels you need to process
            "channels": ["O1", "O2"],
        }
    }

  3. Implement connection monitoring

    import asyncio

    async def monitor_connection(headset):
        while True:
            try:
                state = await headset.get_state()
                if state.status != "connected":
                    print("Headset disconnected, attempting to reconnect...")
                    # Implement reconnection logic
            except Exception as error:
                print(f"Connection monitoring error: {error}")

            # Check every 5 seconds
            await asyncio.sleep(5)

    # Start monitoring in the background
    # (asyncio.create_task must be called from inside a running event loop)
    asyncio.create_task(monitor_connection(headset))
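
The "Implement reconnection logic" placeholder in the monitoring loop is left open by the SDK documentation. One common pattern is retrying with exponential backoff, sketched below. Both helper names are hypothetical, and `operation` stands for whatever coroutine function re-establishes or re-checks the connection in your application; nothing here is an SDK API.

```python
import asyncio


def backoff_delays(base=1.0, factor=2.0, max_delay=30.0, attempts=5):
    """Yield `attempts` delays in seconds that grow geometrically, capped at max_delay."""
    delay = base
    for _ in range(attempts):
        yield min(delay, max_delay)
        delay *= factor


async def retry_with_backoff(operation, delays, sleep=asyncio.sleep):
    """Await `operation()` until it succeeds, sleeping between failed attempts.

    `operation` is any zero-argument coroutine function. Raises ConnectionError
    once every delay has been used up without a successful call.
    """
    last_error = None
    for delay in delays:
        try:
            return await operation()
        except Exception as error:  # Broad on purpose: error types are SDK-specific
            last_error = error
            await sleep(delay)
    raise ConnectionError("All reconnection attempts failed") from last_error


print(list(backoff_delays(base=1.0, factor=2.0, max_delay=5.0, attempts=4)))
# [1.0, 2.0, 4.0, 5.0]
```

Capping the delay keeps the wait bounded on long outages, and the injectable `sleep` argument makes the helper easy to exercise in tests without real waiting.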

Troubleshooting

Common Issues

Cannot Discover Headsets

# Problem: Headset.discover() returns an empty list

# Solutions:
# 1. Ensure the headset is powered on and connected to the same network
# 2. Check firewall settings that might block UDP broadcasts (port 48010)
# 3. Try specifying the IP address directly:
headset = Headset("192.168.1.100")

Tip: If you know the headset's IP address, connecting directly is more reliable than discovery.

Stream Creation Fails

# Problem: stream.create() raises an exception

# Solutions:
# 1. Verify all UUIDs are valid
import uuid
if not is_valid_uuid(node_id): # Custom validation function
    print(f"Invalid UUID: {node_id}")

# 2. Check that node executables exist on the headset
# Only use executables that are documented or that you've confirmed exist

# 3. Ensure pipe connections reference valid node IDs
# Make sure source and destination IDs match existing nodes
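
The `is_valid_uuid` function used in solution 1 is described as a custom validation function and is not defined in this document. A minimal version, assuming IDs are passed around as strings, could rely on the standard library's `uuid.UUID` constructor, which raises `ValueError` for malformed input.

```python
import uuid


def is_valid_uuid(value) -> bool:
    """Return True if `value` parses as a UUID string."""
    try:
        uuid.UUID(str(value))
    except ValueError:
        return False
    return True


print(is_valid_uuid(str(uuid.uuid4())))  # True
print(is_valid_uuid("not-a-uuid"))       # False
```

Note that `uuid.UUID` is lenient about formatting (for example, it also accepts hex strings without hyphens), so a stricter check may be needed if the headset expects the canonical hyphenated form.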

Configuration Storage Fails

# Problem: device_config_manager.create_config() raises an exception

# Solutions:
# 1. Check that the key is a valid string
# 2. Ensure the value can be properly serialized
# 3. Verify that the expiration format is correct
valid_expiration_formats = ["30s", "5m", "2h", "1d", "1w", "1M"]

Connection Timeouts

# Problem: API calls time out or take too long

# Solutions:
# 1. The headset may be overloaded; try simplifying your stream
# 2. Check network stability and latency
# 3. Increase timeout values in API calls
headset = Headset("192.168.1.100", timeout=10000) # 10 seconds instead of default

Diagnostic Techniques

Enable Debug Mode

# Turn on detailed logging
headset = Headset("192.168.1.100", debug=True)

Check Headset State

# Get comprehensive information about the headset
state = headset.get_state()
print(state)

# Check specific metrics
if state.cpu.load > 80:
    print("CPU load is high, consider simplifying your stream")

if state.battery.percent < 20 and not state.battery.charging:
    print("Battery is low, connect the headset to power")

Inspect Streams

# List all running streams to debug potential conflicts
streams = headset.streams_manager.list_streams()
print(f"Running streams: {len(streams)}")
for stream in streams:
    print(f"- Stream {stream.id}: {stream.meta.get('name', 'unnamed')}")

Advanced Topics

Custom Networking Configuration

For environments with complex networking requirements:

headset = Headset(
    "192.168.1.100",
    port=48011, # Custom port
    timeout=15000, # 15 second timeout
    retries=3, # Number of retry attempts
    retry_delay=1000, # Delay between retries (ms)
)

WebSocket Data Consumption

To work with data from a WebSocket output node:

import websockets
import asyncio
import json

async def connect_to_data_stream(url):
    async with websockets.connect(url) as websocket:
        print("Connected to data stream")

        try:
            while True:
                data = await websocket.recv()
                # Parse the incoming data
                parsed_data = parse_eeg_data(data)
                print("Received data:", parsed_data)

                # Process or visualize the data
                process_data(parsed_data)
        except websockets.exceptions.ConnectionClosed:
            print("Disconnected from data stream")

def parse_eeg_data(data):
    # Example parsing function - adjust based on your data format
    if isinstance(data, bytes):
        # Handle binary data
        return {"type": "binary", "size": len(data)}
    else:
        # Handle text data (likely JSON)
        return json.loads(data)

def process_data(data):
    # Process the EEG data (example)
    print(f"Processing data point with timestamp: {data.get('timestamp')}")

# Connect to the WebSocket server running on the headset
asyncio.run(connect_to_data_stream("ws://192.168.1.100:36006"))

Multi-Headset Management

For applications that need to work with multiple headsets:

from instinct_py import Headset
import asyncio
import uuid  # Needed for the stream ID generated in main()

class HeadsetManager:
    def __init__(self):
        self.headsets = {}

    async def discover_and_connect(self):
        devices = Headset.discover()
        print(f"Found {len(devices)} headsets")

        for device in devices:
            try:
                name = await device.get_name()
                state = await device.get_state()

                self.headsets[name] = {
                    "device": device,
                    "state": state,
                    "active_streams": []
                }

                print(f"Connected to headset: {name}")
            except Exception as error:
                print(f"Failed to connect to headset: {error}")

        return len(self.headsets)

    def get_headset(self, name):
        return self.headsets.get(name, {}).get("device")

    async def start_stream_on_all(self, stream_config):
        results = []

        for name, headset_info in self.headsets.items():
            try:
                stream = headset_info["device"].streams_manager.create_stream(stream_config)
                await stream.create()
                await stream.start()

                headset_info["active_streams"].append(stream)
                results.append({"name": name, "success": True, "stream": stream})

                print(f"Started stream on headset: {name}")
            except Exception as error:
                results.append({"name": name, "success": False, "error": str(error)})
                print(f"Failed to start stream on headset {name}: {error}")

        return results

    async def stop_all_streams(self):
        for name, headset_info in self.headsets.items():
            try:
                for stream in headset_info["active_streams"]:
                    await stream.stop()
                headset_info["active_streams"] = []
                print(f"Stopped all streams on headset: {name}")
            except Exception as error:
                print(f"Error stopping streams on headset {name}: {error}")

# Example usage:
async def main():
    manager = HeadsetManager()
    count = await manager.discover_and_connect()
    print(f"Connected to {count} headsets")

    # Create a basic stream configuration
    stream_config = {
        "id": str(uuid.uuid4()),
        # ... additional stream configuration
    }

    # Start the stream on all headsets
    results = await manager.start_stream_on_all(stream_config)
    print(f"Streams started on {sum(1 for r in results if r['success'])} of {len(results)} headsets")

    # Let streams run for 30 seconds
    await asyncio.sleep(30)

    # Stop all streams
    await manager.stop_all_streams()

asyncio.run(main())
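
`start_stream_on_all` above works through the headsets one at a time. When many devices are involved, the per-headset work could instead run concurrently with `asyncio.gather`. The sketch below shows the pattern in isolation: `run_on_all` and the `action` callback are hypothetical names, and whether the SDK's calls can safely run in parallel against several headsets is an assumption to verify.

```python
import asyncio


async def run_on_all(targets, action):
    """Run `action(name, target)` for every entry concurrently and collect outcomes.

    `targets` maps a name to any object; `action` is a coroutine function.
    """
    names = list(targets)
    outcomes = await asyncio.gather(
        *(action(name, targets[name]) for name in names),
        return_exceptions=True,  # One failure should not cancel the others
    )
    return [
        {"name": name, "success": not isinstance(outcome, Exception), "result": outcome}
        for name, outcome in zip(names, outcomes)
    ]


async def demo_action(name, target):
    # Stand-in for "create and start a stream on this headset"
    if target is None:
        raise RuntimeError(f"{name} is unreachable")
    return f"started on {name}"


results = asyncio.run(run_on_all({"alpha": object(), "beta": None}, demo_action))
print([(r["name"], r["success"]) for r in results])
# [('alpha', True), ('beta', False)]
```

Because `return_exceptions=True` returns failures as values, each headset's outcome is reported individually, much like the `results` list in the class above.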

This documentation provides a comprehensive overview of the Instinct Python SDK. For further assistance or contributions to this documentation, please contact the Nexstem development team.