Instinct Python SDK Reference
Overview
The Instinct Python SDK (instinct-py
) provides a comprehensive set of tools for discovering, connecting to, and controlling Nexstem Instinct headsets from Python applications. This SDK enables developers to:
- Discover Instinct headsets on the local network
- Monitor headset state (battery, CPU, RAM, storage)
- Configure electrodes and sensors
- Create and manage data streams
- Build custom signal processing pipelines
- Store and retrieve device configuration values
Installation
# Using pip
pip install instinct-py
# Using poetry
poetry add instinct-py
Requirements
- Python 3.9 or later
- An Instinct headset on the same network
Getting Started
Importing the SDK
from instinct_py import Headset
Discovering Headsets
The SDK provides methods to discover Instinct headsets on your local network:
from instinct_py import Headset
def discover_headsets():
try:
# Discover all Instinct headsets on the network
headsets = Headset.discover()
print(f"Found {len(headsets)} headsets")
if len(headsets) > 0:
# Connect to the first headset found
headset = headsets[0]
print(f"Connected to headset: {headset.get_name()}")
return headset
else:
print("No headsets found")
return None
except Exception as error:
print(f"Error discovering headsets: {error}")
return None
Connecting to a Known Headset
If you know the IP address of your headset, you can connect directly:
from instinct_py import Headset
# Connect to a headset at a specific IP address
headset = Headset("192.168.1.100")
# Enable debug mode for additional logging
headset_with_debug = Headset("192.168.1.100", debug=True)
API Reference
Headset Class
The Headset
class is the main entry point for interacting with Instinct devices.
Static Methods
Method | Description | Parameters | Returns |
---|---|---|---|
discover(timeout=3000, discovery_port=48010, debug=False) | Discovers Instinct headsets on the network | timeout: int, discovery_port: int, debug: bool | List[Headset] |
Instance Properties
Property | Type | Description |
---|---|---|
streams_manager | HeadsetStreamsManager | Manager for creating and controlling data streams |
electrode_manager | HeadsetElectrodesManager | Manager for electrode configurations |
sensor_manager | HeadsetSensorsManager | Manager for sensor data |
device_config_manager | DeviceConfigManager | Manager for device configuration storage |
host_address | str | IP address of the headset |
Instance Methods
Method | Description | Parameters | Returns |
---|---|---|---|
get_state() | Gets the current state of the headset | None | HeadsetState |
get_name() | Gets the current name of the headset | None | str |
set_name(name) | Sets a new name for the headset | name: str | None |
send_debug_command(command) | Sends a debug command to the headset (debug mode only) | command: str | Any |
HeadsetState Interface
class HeadsetState:
status: str # 'connected', 'disconnected', or 'error'
battery: BatteryInfo
cpu: CpuInfo
memory: MemoryInfo
storage: StorageInfo
class BatteryInfo:
percent: float
charging: bool
class CpuInfo:
load: float
temperature: float
class MemoryInfo:
used: int
total: int
class StorageInfo:
used: int
total: int
Stream Management
The SDK provides classes for creating and managing data processing streams:
HeadsetStreamsManager Class
Manages the lifecycle of data processing streams.
Method | Description | Parameters | Returns |
---|---|---|---|
create_stream(config) | Creates a new stream with the provided configuration | config: dict | Stream |
get_stream(id) | Gets a stream by ID | id: str | Stream |
list_streams() | Lists all streams | None | List[Stream] |
delete_stream(id) | Deletes a stream | id: str | None |
Stream Class
Represents a data processing stream with nodes and pipes.
Method | Description | Parameters | Returns |
---|---|---|---|
create() | Creates the stream on the headset | None | None |
start() | Starts data processing | None | None |
stop() | Stops data processing | None | None |
send_signal(signal, parameters=None) | Sends a signal to the stream | signal: str, parameters: dict | None |
add_nodes(nodes) | Adds new nodes to the stream | nodes: List[dict] | None |
delete_node(node_id) | Deletes a node from the stream | node_id: str | None |
add_pipes(pipes) | Adds new pipes to the stream | pipes: List[dict] | None |
delete_pipe(pipe_id) | Deletes a pipe from the stream | pipe_id: str | None |
reconcile() | Reconciles the stream configuration | None | None |
get_pipes() | Gets all pipes in the stream | None | List[Pipe] |
Node Class
Represents a processing node in a stream.
Method | Description | Parameters | Returns |
---|---|---|---|
create() | Creates the node on the headset | None | None |
send_signal(signal, parameters) | Sends a signal to the node | signal: str, parameters: dict | None |
delete() | Deletes the node from the stream | None | None |
Pipe Class
Represents a connection between nodes.
Method | Description | Parameters | Returns |
---|---|---|---|
create() | Creates the pipe on the headset | None | None |
delete() | Deletes the pipe from the stream | None | None |
Electrode Management
The SDK provides classes for controlling and configuring headset electrodes:
HeadsetElectrodesManager Class
Manages the electrodes on the headset.
Method | Description | Parameters | Returns |
---|---|---|---|
get_electrodes() | Gets information about all electrodes | None | List[ElectrodeInfo] |
get_electrode(id) | Gets information about a specific electrode | id: str | ElectrodeInfo |
set_electrode_config(id, config) | Configures a specific electrode | id: str, config: ElectrodeConfig | None |
check_impedance(electrode_ids=None) | Checks impedance for specified electrodes | electrode_ids: List[str] | List[ImpedanceResult] |
Device Configuration Management
The SDK provides DeviceConfigManager for storing and retrieving persistent configuration values on the headset.
DeviceConfigManager Class
Manages device configuration storage.
Method | Description | Parameters | Returns |
---|---|---|---|
create_config(config_data) | Creates a new configuration entry | config_data: dict | str |
get_config(key) | Retrieves a configuration by key | key: str | dict |
update_config(key, config_data) | Updates an existing configuration | key: str, config_data: dict | None |
delete_config(key) | Deletes a configuration entry | key: str | None |
Configuration Data Types
class ConfigData:
key: str # Unique identifier for the configuration
value: Any # Value to store (serialized to string)
expires_in: str # Optional time-to-live (e.g., "1h", "2d", "30m")
id: str # Optional unique ID (auto-generated if not provided)
Tutorials
Basic Stream Creation
This tutorial demonstrates how to create a simple EEG stream:
from instinct_py import Headset
import uuid
import time
async def create_basic_stream():
try:
# Discover and connect to headset
headsets = Headset.discover()
if len(headsets) == 0:
print("No headsets found.")
return
headset = headsets[0]
print(f"Connected to headset: {headset.get_name()}")
# Generate UUIDs for nodes
source_id = str(uuid.uuid4())
ssvep_id = str(uuid.uuid4())
# Create a stream
stream = headset.streams_manager.create_stream({
"id": str(uuid.uuid4()),
"nodes": [
{
"executable": "eeg_source",
"config": {
"sampleRate": 1000,
"gain": 1,
},
"id": source_id,
},
{
"executable": "ssvep_algo",
"config": {},
"id": ssvep_id,
},
],
"pipes": [
{
"source": source_id,
"destination": ssvep_id,
"id": str(uuid.uuid4()),
},
],
})
# Create the stream on the headset
await stream.create()
print("Stream created successfully")
# Start the stream
await stream.start()
print("Stream started successfully")
# Let it run for a while (10 seconds)
time.sleep(10)
# Stop the stream
await stream.stop()
print("Stream stopped successfully")
except Exception as error:
print(f"Error: {error}")
# Run the async function
import asyncio
asyncio.run(create_basic_stream())
Alpha Rhythm Analysis Stream
This tutorial demonstrates how to create a more complex stream for analyzing alpha rhythms:
from instinct_py import Headset
import uuid
import asyncio
async def create_alpha_rhythm_stream():
try:
# Connect to a headset at a specific IP address
headset = Headset("192.168.1.100")
# Generate UUIDs for nodes
source_id = str(uuid.uuid4())
bandpass_id = str(uuid.uuid4())
fft_id = str(uuid.uuid4())
websocket_id = str(uuid.uuid4())
# Create a stream with custom metadata
stream = headset.streams_manager.create_stream({
"id": str(uuid.uuid4()),
"meta": {
"name": "Alpha Rhythm Analysis",
"description": "Extracts and analyzes alpha rhythms from occipital electrodes",
"version": "1.0.0",
},
"nodes": [
# EEG data source
{
"executable": "eeg_source",
"config": {
"sampleRate": 250,
"channels": ["O1", "O2", "PZ"],
},
"id": source_id,
},
# Bandpass filter for alpha band (8-12 Hz)
{
"executable": "bandpass_filter",
"config": {
"cutoff": 10,
"bandwidth": 4,
"order": 4,
},
"id": bandpass_id,
},
# Spectral analysis node
{
"executable": "fft_analyzer",
"config": {
"windowSize": 512,
"overlap": 0.5,
},
"id": fft_id,
},
# Data output node
{
"executable": "websocket_output",
"config": {
"port": 36006,
},
"id": websocket_id,
},
],
"pipes": [
# Connect the nodes in sequence
{
"id": str(uuid.uuid4()),
"source": source_id,
"destination": bandpass_id,
},
{
"id": str(uuid.uuid4()),
"source": bandpass_id,
"destination": fft_id,
},
{
"id": str(uuid.uuid4()),
"source": fft_id,
"destination": websocket_id,
},
],
})
# Create and start the stream
await stream.create()
print("Alpha rhythm analysis stream created")
await stream.start()
print("Alpha rhythm analysis stream started")
print("Stream is running. Connect to WebSocket at ws://localhost:36006 to view data")
print("Press Ctrl+C to stop the stream")
# Keep the stream running until manually stopped
try:
# Wait indefinitely
await asyncio.Future()
except KeyboardInterrupt:
# Stop the stream when Ctrl+C is pressed
await stream.stop()
print("Stream stopped")
except Exception as error:
print(f"Error: {error}")
# Run the async function
asyncio.run(create_alpha_rhythm_stream())
Extending a Stream with Custom Nodes
This tutorial shows how to add custom nodes to an existing stream:
from instinct_py import Headset
import uuid
import asyncio
async def extend_stream_with_custom_node(stream_id, fft_node_id, websocket_node_id):
try:
# Connect to headset
headset = Headset("192.168.1.100")
# Get existing stream
stream = await headset.streams_manager.get_stream(stream_id)
# First, stop the running stream
await stream.stop()
print("Stream stopped for modification")
# Add the new custom node
custom_node_id = str(uuid.uuid4())
await stream.add_nodes([
{
"executable": "custom_feature_extractor",
"config": {
"threshold": 0.75,
"windowSize": 256,
},
"meta": {
"author": "Nexstem",
"version": "1.1.0",
},
"id": custom_node_id,
},
])
print("Custom node added")
# Add new pipes to connect the custom node
new_pipes = [
{
"id": str(uuid.uuid4()),
"source": fft_node_id,
"destination": custom_node_id,
},
{
"id": str(uuid.uuid4()),
"source": custom_node_id,
"destination": websocket_node_id,
},
]
await stream.add_pipes(new_pipes)
print("New pipes added")
# Find and delete the pipe which is no longer relevant
# (This would be the direct connection from FFT to WebSocket)
pipes = stream.get_pipes()
pipe_to_delete = next(
(pipe for pipe in pipes if pipe.source == fft_node_id and pipe.destination == websocket_node_id),
None
)
if pipe_to_delete:
await stream.delete_pipes([pipe_to_delete.id])
print("Old pipe deleted")
# Reconcile the stream configuration to adapt to the new configuration
await stream.reconcile()
print("Stream reconciled")
# Start the stream again
await stream.start()
print("Stream restarted with custom node")
except Exception as error:
print(f"Error: {error}")
Managing Device Configuration
This tutorial shows how to store and retrieve configuration values on the headset:
from instinct_py import Headset
import asyncio
async def manage_device_config():
try:
# Connect to headset
headset = Headset("192.168.1.100")
# Store a configuration value
config_key = "user_preferences"
config_value = {
"channels": ["Fp1", "Fp2", "F7", "F8"],
"sampleRate": 500,
"notchFilter": True,
"theme": "dark",
}
# Create a configuration that expires in 30 days
config_id = await headset.device_config_manager.create_config({
"key": config_key,
"value": config_value,
"expires_in": "30d",
})
print(f"Configuration created with ID: {config_id}")
# Retrieve the configuration
retrieved_config = await headset.device_config_manager.get_config(config_key)
print("Retrieved configuration:", retrieved_config)
# Update the configuration
config_value["theme"] = "light"
await headset.device_config_manager.update_config(config_key, {
"value": config_value,
"expires_in": "60d", # Extended expiration
})
print("Configuration updated")
# Retrieve the updated configuration
updated_config = await headset.device_config_manager.get_config(config_key)
print("Updated configuration:", updated_config)
except Exception as error:
print(f"Error: {error}")
# Run the async function
asyncio.run(manage_device_config())
Checking Electrode Impedance
This tutorial demonstrates how to check and optimize electrode impedance:
from instinct_py import Headset
import asyncio
import time
async def check_electrode_impedance():
try:
# Connect to headset
headset = Headset("192.168.1.100")
# Get all electrodes
electrodes = await headset.electrode_manager.get_electrodes()
print(f"Found {len(electrodes)} electrodes")
# Select specific electrodes to check
electrode_ids = ["Fp1", "Fp2", "O1", "O2"]
# Check impedance for those electrodes
impedance_results = await headset.electrode_manager.check_impedance(electrode_ids)
# Display results
print("\nInitial Impedance Results:")
for result in impedance_results:
status = "Good" if result.value < 10.0 else "High"
print(f"Electrode {result.id}: {result.value} kΩ - {status}")
# Give user time to adjust electrodes with high impedance
print("\nPlease adjust any electrodes with high impedance...")
time.sleep(10)
# Check again
impedance_results = await headset.electrode_manager.check_impedance(electrode_ids)
# Display updated results
print("\nUpdated Impedance Results:")
for result in impedance_results:
status = "Good" if result.value < 10.0 else "High"
print(f"Electrode {result.id}: {result.value} kΩ - {status}")
except Exception as error:
print(f"Error: {error}")
# Run the async function
asyncio.run(check_electrode_impedance())
Best Practices
Error Handling
Always implement proper error handling when working with the SDK:
try:
headsets = Headset.discover()
if len(headsets) == 0:
print("No headsets found.")
return
headset = headsets[0]
await headset.set_name("My Headset")
except Exception as error:
print(f"Error: {error}")
# Handle specific error types
if isinstance(error, ConnectionError):
print("Connection to headset was lost. Attempting to reconnect...")
# Implement reconnection logic
elif isinstance(error, ValueError):
print("Invalid configuration provided:", error)
# Fix configuration issues
Resource Management
Properly manage streams to conserve device resources:
# Using context manager (if supported)
async with stream:
# Stream is automatically started and stopped
await perform_operations()
# Or manual cleanup
try:
await stream.start()
# Use the stream...
finally:
# Ensure the stream is stopped even if an error occurs
if stream:
await stream.stop()
Connection Optimization
For optimal performance, implement the following practices:
-
Use debug mode only when necessary
# Enable only during development
headset = Headset("192.168.1.100", debug=True) -
Optimize stream configurations
# Use appropriate sample rates
{
"executable": "eeg_source",
"config": {
# Only use the sample rate you actually need
"sampleRate": 250, # Instead of 1000 if 250Hz is sufficient
# Only include channels you need to process
"channels": ["O1", "O2"],
}
} -
Implement connection monitoring
import asyncio
async def monitor_connection(headset):
while True:
try:
state = await headset.get_state()
if state.status != "connected":
print("Headset disconnected, attempting to reconnect...")
# Implement reconnection logic
except Exception as error:
print(f"Connection monitoring error: {error}")
# Check every 5 seconds
await asyncio.sleep(5)
# Start monitoring in the background
asyncio.create_task(monitor_connection(headset))
Troubleshooting
Common Issues
Cannot Discover Headsets
# Problem: Headset.discover() returns an empty list
# Solutions:
# 1. Ensure the headset is powered on and connected to the same network
# 2. Check firewall settings that might block UDP broadcasts (port 48010)
# 3. Try specifying the IP address directly:
headset = Headset("192.168.1.100")
If you know the headset's IP address, connecting directly is more reliable than discovery.
Stream Creation Fails
# Problem: stream.create() raises an exception
# Solutions:
# 1. Verify all UUIDs are valid
import uuid
if not is_valid_uuid(node_id): # Custom validation function
print(f"Invalid UUID: {node_id}")
# 2. Check that node executables exist on the headset
# Only use executables that are documented or that you've confirmed exist
# 3. Ensure pipe connections reference valid node IDs
# Make sure source and destination IDs match existing nodes
Configuration Storage Fails
# Problem: device_config_manager.create_config() raises an exception
# Solutions:
# 1. Check that the key is a valid string
# 2. Ensure the value can be properly serialized
# 3. Verify that the expiration format is correct
valid_expiration_formats = ["30s", "5m", "2h", "1d", "1w", "1M"]
Connection Timeouts
# Problem: API calls time out or take too long
# Solutions:
# 1. The headset may be overloaded; try simplifying your stream
# 2. Check network stability and latency
# 3. Increase timeout values in API calls
headset = Headset("192.168.1.100", timeout=10000) # 10 seconds instead of default
Diagnostic Techniques
Enable Debug Mode
# Turn on detailed logging
headset = Headset("192.168.1.100", debug=True)
Check Headset State
# Get comprehensive information about the headset
state = headset.get_state()
print(state)
# Check specific metrics
if state.cpu.load > 80:
print("CPU load is high, consider simplifying your stream")
if state.battery.percent < 20 and not state.battery.charging:
print("Battery is low, connect the headset to power")
Inspect Streams
# List all running streams to debug potential conflicts
streams = headset.streams_manager.list_streams()
print(f"Running streams: {len(streams)}")
for stream in streams:
print(f"- Stream {stream.id}: {stream.meta.get('name', 'unnamed')}")
Advanced Topics
Custom Networking Configuration
For environments with complex networking requirements:
headset = Headset(
"192.168.1.100",
port=48011, # Custom port
timeout=15000, # 15 second timeout
retries=3, # Number of retry attempts
retry_delay=1000, # Delay between retries (ms)
)
WebSocket Data Consumption
To work with data from a WebSocket output node:
import websockets
import asyncio
import json
async def connect_to_data_stream(url):
async with websockets.connect(url) as websocket:
print("Connected to data stream")
try:
while True:
data = await websocket.recv()
# Parse the incoming data
parsed_data = parse_eeg_data(data)
print("Received data:", parsed_data)
# Process or visualize the data
process_data(parsed_data)
except websockets.exceptions.ConnectionClosed:
print("Disconnected from data stream")
def parse_eeg_data(data):
# Example parsing function - adjust based on your data format
if isinstance(data, bytes):
# Handle binary data
return {"type": "binary", "size": len(data)}
else:
# Handle text data (likely JSON)
return json.loads(data)
def process_data(data):
# Process the EEG data (example)
print(f"Processing data point with timestamp: {data.get('timestamp')}")
# Connect to the WebSocket server running on the headset
asyncio.run(connect_to_data_stream("ws://192.168.1.100:36006"))
Multi-Headset Management
For applications that need to work with multiple headsets:
from instinct_py import Headset
import asyncio
class HeadsetManager:
def __init__(self):
self.headsets = {}
async def discover_and_connect(self):
devices = Headset.discover()
print(f"Found {len(devices)} headsets")
for device in devices:
try:
name = await device.get_name()
state = await device.get_state()
self.headsets[name] = {
"device": device,
"state": state,
"active_streams": []
}
print(f"Connected to headset: {name}")
except Exception as error:
print(f"Failed to connect to headset: {error}")
return len(self.headsets)
def get_headset(self, name):
return self.headsets.get(name, {}).get("device")
async def start_stream_on_all(self, stream_config):
results = []
for name, headset_info in self.headsets.items():
try:
stream = headset_info["device"].streams_manager.create_stream(stream_config)
await stream.create()
await stream.start()
headset_info["active_streams"].append(stream)
results.append({"name": name, "success": True, "stream": stream})
print(f"Started stream on headset: {name}")
except Exception as error:
results.append({"name": name, "success": False, "error": str(error)})
print(f"Failed to start stream on headset {name}: {error}")
return results
async def stop_all_streams(self):
for name, headset_info in self.headsets.items():
try:
for stream in headset_info["active_streams"]:
await stream.stop()
headset_info["active_streams"] = []
print(f"Stopped all streams on headset: {name}")
except Exception as error:
print(f"Error stopping streams on headset {name}: {error}")
# Example usage:
async def main():
manager = HeadsetManager()
count = await manager.discover_and_connect()
print(f"Connected to {count} headsets")
# Create a basic stream configuration
stream_config = {
"id": str(uuid.uuid4()),
# ... additional stream configuration
}
# Start the stream on all headsets
results = await manager.start_stream_on_all(stream_config)
print(f"Streams started on {sum(1 for r in results if r['success'])} of {len(results)} headsets")
# Let streams run for 30 seconds
await asyncio.sleep(30)
# Stop all streams
await manager.stop_all_streams()
asyncio.run(main())
This documentation provides a comprehensive overview of the Instinct Python SDK. For further assistance or contributions to this documentation, please contact the Nexstem development team.