Streaming Data
Traditional inference engines process your data when you query it. LayerScale processes your data when it arrives. By the time you ask a question, the model has already reasoned over your latest data. This is continuous inference: always on, always current, always ready.
Data flows into a session via HTTP or WebSocket, gets tokenized in the background, and is incorporated into the model’s live context without blocking your application. Your data never waits. Your queries never rebuild context from scratch.
How It Works
- Push data to a session via HTTP POST or WebSocket message
- Data queues in a lock-free ring buffer (single-producer, single-consumer)
- Background processor tokenizes the data and updates the model’s live context
- Flash Queries re-evaluate automatically after each data batch
- Results push to connected clients via SSE or WebSocket
The push endpoint returns immediately. Your application never blocks waiting for the model to process data.
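The steps above can be pictured with a toy bounded queue. This is a minimal sketch, not LayerScale's implementation: the `BoundedQueue` class and its method names are hypothetical, it is not lock-free, and it assumes that a full queue rejects new items (the source does not say whether new or old items are dropped). The returned dictionary reuses the field names from the push response.

```python
from collections import deque


class BoundedQueue:
    """Toy stand-in for the ingestion queue: fixed capacity, counts drops."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()
        self.total_enqueued = 0
        self.total_dropped = 0

    def push(self, batch):
        """Producer side: enqueue what fits and return a summary right away."""
        pushed = 0
        for item in batch:
            if len(self.items) < self.capacity:
                self.items.append(item)
                pushed += 1
            else:
                # Assumption: when the queue is full, the new item is rejected.
                self.total_dropped += 1
        self.total_enqueued += pushed
        return {
            "pushed": pushed,
            "dropped": len(batch) - pushed,
            "queue_size": len(self.items),
            "queue_capacity": self.capacity,
        }

    def drain(self, max_items):
        """Consumer side: take up to max_items for background processing."""
        batch = []
        while self.items and len(batch) < max_items:
            batch.append(self.items.popleft())
        return batch
```

The producer only ever touches `push` and the background consumer only ever touches `drain`, which is the single-producer, single-consumer shape described above.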
Supported Data Types
Financial Market Data
OHLCV (Open, High, Low, Close, Volume) data for real-time market analysis, trend detection, and trading signals. Create the session with "type": "ohlcv".

    {
      "data": [
        {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500},
        {"o": 185.80, "h": 186.50, "l": 185.60, "c": 186.30, "v": 8700}
      ]
    }

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| o | float | Open price |
| h | float | High price |
| l | float | Low price |
| c | float | Close price |
| v | float | Volume |
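
Because the push endpoint returns before the data is processed, it can help to catch malformed candles on the client. The sketch below is one way to do that; `validate_candle` is a hypothetical helper, and its consistency rules (high at or above open and close, low at or below them, non-negative volume) are assumptions about sensible market data, not documented server-side checks.

```python
def validate_candle(candle):
    """Return a list of problems found in one OHLCV item (empty if none)."""
    problems = []
    for key in ("o", "h", "l", "c", "v"):
        if not isinstance(candle.get(key), (int, float)):
            problems.append(f"missing or non-numeric field: {key}")
    if problems:
        return problems
    if candle["h"] < max(candle["o"], candle["c"]):
        problems.append("high is below open or close")
    if candle["l"] > min(candle["o"], candle["c"]):
        problems.append("low is above open or close")
    if candle["v"] < 0:
        problems.append("volume is negative")
    return problems
```

A caller might skip or log any candle for which the returned list is not empty before adding it to the `data` array.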
Industrial IoT
Sensor telemetry for predictive maintenance, anomaly detection, and real-time quality control. Designed for manufacturing, energy, and infrastructure monitoring. Create the session with "type": "iot".

    {
      "data": [
        {"sid": "temp-01", "val": 72.5, "lo": 60.0, "hi": 85.0},
        {"sid": "temp-01", "val": 74.1, "lo": 60.0, "hi": 85.0}
      ]
    }

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| sid | string | Sensor identifier |
| val | float | Current reading |
| lo | float | Low threshold or observed minimum |
| hi | float | High threshold or observed maximum |
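
When `lo` and `hi` carry thresholds (one of the two uses the table allows), a client can flag breaches locally before pushing. This is a small sketch under that assumption; `out_of_range` is a hypothetical helper and is not part of the API.

```python
def out_of_range(readings):
    """Return the readings whose value falls outside the closed range [lo, hi]."""
    return [r for r in readings if not (r["lo"] <= r["val"] <= r["hi"])]
```

The flagged readings are still valid items and can be pushed unchanged; the check only gives your application an early local signal.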
Autonomous Systems
Spatial and kinematic data for robotics, drones, and autonomous vehicles. Supports real-time environmental reasoning and low-latency decision support. Create the session with "type": "spatial".

    {
      "data": [
        {"x": 37.7749, "y": -122.4194, "z": 150.0, "spd": 12.5, "hdg": 270.0},
        {"x": 37.7751, "y": -122.4190, "z": 151.2, "spd": 13.0, "hdg": 268.5}
      ]
    }

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| x | float | X position or latitude |
| y | float | Y position or longitude |
| z | float | Z position or altitude |
| spd | float | Speed |
| hdg | float | Heading in degrees (0-360) |
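
Headings from navigation stacks are often reported as negative angles or values above 360. The sketch below wraps them into the documented range before building an item. `to_spatial_item` is a hypothetical helper, and mapping 360 to 0 is an assumption, since the table does not say whether 360 itself is accepted.

```python
def to_spatial_item(x, y, z, spd, hdg):
    """Build one spatial item, wrapping the heading into [0, 360)."""
    return {"x": x, "y": y, "z": z, "spd": spd, "hdg": hdg % 360.0}
```

Python's `%` operator returns a non-negative result for a positive divisor, so a heading of -90 degrees becomes 270.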
Security and Monitoring
Event stream data for SIEM, threat detection, and network monitoring. Enables continuous anomaly correlation and natural language alerting. Create the session with "type": "event".

    {
      "data": [
        {"src": "10.0.1.5", "sev": 3, "cat": "auth_failure", "cnt": 12},
        {"src": "10.0.1.8", "sev": 1, "cat": "port_scan", "cnt": 340}
      ]
    }

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| src | string | Source identifier (IP, hostname, service) |
| sev | integer | Severity level (0 = info, 1 = low, 2 = medium, 3 = high, 4 = critical) |
| cat | string | Event category |
| cnt | integer | Event count in window |
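
Upstream tools usually label severity with names rather than integers. The sketch below maps names to the `sev` scale from the table. The `SEVERITY` dictionary and `to_event` are hypothetical helpers for client code, not part of the API.

```python
# Severity names mapped to the integer scale documented for the "sev" field.
SEVERITY = {"info": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}


def to_event(src, level, cat, cnt):
    """Build one event item from a named severity level.

    Raises KeyError if the level name is not on the documented scale.
    """
    return {"src": src, "sev": SEVERITY[level.lower()], "cat": cat, "cnt": int(cnt)}
```

Failing loudly on an unknown level keeps silently mislabeled events out of the stream.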
Healthcare Monitoring
Patient vitals for ICU monitoring, wearable streams, and continuous clinical observation. Supports early warning detection and patient state reasoning. Create the session with "type": "vitals".

    {
      "data": [
        {"hr": 72.0, "bp_s": 120.0, "bp_d": 80.0, "spo2": 98.5, "temp": 36.8},
        {"hr": 74.0, "bp_s": 118.0, "bp_d": 79.0, "spo2": 98.2, "temp": 36.9}
      ]
    }

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| hr | float | Heart rate (bpm) |
| bp_s | float | Systolic blood pressure (mmHg) |
| bp_d | float | Diastolic blood pressure (mmHg) |
| spo2 | float | Oxygen saturation (%) |
| temp | float | Body temperature (Celsius) |
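
The `temp` field is in Celsius, so readings from devices that report Fahrenheit need converting before they are pushed. This is a minimal sketch; `fahrenheit_to_celsius` and `to_vitals_item` are hypothetical helpers, and rounding to one decimal place is an assumption chosen to match the precision of the example above.

```python
def fahrenheit_to_celsius(temp_f):
    """Convert Fahrenheit to Celsius, rounded to one decimal place."""
    return round((temp_f - 32.0) * 5.0 / 9.0, 1)


def to_vitals_item(hr, bp_s, bp_d, spo2, temp_f):
    """Build one vitals item from a device that reports temperature in Fahrenheit."""
    return {
        "hr": float(hr),
        "bp_s": float(bp_s),
        "bp_d": float(bp_d),
        "spo2": float(spo2),
        "temp": fahrenheit_to_celsius(temp_f),
    }
```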
HTTP Streaming
Setup
Create a session, then push data to it:

    # Create a session
    curl -X POST http://localhost:8080/v1/sessions/init \
      -H "Content-Type: application/json" \
      -d '{
        "type": "ohlcv",
        "prompt": "You are monitoring market data. Analyze trends as data arrives."
      }'

Response:

    {"session_id": "sess_a1b2c3d4e5f6", "type": "ohlcv", "n_tokens": 42, "window_size": 100}

Push Data

    curl -X POST http://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/stream/push \
      -H "Content-Type: application/json" \
      -d '{
        "data": [
          {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500},
          {"o": 185.80, "h": 186.50, "l": 185.60, "c": 186.30, "v": 8700}
        ]
      }'

Response:

    {"pushed": 2, "dropped": 0, "queue_size": 42, "queue_capacity": 4096}

Continuous Ingestion (Python)

    import requests
    import time

    SESSION_ID = "sess_a1b2c3d4e5f6"
    BASE_URL = "http://localhost:8080"

    def push_candle(candle):
        requests.post(
            f"{BASE_URL}/v1/sessions/{SESSION_ID}/stream/push",
            json={"data": [candle]},
        )

    # Push a candle every second
    while True:
        candle = fetch_latest_candle()  # your data source
        push_candle({
            "o": candle["open"],
            "h": candle["high"],
            "l": candle["low"],
            "c": candle["close"],
            "v": candle["volume"],
        })
        time.sleep(1)

Check Stream Status

    curl http://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/stream/status

Response:

    {
      "type": "ohlcv",
      "streaming": {
        "initialized": true,
        "running": true,
        "error": false,
        "queue_size": 42,
        "queue_capacity": 4096,
        "window_size": 100
      },
      "statistics": {
        "total_enqueued": 1500,
        "total_dropped": 0,
        "batches_processed": 30,
        "items_processed": 1500
      },
      "data": {
        "initialized": true,
        "data_count": 85,
        "total_data_tokens": 3400
      }
    }

See API Reference for the full response schema.
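
A monitoring job can reduce the status response to a few health signals. The sketch below works on the already-decoded JSON and uses only field names shown above. `stream_health` is a hypothetical helper, and the drop-rate formula assumes that `total_enqueued` counts accepted items only, which the source does not confirm.

```python
def stream_health(status):
    """Summarize a decoded /stream/status response as simple health signals."""
    streaming = status["streaming"]
    stats = status["statistics"]
    # Assumption: total_enqueued excludes dropped items, so attempts = both.
    attempted = stats["total_enqueued"] + stats["total_dropped"]
    return {
        "healthy": streaming["running"] and not streaming["error"],
        "queue_utilization": streaming["queue_size"] / streaming["queue_capacity"],
        "drop_rate": stats["total_dropped"] / attempted if attempted else 0.0,
    }
```

A rising `queue_utilization` or a non-zero `drop_rate` suggests the producer is pushing faster than the background processor can keep up, so slowing the push rate is a reasonable response.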
WebSocket Streaming
WebSocket provides a persistent bidirectional connection that combines data push and event delivery. This is more efficient than HTTP when pushing data at high frequency.
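
Every message in this section is a JSON envelope with a `type` field, so client code can be organized around a pair of small helpers. This is a sketch using only the standard library and the message types documented in this section (`push`, `ping`, `connected`, `flash_ready`). The function names and the handler-table pattern are illustrative choices, not part of the API.

```python
import json


def push_message(items):
    """Encode a list of data items as a push message."""
    return json.dumps({"type": "push", "data": list(items)})


def ping_message():
    """Encode a keepalive ping message."""
    return json.dumps({"type": "ping"})


def handle_message(raw, handlers):
    """Decode one server message and dispatch on its type.

    Returns the handler's result, or None when no handler is registered.
    """
    msg = json.loads(raw)
    handler = handlers.get(msg.get("type"))
    return handler(msg.get("data")) if handler else None
```

Ignoring unknown types, rather than raising, keeps a client running if the server sends a message type it does not yet know about.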
Connect

    ws://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/ws

On connect, the server sends:

    {
      "type": "connected",
      "data": {
        "session_id": "sess_a1b2c3d4e5f6",
        "streaming": true
      }
    }

Push Data via WebSocket

    {
      "type": "push",
      "data": [
        {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500}
      ]
    }

Receive Events
Flash Query results and errors arrive on the same connection:

    {
      "type": "flash_ready",
      "data": {
        "id": 1,
        "query": "Is the trend bullish?",
        "value": "Bullish",
        "confidence": 0.92,
        "data_version": 15
      }
    }

Python WebSocket Example

    import asyncio
    import json
    import websockets

    async def stream_data(session_id):
        uri = f"ws://localhost:8080/v1/sessions/{session_id}/ws"

        async with websockets.connect(uri) as ws:
            # Wait for connection confirmation
            connected = json.loads(await ws.recv())
            print(f"Connected: {connected}")

            # Start a task to read events
            async def read_events():
                async for message in ws:
                    event = json.loads(message)
                    if event["type"] == "flash_ready":
                        data = event["data"]
                        print(f"Flash Q{data['id']}: {data['value']} ({data['confidence']:.2f})")

            reader = asyncio.create_task(read_events())

            # Push data
            for candle in generate_candles():  # your data source
                await ws.send(json.dumps({
                    "type": "push",
                    "data": [candle],
                }))
                await asyncio.sleep(0.5)

            # Stop the reader once all data has been pushed
            reader.cancel()

    asyncio.run(stream_data("sess_a1b2c3d4e5f6"))

TypeScript WebSocket Example

    const ws = new WebSocket("ws://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/ws");

    ws.onopen = () => {
      console.log("Connected");
    };

    ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);

      if (msg.type === "connected") {
        console.log(`Session: ${msg.data.session_id}`);
        // Start pushing data after connection
        startPushing();
      }

      if (msg.type === "flash_ready") {
        console.log(`Flash Q${msg.data.id}: ${msg.data.value}`);
      }
    };

    function startPushing() {
      setInterval(() => {
        ws.send(JSON.stringify({
          type: "push",
          data: [{ o: 185.5, h: 186.2, l: 185.1, c: 185.8, v: 12500 }],
        }));
      }, 1000);
    }

Keepalive
Send periodic ping messages to keep the connection alive:

    {"type": "ping"}

Processing Model
LayerScale maintains a live, continuously updated representation of your data inside the model’s attention state. As new data arrives, the engine incrementally updates this representation without reprocessing existing context. The model’s understanding of your data evolves in real time, not on demand.
The system prompt is compiled into a persistent prefix at session creation. This prefix is never recomputed, regardless of how much data flows through the session or how long it runs. All subsequent compute is spent exclusively on new information.
Context capacity is managed automatically through an adaptive scheduling system that balances data freshness against available compute budget. The engine determines the optimal amount of data to retain based on context size, data type, and query overhead, ensuring the model always has the most relevant data available while maintaining constant-time query performance.
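
As a mental model only, the session context can be pictured as a fixed prefix plus a bounded window of recent data. The sketch below is a deliberate simplification: `LiveContext` is a hypothetical class, and it uses a fixed-size window where the engine is described as choosing retention adaptively. It says nothing about how attention state is actually stored or updated.

```python
from collections import deque


class LiveContext:
    """Toy model: a fixed prompt prefix plus a sliding window of data items."""

    def __init__(self, prefix_tokens, window_size):
        # Token count of the system prompt; set once and never changed.
        self.prefix_tokens = prefix_tokens
        # Simplification: fixed-size window; the oldest item is evicted when full.
        self.window = deque(maxlen=window_size)

    def append(self, item, n_tokens):
        """Add one new item; existing items are not reprocessed."""
        self.window.append((item, n_tokens))

    @property
    def total_tokens(self):
        """Prefix tokens plus the tokens of the items currently retained."""
        return self.prefix_tokens + sum(n for _, n in self.window)
```

Because the window is bounded, `total_tokens` stops growing once the window is full, which matches the intuition that query cost does not grow with how long the session has been running.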
Practical Examples
Market Data Feed

    import requests

    # Initialize with analysis prompt and flash queries
    session = requests.post("http://localhost:8080/v1/sessions/init", json={
        "type": "ohlcv",
        "prompt": "You are a quantitative analyst monitoring real-time price data. "
                  "Identify support/resistance levels, trend changes, and volume anomalies.",
        "flash": [
            {"query": "Is momentum bullish or bearish?", "max_tokens": 4},
            {"query": "Are there any volume anomalies?", "max_tokens": 32},
        ],
    }).json()

    # Push candles from your data source, using the session ID returned by init
    for candle in exchange_feed.stream("BTC/USD", interval="1m"):
        requests.post(
            f"http://localhost:8080/v1/sessions/{session['session_id']}/stream/push",
            json={"data": [{
                "o": candle.open,
                "h": candle.high,
                "l": candle.low,
                "c": candle.close,
                "v": candle.volume,
            }]},
        )

Sensor Data Monitoring

    import requests

    session = requests.post("http://localhost:8080/v1/sessions/init", json={
        "type": "iot",
        "prompt": "You are monitoring temperature sensors in a data center. "
                  "Alert if any readings suggest cooling failure.",
        "flash": [
            {"query": "Are any sensors showing abnormal readings?", "max_tokens": 32},
            {"query": "Is there a risk of thermal throttling?", "max_tokens": 8},
        ],
    }).json()

    # Push sensor readings using the native IoT format
    for batch in sensor_stream.read():
        requests.post(
            f"http://localhost:8080/v1/sessions/{session['session_id']}/stream/push",
            json={"data": [{
                "sid": batch.sensor_id,
                "val": batch.current,
                "lo": batch.threshold_low,
                "hi": batch.threshold_high,
            }]},
        )

Log Ingestion

    import requests

    session = requests.post("http://localhost:8080/v1/sessions/init", json={
        "type": "event",
        "prompt": "You are analyzing application logs. Detect error patterns, "
                  "latency spikes, and unusual behavior.",
        "flash": [
            {"query": "Are there any error spikes?", "max_tokens": 16},
            {"query": "Is latency degrading?", "max_tokens": 8},
        ],
    }).json()

    # Push log events using the native event format
    for window in log_aggregator.windows(interval="10s"):
        requests.post(
            f"http://localhost:8080/v1/sessions/{session['session_id']}/stream/push",
            json={"data": [{
                "src": window.service_name,
                "sev": 3 if window.error_rate > 0.05 else 1,
                "cat": "latency_spike" if window.p99_ms > 500 else "normal",
                "cnt": window.total,
            }]},
        )
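
The examples above push one item per request. Since the `data` array accepts several items, a client with bursty sources can group items before pushing to reduce the number of HTTP round trips. This is a minimal sketch: `chunked` and `build_push_bodies` are hypothetical helpers, and the batch size is an arbitrary assumption rather than a documented limit.

```python
def chunked(items, size):
    """Yield consecutive slices of items with at most size elements each."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def build_push_bodies(items, batch_size=100):
    """Return one push request body per batch, preserving item order."""
    return [{"data": batch} for batch in chunked(items, batch_size)]
```

Each returned body can be passed as the `json` argument of `requests.post` against the `/stream/push` endpoint used above.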