Streaming Data

Traditional inference engines process your data when you query it. LayerScale processes your data when it arrives. By the time you ask a question, the model has already reasoned over your latest data. This is continuous inference: always on, always current, always ready.

Data flows into a session via HTTP or WebSocket, gets tokenized in the background, and is incorporated into the model’s live context without blocking your application. Your data never waits. Your queries never rebuild context from scratch.

How It Works

  1. Push data to a session via HTTP POST or WebSocket message
  2. Data queues in a lock-free ring buffer (single-producer, single-consumer)
  3. Background processor tokenizes the data and updates the model’s live context
  4. Flash Queries re-evaluate automatically after each data batch
  5. Results push to connected clients via SSE or WebSocket

The push endpoint returns immediately. Your application never blocks waiting for the model to process data.
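The queueing step can be pictured with a small bounded ring buffer. This is only a sketch: the class and method names are hypothetical, plain Python cannot express a truly lock-free queue, and the policy of rejecting new items when the buffer is full is an assumption suggested by the "dropped" counter in the push response, not confirmed behavior.

```python
from typing import Any, List, Optional


class RingBuffer:
    """Bounded FIFO queue backed by a fixed-size list (illustrative only)."""

    def __init__(self, capacity: int) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._slots: List[Optional[Any]] = [None] * capacity
        self._capacity = capacity
        self._head = 0  # next slot to read (advanced by the consumer)
        self._tail = 0  # next slot to write (advanced by the producer)
        self._size = 0

    def push_many(self, items: List[Any]) -> dict:
        """Enqueue as many items as fit without blocking, then report counts."""
        pushed = 0
        for item in items:
            if self._size == self._capacity:
                break  # assumed policy: items that do not fit are dropped
            self._slots[self._tail] = item
            self._tail = (self._tail + 1) % self._capacity
            self._size += 1
            pushed += 1
        return {
            "pushed": pushed,
            "dropped": len(items) - pushed,
            "queue_size": self._size,
            "queue_capacity": self._capacity,
        }

    def pop_batch(self, max_items: int) -> List[Any]:
        """Dequeue up to max_items in arrival order (the consumer side)."""
        batch: List[Any] = []
        while self._size > 0 and len(batch) < max_items:
            batch.append(self._slots[self._head])
            self._slots[self._head] = None
            self._head = (self._head + 1) % self._capacity
            self._size -= 1
        return batch
```

The producer only ever calls push_many and the consumer only ever calls pop_batch, which mirrors the single-producer, single-consumer arrangement described in step 2.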

Supported Data Types

Financial Market Data

OHLCV (Open, High, Low, Close, Volume) data for real-time market analysis, trend detection, and trading signals. Create the session with "type": "ohlcv".

{
  "data": [
    {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500},
    {"o": 185.80, "h": 186.50, "l": 185.60, "c": 186.30, "v": 8700}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| o | float | Open price |
| h | float | High price |
| l | float | Low price |
| c | float | Close price |
| v | float | Volume |
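Before pushing, it can help to map your own candle fields onto the short keys and reject obviously inconsistent candles on the client. The helper below is a hypothetical sketch; whether the server performs similar checks is not stated here.

```python
from typing import Optional


def to_ohlcv(open_: float, high: float, low: float, close: float,
             volume: float, timestamp: Optional[int] = None) -> dict:
    """Build one OHLCV record using the short field names from the table."""
    # Client-side sanity checks (an assumption, not a documented server rule).
    if high < max(open_, close, low):
        raise ValueError("high must be the largest price in the candle")
    if low > min(open_, close):
        raise ValueError("low must be the smallest price in the candle")
    if volume < 0:
        raise ValueError("volume cannot be negative")

    record = {
        "o": float(open_),
        "h": float(high),
        "l": float(low),
        "c": float(close),
        "v": float(volume),
    }
    # timestamp is optional, so it is only included when supplied.
    if timestamp is not None:
        record["timestamp"] = int(timestamp)
    return record
```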

Industrial IoT

Sensor telemetry for predictive maintenance, anomaly detection, and real-time quality control. Designed for manufacturing, energy, and infrastructure monitoring. Create the session with "type": "iot".

{
  "data": [
    {"sid": "temp-01", "val": 72.5, "lo": 60.0, "hi": 85.0},
    {"sid": "temp-01", "val": 74.1, "lo": 60.0, "hi": 85.0}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| sid | string | Sensor identifier |
| val | float | Current reading |
| lo | float | Low threshold or observed minimum |
| hi | float | High threshold or observed maximum |

Autonomous Systems

Spatial and kinematic data for robotics, drones, and autonomous vehicles. Supports real-time environmental reasoning and low-latency decision support. Create the session with "type": "spatial".

{
  "data": [
    {"x": 37.7749, "y": -122.4194, "z": 150.0, "spd": 12.5, "hdg": 270.0},
    {"x": 37.7751, "y": -122.4190, "z": 151.2, "spd": 13.0, "hdg": 268.5}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| x | float | X position or latitude |
| y | float | Y position or longitude |
| z | float | Z position or altitude |
| spd | float | Speed |
| hdg | float | Heading in degrees (0-360) |
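Heading sources often report negative angles or values above 360. A minimal sketch of wrapping them into the 0-360 range before building a record follows; the function names are hypothetical, and the units for spd and z are whatever your session prompt assumes.

```python
from typing import Optional


def normalize_heading(degrees: float) -> float:
    """Wrap any angle in degrees into the 0-360 range."""
    # Python's % with a positive divisor never returns a negative result.
    return degrees % 360.0


def spatial_record(x: float, y: float, z: float, speed: float,
                   heading_deg: float, timestamp: Optional[int] = None) -> dict:
    """Build one spatial record using the short field names from the table."""
    record = {
        "x": float(x),
        "y": float(y),
        "z": float(z),
        "spd": float(speed),
        "hdg": normalize_heading(float(heading_deg)),
    }
    if timestamp is not None:
        record["timestamp"] = int(timestamp)
    return record
```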

Security and Monitoring

Event stream data for SIEM, threat detection, and network monitoring. Enables continuous anomaly correlation and natural language alerting. Create the session with "type": "event".

{
  "data": [
    {"src": "10.0.1.5", "sev": 3, "cat": "auth_failure", "cnt": 12},
    {"src": "10.0.1.8", "sev": 1, "cat": "port_scan", "cnt": 340}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| src | string | Source identifier (IP, hostname, service) |
| sev | integer | Severity level (0 = info, 1 = low, 2 = medium, 3 = high, 4 = critical) |
| cat | string | Event category |
| cnt | integer | Event count in window |
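To keep the numeric severity codes readable in client code, one option is a small enum that mirrors the table. The Severity and event_record names are hypothetical helpers, not part of any LayerScale client.

```python
from enum import IntEnum
from typing import Optional


class Severity(IntEnum):
    """Severity codes as listed in the field table."""
    INFO = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


def event_record(src: str, severity: Severity, category: str,
                 count: int, timestamp: Optional[int] = None) -> dict:
    """Build one event record using the short field names from the table."""
    record = {
        "src": src,
        "sev": int(severity),  # plain int so the payload holds only JSON types
        "cat": category,
        "cnt": int(count),
    }
    if timestamp is not None:
        record["timestamp"] = int(timestamp)
    return record
```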

Healthcare Monitoring

Patient vitals for ICU monitoring, wearable streams, and continuous clinical observation. Supports early warning detection and patient state reasoning. Create the session with "type": "vitals".

{
  "data": [
    {"hr": 72.0, "bp_s": 120.0, "bp_d": 80.0, "spo2": 98.5, "temp": 36.8},
    {"hr": 74.0, "bp_s": 118.0, "bp_d": 79.0, "spo2": 98.2, "temp": 36.9}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| hr | float | Heart rate (bpm) |
| bp_s | float | Systolic blood pressure (mmHg) |
| bp_d | float | Diastolic blood pressure (mmHg) |
| spo2 | float | Oxygen saturation (%) |
| temp | float | Body temperature (Celsius) |

HTTP Streaming

Setup

Create a session, then push data to it:

# Create a session
curl -X POST http://localhost:8080/v1/sessions/init \
  -H "Content-Type: application/json" \
  -d '{
    "type": "ohlcv",
    "prompt": "You are monitoring market data. Analyze trends as data arrives."
  }'

Response:

{"session_id": "sess_a1b2c3d4e5f6", "type": "ohlcv", "n_tokens": 42, "window_size": 100}

Push Data

curl -X POST http://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/stream/push \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500},
      {"o": 185.80, "h": 186.50, "l": 185.60, "c": 186.30, "v": 8700}
    ]
  }'

Response:

{"pushed": 2, "dropped": 0, "queue_size": 42, "queue_capacity": 4096}
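The push response carries enough information for simple client-side backpressure. The sketch below assumes that a non-zero "dropped" count or a nearly full queue means the producer should slow down; the 0.8 high-water mark and the doubling strategy are illustrative choices, not documented recommendations.

```python
def next_delay(resp: dict, delay: float, base: float = 1.0,
               max_delay: float = 30.0, high_water: float = 0.8) -> float:
    """Return how long to wait before the next push, given the last response."""
    fill = resp["queue_size"] / resp["queue_capacity"]
    if resp["dropped"] > 0 or fill >= high_water:
        # Queue is under pressure: back off exponentially, up to a cap.
        return min(delay * 2, max_delay)
    # Queue has headroom: return to the normal push interval.
    return base
```

Called with the response shown above and a current delay of 1.0, this returns 1.0, since nothing was dropped and the queue is about 1% full.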

Continuous Ingestion (Python)

import time

import requests

SESSION_ID = "sess_a1b2c3d4e5f6"
BASE_URL = "http://localhost:8080"


def push_candle(candle):
    # The push endpoint expects a list of records under "data".
    response = requests.post(
        f"{BASE_URL}/v1/sessions/{SESSION_ID}/stream/push",
        json={"data": [candle]},
        timeout=5,
    )
    # Surface HTTP errors instead of silently ignoring them.
    response.raise_for_status()


# Push a candle every second
while True:
    candle = fetch_latest_candle()  # your data source (placeholder, not defined here)
    push_candle({
        "o": candle["open"],
        "h": candle["high"],
        "l": candle["low"],
        "c": candle["close"],
        "v": candle["volume"],
    })
    time.sleep(1)
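Because "data" is a list, several records can share one request instead of one request per record. A minimal batching sketch follows; the send parameter is a hypothetical callable that would wrap the requests.post call, injected here so the helper can be exercised without a server.

```python
from typing import Callable, Iterable, Iterator, List


def batches(records: Iterable[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive lists of at most `size` records."""
    if size <= 0:
        raise ValueError("size must be positive")
    batch: List[dict] = []
    for record in records:
        batch.append(record)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch


def push_in_batches(records: Iterable[dict], size: int,
                    send: Callable[[dict], None]) -> int:
    """Send records as {"data": [...]} payloads and return the request count."""
    sent = 0
    for batch in batches(records, size):
        send({"data": batch})
        sent += 1
    return sent
```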

Check Stream Status

curl http://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/stream/status

Response:

{
  "type": "ohlcv",
  "streaming": {
    "initialized": true,
    "running": true,
    "error": false,
    "queue_size": 42,
    "queue_capacity": 4096,
    "window_size": 100
  },
  "statistics": {
    "total_enqueued": 1500,
    "total_dropped": 0,
    "batches_processed": 30,
    "items_processed": 1500
  },
  "data": {
    "initialized": true,
    "data_count": 85,
    "total_data_tokens": 3400
  }
}

See API Reference for the full response schema.
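A monitoring script might reduce the status payload to a few numbers worth alerting on. The sketch below uses only fields shown in the example response; the definition of "healthy" and the choice of derived metrics are assumptions rather than official guidance.

```python
def stream_health(status: dict) -> dict:
    """Summarize a /stream/status response into a few derived metrics."""
    streaming = status["streaming"]
    stats = status["statistics"]
    batches = stats["batches_processed"]
    return {
        # Assumed definition: initialized, running, and no error flag set.
        "healthy": (streaming["initialized"]
                    and streaming["running"]
                    and not streaming["error"]),
        # Fraction of the ingest queue currently in use.
        "queue_fill": streaming["queue_size"] / streaming["queue_capacity"],
        # Average number of items handled per background batch.
        "avg_batch_size": stats["items_processed"] / batches if batches else 0.0,
        "dropped": stats["total_dropped"],
    }
```

For the example response above, avg_batch_size works out to 1500 / 30 = 50 items per batch.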

WebSocket Streaming

WebSocket provides a persistent bidirectional connection that combines data push and event delivery. This is more efficient than HTTP when pushing data at high frequency.

Connect

ws://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/ws

On connect, the server sends:

{
  "type": "connected",
  "data": {
    "session_id": "sess_a1b2c3d4e5f6",
    "streaming": true
  }
}

Push Data via WebSocket

{
  "type": "push",
  "data": [
    {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500}
  ]
}

Receive Events

Flash Query results and errors arrive on the same connection:

{
  "type": "flash_ready",
  "data": {
    "id": 1,
    "query": "Is the trend bullish?",
    "value": "Bullish",
    "confidence": 0.92,
    "data_version": 15
  }
}
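Since several message types share one connection, a small dispatcher keyed on "type" keeps the handling code tidy. The EventRouter class below is a hypothetical sketch. Only the "connected" and "flash_ready" types appear on this page, so anything else, including error messages whose exact shape is not shown here, goes to a fallback handler.

```python
import json
from typing import Callable, Dict, Optional

Handler = Callable[[dict], None]


class EventRouter:
    """Route incoming WebSocket messages to handlers by their "type" field."""

    def __init__(self, default: Optional[Handler] = None) -> None:
        self._handlers: Dict[str, Handler] = {}
        self._default = default  # receives the full message for unknown types

    def on(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type] = handler

    def dispatch(self, raw: str) -> bool:
        """Parse one message; return True if some handler processed it."""
        message = json.loads(raw)
        handler = self._handlers.get(message.get("type"))
        if handler is not None:
            handler(message.get("data", {}))
            return True
        if self._default is not None:
            self._default(message)
            return True
        return False
```

A read loop would then call router.dispatch(message) for every message received on the socket.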

Python WebSocket Example

import asyncio
import json

import websockets


async def stream_data(session_id):
    uri = f"ws://localhost:8080/v1/sessions/{session_id}/ws"
    async with websockets.connect(uri) as ws:
        # Wait for connection confirmation
        connected = json.loads(await ws.recv())
        print(f"Connected: {connected}")

        # Start a task to read events
        async def read_events():
            async for message in ws:
                event = json.loads(message)
                if event["type"] == "flash_ready":
                    data = event["data"]
                    print(f"Flash Q{data['id']}: {data['value']} ({data['confidence']:.2f})")

        reader = asyncio.create_task(read_events())

        # Push data; generate_candles() is a placeholder for your data source
        for candle in generate_candles():
            await ws.send(json.dumps({
                "type": "push",
                "data": [candle],
            }))
            await asyncio.sleep(0.5)

        # Stop the reader once all data has been pushed
        reader.cancel()


asyncio.run(stream_data("sess_a1b2c3d4e5f6"))

TypeScript WebSocket Example

const ws = new WebSocket("ws://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/ws");

ws.onopen = () => {
  console.log("Connected");
};

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === "connected") {
    console.log(`Session: ${msg.data.session_id}`);
    // Start pushing data after connection
    startPushing();
  }
  if (msg.type === "flash_ready") {
    console.log(`Flash Q${msg.data.id}: ${msg.data.value}`);
  }
};

function startPushing() {
  setInterval(() => {
    ws.send(JSON.stringify({
      type: "push",
      data: [{ o: 185.5, h: 186.2, l: 185.1, c: 185.8, v: 12500 }],
    }));
  }, 1000);
}

Keepalive

Send periodic ping messages to keep the connection alive:

{"type": "ping"}
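One way to send these pings from an asyncio client is a background task that runs alongside the push loop. In this sketch the send parameter is any coroutine function that transmits a text frame (for example ws.send from the websockets library), and the interval is yours to choose: the required ping frequency and the server's reply format are not specified on this page.

```python
import asyncio
import json
from typing import Awaitable, Callable


async def keepalive(send: Callable[[str], Awaitable[None]],
                    interval: float, stop: asyncio.Event) -> int:
    """Send {"type": "ping"} every `interval` seconds until `stop` is set.

    Returns the number of pings sent.
    """
    sent = 0
    while not stop.is_set():
        await send(json.dumps({"type": "ping"}))
        sent += 1
        try:
            # Sleep for one interval, but wake immediately if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a stop request; ping again
    return sent
```

Inside an open connection this could be started with asyncio.create_task(keepalive(ws.send, 20, stop)), where the 20-second interval is only an example value.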

Processing Model

LayerScale maintains a live, continuously updated representation of your data inside the model’s attention state. As new data arrives, the engine incrementally updates this representation without reprocessing existing context. The model’s understanding of your data evolves in real time, not on demand.

The system prompt is compiled into a persistent prefix at session creation. This prefix is never recomputed, regardless of how much data flows through the session or how long it runs. All subsequent compute is spent exclusively on new information.

Context capacity is managed automatically through an adaptive scheduling system that balances data freshness against available compute budget. The engine determines the optimal amount of data to retain based on context size, data type, and query overhead, ensuring the model always has the most relevant data available while maintaining constant-time query performance.
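As a mental model only, the behavior described above resembles a fixed prefix followed by a bounded window of recent data, where each new item is processed once and the oldest items fall out when the window is full. The toy class below illustrates that idea. It is not LayerScale's implementation: the real engine works on attention state and sizes the window adaptively, whereas this sketch uses token lists and a fixed window size.

```python
from collections import deque
from typing import Deque, List


class ToyContext:
    """Toy model: persistent prefix plus a sliding window of data items."""

    def __init__(self, prefix_tokens: List[str], window_size: int) -> None:
        self.prefix = list(prefix_tokens)  # processed once at creation
        self.window: Deque[List[str]] = deque(maxlen=window_size)
        self.tokens_processed = len(self.prefix)

    def append(self, item_tokens: List[str]) -> None:
        """Add one data item; only its own tokens count as new work."""
        self.window.append(list(item_tokens))  # oldest item is evicted if full
        self.tokens_processed += len(item_tokens)

    def context(self) -> List[str]:
        """Current context: the prefix followed by the retained items."""
        return self.prefix + [tok for item in self.window for tok in item]
```

Note that tokens_processed grows with the amount of new data rather than with the total context length, which is the property the section describes.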

Practical Examples

Market Data Feed

import requests

# Initialize with analysis prompt and flash queries
session = requests.post("http://localhost:8080/v1/sessions/init", json={
    "type": "ohlcv",
    "prompt": "You are a quantitative analyst monitoring real-time price data. "
              "Identify support/resistance levels, trend changes, and volume anomalies.",
    "flash": [
        {"query": "Is momentum bullish or bearish?", "max_tokens": 4},
        {"query": "Are there any volume anomalies?", "max_tokens": 32},
    ],
}).json()

# Push candles from your data source (exchange_feed is a placeholder),
# using the session ID returned by the init call above
for candle in exchange_feed.stream("BTC/USD", interval="1m"):
    requests.post(
        f"http://localhost:8080/v1/sessions/{session['session_id']}/stream/push",
        json={"data": [{
            "o": candle.open, "h": candle.high,
            "l": candle.low, "c": candle.close,
            "v": candle.volume,
        }]},
    )

Sensor Data Monitoring

import requests

session = requests.post("http://localhost:8080/v1/sessions/init", json={
    "type": "iot",
    "prompt": "You are monitoring temperature sensors in a data center. "
              "Alert if any readings suggest cooling failure.",
    "flash": [
        {"query": "Are any sensors showing abnormal readings?", "max_tokens": 32},
        {"query": "Is there a risk of thermal throttling?", "max_tokens": 8},
    ],
}).json()

# Push sensor readings using the native IoT format
for batch in sensor_stream.read():
    requests.post(
        f"http://localhost:8080/v1/sessions/{session['session_id']}/stream/push",
        json={"data": [{
            "sid": batch.sensor_id,
            "val": batch.current,
            "lo": batch.threshold_low,
            "hi": batch.threshold_high,
        }]},
    )

Log Ingestion

import requests

session = requests.post("http://localhost:8080/v1/sessions/init", json={
    "type": "event",
    "prompt": "You are analyzing application logs. Detect error patterns, "
              "latency spikes, and unusual behavior.",
    "flash": [
        {"query": "Are there any error spikes?", "max_tokens": 16},
        {"query": "Is latency degrading?", "max_tokens": 8},
    ],
}).json()

# Push log events using the native event format
for window in log_aggregator.windows(interval="10s"):
    requests.post(
        f"http://localhost:8080/v1/sessions/{session['session_id']}/stream/push",
        json={"data": [{
            "src": window.service_name,
            "sev": 3 if window.error_rate > 0.05 else 1,
            "cat": "latency_spike" if window.p99_ms > 500 else "normal",
            "cnt": window.total,
        }]},
    )