Streaming Data
Traditional inference engines process your data when you query it. LayerScale processes your data when it arrives. By the time you ask a question, the model has already reasoned over your latest data. This is continuous inference: always on, always current, always ready.
Data flows into a session via HTTP or WebSocket, gets tokenized in the background, and is incorporated into the model’s live context without blocking your application. Your data never waits. Your queries never rebuild context from scratch.
How It Works
- Push data to a session via HTTP POST or WebSocket message
- Data queues in a lock-free ring buffer (single-producer, single-consumer)
- Background processor tokenizes the data and updates the model’s live context
- Flash Queries re-evaluate automatically after each data batch
- Results push to connected clients via SSE or WebSocket
The push endpoint returns immediately. Your application never blocks waiting for the model to process data.
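The queueing step above can be pictured with a small bounded ring buffer. The sketch below is illustrative only and is not LayerScale's implementation: the `RingBuffer` class, its `drain` method, and the policy of dropping new items when the buffer is full are all assumptions, and plain Python does not demonstrate real lock-free behavior. It does show why a push can return immediately: the producer either writes into a free slot or counts a drop, and never waits for the consumer. The returned counters borrow the field names of the push response shown under HTTP Streaming.

```python
class RingBuffer:
    """Bounded single-producer, single-consumer queue (illustrative sketch)."""

    def __init__(self, capacity: int) -> None:
        self._slots = [None] * capacity
        self._capacity = capacity
        self._head = 0  # next slot to read; advanced only by the consumer
        self._tail = 0  # next slot to write; advanced only by the producer
        self.total_enqueued = 0
        self.total_dropped = 0

    def __len__(self) -> int:
        return self._tail - self._head

    def push(self, items: list) -> dict:
        """Enqueue without blocking. Items that do not fit are dropped (assumed policy)."""
        pushed = dropped = 0
        for item in items:
            if len(self) == self._capacity:
                dropped += 1
                continue
            self._slots[self._tail % self._capacity] = item
            self._tail += 1
            pushed += 1
        self.total_enqueued += pushed  # assumption: counts accepted items only
        self.total_dropped += dropped
        return {
            "pushed": pushed,
            "dropped": dropped,
            "queue_size": len(self),
            "queue_capacity": self._capacity,
            "total_enqueued": self.total_enqueued,
            "total_dropped": self.total_dropped,
        }

    def drain(self, max_items: int) -> list:
        """Consumer side: remove and return up to max_items in arrival order."""
        batch = []
        while len(batch) < max_items and len(self) > 0:
            batch.append(self._slots[self._head % self._capacity])
            self._head += 1
        return batch


queue = RingBuffer(capacity=4)
first = queue.push([1, 2, 3])
second = queue.push([4, 5])  # one free slot left, so one item is dropped
batch = queue.drain(max_items=2)
```

A non-zero `dropped` value in such a response is the signal a producer would use to slow down or batch less aggressively.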
Supported Data Types
Financial Market Data
OHLCV (Open, High, Low, Close, Volume) data for real-time market analysis, trend detection, and trading signals. Create the session with "type": "ohlcv".
{
  "data": [
    {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500},
    {"o": 185.80, "h": 186.50, "l": 185.60, "c": 186.30, "v": 8700}
  ]
}

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| o | float | Open price |
| h | float | High price |
| l | float | Low price |
| c | float | Close price |
| v | float | Volume |
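Because pushes are fire-and-forget, it can help to sanity-check candles on the client before sending them. The following is a minimal sketch under stated assumptions: `validate_candle` is a hypothetical helper, and the consistency rules it applies (high at or above open, close, and low; low at or below open and close; non-negative volume) are general OHLCV conventions rather than checks this documentation says the server performs.

```python
def validate_candle(candle: dict) -> list[str]:
    """Return a list of problems found in one OHLCV record (empty if none)."""
    problems = []
    for key in ("o", "h", "l", "c", "v"):
        if not isinstance(candle.get(key), (int, float)):
            problems.append(f"{key}: missing or not numeric")
    if problems:
        return problems  # skip range checks when a field is unusable
    if candle["h"] < max(candle["o"], candle["c"], candle["l"]):
        problems.append("h: high is below open, close, or low")
    if candle["l"] > min(candle["o"], candle["c"]):
        problems.append("l: low is above open or close")
    if candle["v"] < 0:
        problems.append("v: volume is negative")
    return problems


good = {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500}
bad = {"o": 185.50, "h": 185.00, "l": 185.10, "c": 185.80, "v": 12500}
good_problems = validate_candle(good)
bad_problems = validate_candle(bad)
```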
Industrial IoT
Sensor telemetry for predictive maintenance, anomaly detection, and real-time quality control. Designed for manufacturing, energy, and infrastructure monitoring. Create the session with "type": "iot".
{
  "data": [
    {"sid": "temp-01", "val": 72.5, "lo": 60.0, "hi": 85.0},
    {"sid": "temp-01", "val": 74.1, "lo": 60.0, "hi": 85.0}
  ]
}

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| sid | string | Sensor identifier |
| val | float | Current reading |
| lo | float | Low threshold or observed minimum |
| hi | float | High threshold or observed maximum |
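When `lo` and `hi` carry thresholds, a client can run a cheap local check alongside the push, for example to log out-of-range readings. This sketch assumes `lo` and `hi` are thresholds with inclusive bounds (the table also allows them to be observed extremes, in which case the check is not meaningful). `classify_reading` is a hypothetical helper, not part of any SDK.

```python
def classify_reading(reading: dict) -> str:
    """Return "below", "within", or "above" for val relative to [lo, hi]."""
    if reading["val"] < reading["lo"]:
        return "below"
    if reading["val"] > reading["hi"]:
        return "above"
    return "within"


normal = classify_reading({"sid": "temp-01", "val": 72.5, "lo": 60.0, "hi": 85.0})
hot = classify_reading({"sid": "temp-01", "val": 90.0, "lo": 60.0, "hi": 85.0})
```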
Autonomous Systems
Spatial and kinematic data for robotics, drones, and autonomous vehicles. Supports real-time environmental reasoning and low-latency decision support. Create the session with "type": "spatial".
{
  "data": [
    {"x": 37.7749, "y": -122.4194, "z": 150.0, "spd": 12.5, "hdg": 270.0},
    {"x": 37.7751, "y": -122.4190, "z": 151.2, "spd": 13.0, "hdg": 268.5}
  ]
}

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| x | float | X position or latitude |
| y | float | Y position or longitude |
| z | float | Z position or altitude |
| spd | float | Speed |
| hdg | float | Heading in degrees (0-360) |
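Telemetry sources often report headings outside the 0-360 range (for example, -90 for west). The sketch below maps a reading onto the short field names and wraps the heading with a modulo. `to_spatial_record` and its parameter names are hypothetical. Note that this wraps 360 to 0; whether the server accepts exactly 360 is not stated here.

```python
def to_spatial_record(lat: float, lon: float, alt: float,
                      speed: float, heading_deg: float) -> dict:
    """Build a spatial record, wrapping the heading into [0, 360)."""
    return {
        "x": lat,
        "y": lon,
        "z": alt,
        "spd": speed,
        "hdg": heading_deg % 360.0,  # Python's % yields a non-negative result here
    }


record = to_spatial_record(37.7749, -122.4194, 150.0, 12.5, -90.0)
```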
Security and Monitoring
Event stream data for SIEM, threat detection, and network monitoring. Enables continuous anomaly correlation and natural language alerting. Create the session with "type": "event".
{
  "data": [
    {"src": "10.0.1.5", "sev": 3, "cat": "auth_failure", "cnt": 12},
    {"src": "10.0.1.8", "sev": 1, "cat": "port_scan", "cnt": 340}
  ]
}

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| src | string | Source identifier (IP, hostname, service) |
| sev | integer | Severity level (0 = info, 1 = low, 2 = medium, 3 = high, 4 = critical) |
| cat | string | Event category |
| cnt | integer | Event count in window |
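Since `cnt` is a count per window, raw events usually need to be aggregated before they are pushed. The following sketch shows one way to do that on the client. `aggregate_events` is a hypothetical helper, and grouping by the `(src, sev, cat)` triple is an assumption about what counts as "the same" event.

```python
from collections import Counter


def aggregate_events(raw_events: list[dict]) -> list[dict]:
    """Collapse raw events from one time window into records with counts."""
    counts = Counter((e["src"], e["sev"], e["cat"]) for e in raw_events)
    return [
        {"src": src, "sev": sev, "cat": cat, "cnt": cnt}
        for (src, sev, cat), cnt in sorted(counts.items())
    ]


window = [
    {"src": "10.0.1.5", "sev": 3, "cat": "auth_failure"},
    {"src": "10.0.1.5", "sev": 3, "cat": "auth_failure"},
    {"src": "10.0.1.8", "sev": 1, "cat": "port_scan"},
]
records = aggregate_events(window)
```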
Healthcare Monitoring
Patient vitals for ICU monitoring, wearable streams, and continuous clinical observation. Supports early warning detection and patient state reasoning. Create the session with "type": "vitals".
{
  "data": [
    {"hr": 72.0, "bp_s": 120.0, "bp_d": 80.0, "spo2": 98.5, "temp": 36.8},
    {"hr": 74.0, "bp_s": 118.0, "bp_d": 79.0, "spo2": 98.2, "temp": 36.9}
  ]
}

| Field | Type | Description |
|---|---|---|
| timestamp | integer | Unix timestamp (optional) |
| hr | float | Heart rate (bpm) |
| bp_s | float | Systolic blood pressure (mmHg) |
| bp_d | float | Diastolic blood pressure (mmHg) |
| spo2 | float | Oxygen saturation (%) |
| temp | float | Body temperature (Celsius) |
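The `temp` field is in Celsius, so a device that reports Fahrenheit needs a conversion before the push. This is a minimal sketch assuming such a device. `fahrenheit_to_celsius` and `to_vitals_record` are hypothetical helpers, and rounding to one decimal place is an arbitrary choice for readability.

```python
def fahrenheit_to_celsius(temp_f: float) -> float:
    """Convert Fahrenheit to Celsius, rounded to one decimal place."""
    return round((temp_f - 32.0) * 5.0 / 9.0, 1)


def to_vitals_record(hr, bp_s, bp_d, spo2, temp_f) -> dict:
    """Build a vitals record with float fields and temperature in Celsius."""
    return {
        "hr": float(hr),
        "bp_s": float(bp_s),
        "bp_d": float(bp_d),
        "spo2": float(spo2),
        "temp": fahrenheit_to_celsius(temp_f),
    }


record = to_vitals_record(72, 120, 80, 98.5, 98.6)
```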
HTTP Streaming
Setup
Create a session, then push data to it:
import { LayerScale } from "@layerscale/layerscale";

const client = new LayerScale("http://localhost:8080", { apiKey: "LS-..." });

const session = await client.sessions.create({
  type: "ohlcv",
  prompt: "You are monitoring market data. Analyze trends as data arrives.",
});

from layerscale import LayerScale

client = LayerScale("http://localhost:8080", api_key="LS-...")

session = client.sessions.create(
    type="ohlcv",
    prompt="You are monitoring market data. Analyze trends as data arrives.",
)

curl -X POST http://localhost:8080/v1/sessions/init \
  -H "Content-Type: application/json" \
  -d '{
    "type": "ohlcv",
    "prompt": "You are monitoring market data. Analyze trends as data arrives."
  }'

Response:

{"session_id": "sess_a1b2c3d4e5f6", "type": "ohlcv", "n_tokens": 42, "window_size": 100}

Push Data
await client.sessions.push(session.session_id, [
  { o: 185.50, h: 186.20, l: 185.10, c: 185.80, v: 12500 },
  { o: 185.80, h: 186.50, l: 185.60, c: 186.30, v: 8700 },
]);

client.sessions.push(session.session_id, [
    {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500},
    {"o": 185.80, "h": 186.50, "l": 185.60, "c": 186.30, "v": 8700},
])

curl -X POST http://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/stream/push \
  -H "Content-Type: application/json" \
  -d '{
    "data": [
      {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500},
      {"o": 185.80, "h": 186.50, "l": 185.60, "c": 186.30, "v": 8700}
    ]
  }'

Response:

{
  "pushed": 2,
  "dropped": 0,
  "queue_size": 42,
  "queue_capacity": 4096,
  "total_enqueued": 1500,
  "total_dropped": 0
}

Continuous Ingestion
import { LayerScale } from "@layerscale/layerscale";

const client = new LayerScale("http://localhost:8080", { apiKey: "LS-..." });

// Push a candle every second
setInterval(async () => {
  const candle = await fetchLatestCandle(); // your data source
  await client.sessions.push(SESSION_ID, [{
    o: candle.open,
    h: candle.high,
    l: candle.low,
    c: candle.close,
    v: candle.volume,
  }]);
}, 1000);

import time

from layerscale import LayerScale

client = LayerScale("http://localhost:8080", api_key="LS-...")

# Push a candle every second
while True:
    candle = fetch_latest_candle()  # your data source
    client.sessions.push(SESSION_ID, [{
        "o": candle["open"],
        "h": candle["high"],
        "l": candle["low"],
        "c": candle["close"],
        "v": candle["volume"],
    }])
    time.sleep(1)

Check Stream Status
const status = await client.sessions.streamStatus(session.session_id);
console.log(`Queue: ${status.streaming.queue_size}/${status.streaming.queue_capacity}`);
console.log(`Processed: ${status.statistics.items_processed}`);

status = client.sessions.stream_status(session.session_id)
print(f"Queue: {status.streaming.queue_size}/{status.streaming.queue_capacity}")
print(f"Processed: {status.statistics.items_processed}")

curl http://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/stream/status

{
  "type": "ohlcv",
  "streaming": {
    "initialized": true,
    "running": true,
    "error": false,
    "queue_size": 42,
    "queue_capacity": 4096,
    "window_size": 100,
    "process_interval_us": 10000
  },
  "statistics": {
    "total_enqueued": 1500,
    "total_dropped": 0,
    "batches_processed": 30,
    "items_processed": 1500,
    "total_process_time_us": 450000,
    "avg_process_time_us": 15000
  },
  "data": {
    "initialized": true,
    "data_count": 85,
    "total_data_tokens": 3400,
    "pos_next": 3442
  }
}

See API Reference for the full response schema.
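A status payload like the one above can be reduced to a few health indicators for dashboards or alerting. The sketch below works on the raw JSON as a dictionary. `summarize_status` is a hypothetical helper, and the derived metrics rest on one assumption: that `total_enqueued` counts accepted items only, so the number of items offered is `total_enqueued + total_dropped`.

```python
def summarize_status(status: dict) -> dict:
    """Derive simple health indicators from a stream status payload."""
    streaming = status["streaming"]
    stats = status["statistics"]
    # Assumption: total_enqueued excludes dropped items.
    offered = stats["total_enqueued"] + stats["total_dropped"]
    batches = stats["batches_processed"]
    return {
        "healthy": streaming["running"] and not streaming["error"],
        "queue_fill": streaming["queue_size"] / streaming["queue_capacity"],
        "drop_rate": stats["total_dropped"] / offered if offered else 0.0,
        "items_per_batch": stats["items_processed"] / batches if batches else 0.0,
    }


# Values copied from the sample response above.
sample = {
    "streaming": {"running": True, "error": False,
                  "queue_size": 42, "queue_capacity": 4096},
    "statistics": {"total_enqueued": 1500, "total_dropped": 0,
                   "batches_processed": 30, "items_processed": 1500},
}
summary = summarize_status(sample)
```

A rising `queue_fill` or a non-zero `drop_rate` would suggest that data is arriving faster than the background processor consumes it.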
WebSocket Streaming
WebSocket provides a persistent bidirectional connection that combines data push and event delivery. This is more efficient than HTTP when pushing data at high frequency.
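For clients that use a generic WebSocket library instead of the SDK, the messages on this connection are JSON objects with a `type` field, as the subsections below show (`push`, `ping`, `connected`, `flash_ready`). The following is a minimal sketch of encoding outgoing messages and routing incoming ones. The helper names `encode_push`, `encode_ping`, and `dispatch` are hypothetical, and the transport itself is left out.

```python
import json


def encode_push(records: list[dict]) -> str:
    """Serialize a data push message."""
    return json.dumps({"type": "push", "data": records})


def encode_ping() -> str:
    """Serialize a keepalive message."""
    return json.dumps({"type": "ping"})


def dispatch(raw: str, handlers: dict) -> bool:
    """Decode one server message and call the handler registered for its type."""
    message = json.loads(raw)
    handler = handlers.get(message.get("type"))
    if handler is None:
        return False  # unknown or unhandled event type
    handler(message.get("data"))
    return True


seen = []
handlers = {"flash_ready": lambda data: seen.append((data["id"], data["value"]))}
handled = dispatch(
    '{"type": "flash_ready", "data": {"id": 1, "value": "Bullish"}}', handlers
)
```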
Connect
ws://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/ws

On connect, the server sends a connected event with the current session snapshot, followed immediately by a flash_ready event for every registered query that already has a cached answer:

{
  "type": "connected",
  "data": {
    "session_id": "sess_a1b2c3d4e5f6",
    "data_version": 15,
    "streaming": true,
    "flash_queries": 3
  }
}

Push Data via WebSocket

{
  "type": "push",
  "data": [
    {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500}
  ]
}

Receive Events
Flash Query results and errors arrive on the same connection:
{
  "type": "flash_ready",
  "data": {
    "id": 1,
    "query": "Is the trend bullish?",
    "value": "Bullish",
    "data_version": 15,
    "confidence": 0.92,
    "evaluated_at": 1711000000000
  }
}

Full WebSocket Example
import { LayerScale } from "@layerscale/layerscale";

const client = new LayerScale("http://localhost:8080", { apiKey: "LS-..." });
const socket = client.sessions.stream(SESSION_ID);

socket.on("connected", (data) => {
  console.log(`Connected: ${data.session_id}`);
});

socket.on("flash_ready", (data) => {
  console.log(`Flash Q${data.id}: ${data.value} (${data.confidence.toFixed(2)})`);
});

// Start pushing once the connection is open
socket.on("open", () => {
  setInterval(() => {
    socket.push([{ o: 185.5, h: 186.2, l: 185.1, c: 185.8, v: 12500 }]);
  }, 500);
});

import asyncio

from layerscale import AsyncLayerScale, WsConnected, WsFlashReady

async def stream_data(session_id: str):
    async with AsyncLayerScale("http://localhost:8080", api_key="LS-...") as client:
        async with client.sessions.stream(session_id) as socket:
            async def push_loop():
                for candle in generate_candles():  # your data source
                    await socket.push([candle])
                    await asyncio.sleep(0.5)

            # Keep a reference so the task is not garbage-collected mid-run
            push_task = asyncio.create_task(push_loop())
            try:
                async for event in socket:
                    if isinstance(event, WsConnected):
                        print(f"Connected: {event.data.session_id}")
                    elif isinstance(event, WsFlashReady):
                        d = event.data
                        print(f"Flash Q{d.id}: {d.value} ({d.confidence:.2f})")
            finally:
                # Stop pushing when the event loop over the socket ends
                push_task.cancel()

asyncio.run(stream_data("sess_a1b2c3d4e5f6"))

Keepalive
Send periodic ping messages to keep the connection alive:
{"type": "ping"}

Processing Model
LayerScale maintains a live, continuously updated representation of your data inside the model’s attention state. As new data arrives, the engine incrementally updates this representation without reprocessing existing context. The model’s understanding of your data evolves in real time, not on demand.
The system prompt is compiled into a persistent prefix at session creation. This prefix is never recomputed, regardless of how much data flows through the session or how long it runs. All subsequent compute is spent exclusively on new information.
Context capacity is managed automatically by an adaptive scheduling system that balances data freshness against the available compute budget. The engine decides how much data to retain based on context size, data type, and query overhead, so the model keeps the most relevant data available while query performance stays constant-time.
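One way to picture this model is a fixed prefix followed by a bounded window of recent data items. The sketch below is a deliberate simplification, not the engine's algorithm: it assumes a fixed-size window with oldest-first eviction, whereas the retention described above is adaptive. The `LiveContext` class is hypothetical. Its property names borrow from the stream status response, whose sample values happen to satisfy `pos_next = n_tokens + total_data_tokens` (3442 = 42 + 3400). The 40 tokens per item is simply 3400 / 85 from that sample.

```python
from collections import deque


class LiveContext:
    """Fixed prefix plus a bounded window of recent data items (sketch)."""

    def __init__(self, prefix_tokens: int, window_size: int) -> None:
        self.prefix_tokens = prefix_tokens  # set once at session creation
        # Token count of each retained item; a full deque discards its oldest entry.
        self._window = deque(maxlen=window_size)

    def append(self, item_tokens: int) -> None:
        """Add one data item; the prefix is never touched."""
        self._window.append(item_tokens)

    @property
    def data_count(self) -> int:
        return len(self._window)

    @property
    def total_data_tokens(self) -> int:
        return sum(self._window)

    @property
    def pos_next(self) -> int:
        # Assumed meaning: next free position after prefix and retained data.
        return self.prefix_tokens + self.total_data_tokens


ctx = LiveContext(prefix_tokens=42, window_size=100)
for _ in range(85):
    ctx.append(40)
snapshot = (ctx.data_count, ctx.total_data_tokens, ctx.pos_next)
```

Appending past the window size evicts the oldest items while `prefix_tokens` stays unchanged, which mirrors the idea that only new information costs compute.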
Practical Examples
Market Data Feed
import { LayerScale } from "@layerscale/layerscale";

const client = new LayerScale("http://localhost:8080", { apiKey: "LS-..." });

// Initialize with analysis prompt and flash queries
const session = await client.sessions.create({
  type: "ohlcv",
  prompt:
    "You are a quantitative analyst monitoring real-time price data. " +
    "Identify support/resistance levels, trend changes, and volume anomalies.",
  flash: [
    { query: "Is momentum bullish or bearish?", max_tokens: 4 },
    { query: "Are there any volume anomalies?", max_tokens: 32 },
  ],
  markPrefix: true,
});

// Push candles from your data source
for await (const candle of exchangeFeed.stream("BTC/USD", "1m")) {
  await client.sessions.push(session.session_id, [{
    o: candle.open,
    h: candle.high,
    l: candle.low,
    c: candle.close,
    v: candle.volume,
  }]);
}

from layerscale import LayerScale

client = LayerScale("http://localhost:8080", api_key="LS-...")

# Initialize with analysis prompt and flash queries
session = client.sessions.create(
    type="ohlcv",
    prompt=(
        "You are a quantitative analyst monitoring real-time price data. "
        "Identify support/resistance levels, trend changes, and volume anomalies."
    ),
    flash=[
        {"query": "Is momentum bullish or bearish?", "max_tokens": 4},
        {"query": "Are there any volume anomalies?", "max_tokens": 32},
    ],
    mark_prefix=True,
)

# Push candles from your data source
for candle in exchange_feed.stream("BTC/USD", interval="1m"):
    client.sessions.push(session.session_id, [{
        "o": candle.open,
        "h": candle.high,
        "l": candle.low,
        "c": candle.close,
        "v": candle.volume,
    }])

Sensor Data Monitoring
const session = await client.sessions.create({
  type: "iot",
  prompt:
    "You are monitoring temperature sensors in a data center. " +
    "Alert if any readings suggest cooling failure.",
  flash: [
    { query: "Are any sensors showing abnormal readings?", max_tokens: 32 },
    { query: "Is there a risk of thermal throttling?", max_tokens: 8 },
  ],
  markPrefix: true,
});

for await (const batch of sensorStream.read()) {
  await client.sessions.push(session.session_id, [{
    sid: batch.sensor_id,
    val: batch.current,
    lo: batch.threshold_low,
    hi: batch.threshold_high,
  }]);
}

session = client.sessions.create(
    type="iot",
    prompt=(
        "You are monitoring temperature sensors in a data center. "
        "Alert if any readings suggest cooling failure."
    ),
    flash=[
        {"query": "Are any sensors showing abnormal readings?", "max_tokens": 32},
        {"query": "Is there a risk of thermal throttling?", "max_tokens": 8},
    ],
    mark_prefix=True,
)

for batch in sensor_stream.read():
    client.sessions.push(session.session_id, [{
        "sid": batch.sensor_id,
        "val": batch.current,
        "lo": batch.threshold_low,
        "hi": batch.threshold_high,
    }])

Log Ingestion
const session = await client.sessions.create({
  type: "event",
  prompt:
    "You are analyzing application logs. Detect error patterns, " +
    "latency spikes, and unusual behavior.",
  flash: [
    { query: "Are there any error spikes?", max_tokens: 16 },
    { query: "Is latency degrading?", max_tokens: 8 },
  ],
  markPrefix: true,
});

for await (const window of logAggregator.windows("10s")) {
  await client.sessions.push(session.session_id, [{
    src: window.service_name,
    sev: window.error_rate > 0.05 ? 3 : 1,
    cat: window.p99_ms > 500 ? "latency_spike" : "normal",
    cnt: window.total,
  }]);
}

session = client.sessions.create(
    type="event",
    prompt=(
        "You are analyzing application logs. Detect error patterns, "
        "latency spikes, and unusual behavior."
    ),
    flash=[
        {"query": "Are there any error spikes?", "max_tokens": 16},
        {"query": "Is latency degrading?", "max_tokens": 8},
    ],
    mark_prefix=True,
)

for window in log_aggregator.windows(interval="10s"):
    client.sessions.push(session.session_id, [{
        "src": window.service_name,
        "sev": 3 if window.error_rate > 0.05 else 1,
        "cat": "latency_spike" if window.p99_ms > 500 else "normal",
        "cnt": window.total,
    }])