
Streaming Data

Traditional inference engines process your data when you query it. LayerScale processes your data when it arrives. By the time you ask a question, the model has already reasoned over your latest data. This is continuous inference: always on, always current, always ready.

Data flows into a session via HTTP or WebSocket, gets tokenized in the background, and is incorporated into the model’s live context without blocking your application. Your data never waits. Your queries never rebuild context from scratch.

How It Works

  1. Push data to a session via HTTP POST or WebSocket message
  2. Data queues in a lock-free ring buffer (single-producer, single-consumer)
  3. Background processor tokenizes the data and updates the model’s live context
  4. Flash Queries re-evaluate automatically after each data batch
  5. Results push to connected clients via SSE or WebSocket

The push endpoint returns immediately. Your application never blocks waiting for the model to process data.
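
To make step 2 concrete, here is a minimal sketch of a bounded ring buffer with one producer and one consumer. It is illustrative only and does not show LayerScale's internals: the `RingBuffer` class and its method names are assumptions, and a truly lock-free version would use atomic head and tail indices shared between threads rather than a plain counter.

```typescript
// Illustrative bounded ring buffer: one producer calls push(), one consumer calls pop().
// Not LayerScale's implementation; names and layout are assumptions for this sketch.
class RingBuffer<T> {
  private readonly slots: (T | undefined)[];
  private readonly capacity: number;
  private head = 0; // next slot to read (consumer side)
  private tail = 0; // next slot to write (producer side)
  private count = 0;

  constructor(capacity: number) {
    this.capacity = capacity;
    this.slots = new Array<T | undefined>(capacity);
  }

  // Returns immediately: true if the item was queued, false if the buffer is full.
  push(item: T): boolean {
    if (this.count === this.capacity) return false;
    this.slots[this.tail] = item;
    this.tail = (this.tail + 1) % this.capacity;
    this.count++;
    return true;
  }

  // Returns the oldest item, or undefined if the buffer is empty.
  pop(): T | undefined {
    if (this.count === 0) return undefined;
    const item = this.slots[this.head];
    this.slots[this.head] = undefined;
    this.head = (this.head + 1) % this.capacity;
    this.count--;
    return item;
  }

  get size(): number {
    return this.count;
  }
}

const queue = new RingBuffer<number>(2);
queue.push(1);
queue.push(2);
const accepted = queue.push(3); // buffer is full, so this item is rejected
const first = queue.pop(); // oldest item comes out first
```

Because `push` never waits for the consumer, the producer side stays non-blocking, which is the property the push endpoint relies on.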

Supported Data Types

Financial Market Data

OHLCV (Open, High, Low, Close, Volume) data for real-time market analysis, trend detection, and trading signals. Create the session with "type": "ohlcv".

{
  "data": [
    {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500},
    {"o": 185.80, "h": 186.50, "l": 185.60, "c": 186.30, "v": 8700}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| o | float | Open price |
| h | float | High price |
| l | float | Low price |
| c | float | Close price |
| v | float | Volume |

Industrial IoT

Sensor telemetry for predictive maintenance, anomaly detection, and real-time quality control. Designed for manufacturing, energy, and infrastructure monitoring. Create the session with "type": "iot".

{
  "data": [
    {"sid": "temp-01", "val": 72.5, "lo": 60.0, "hi": 85.0},
    {"sid": "temp-01", "val": 74.1, "lo": 60.0, "hi": 85.0}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| sid | string | Sensor identifier |
| val | float | Current reading |
| lo | float | Low threshold or observed minimum |
| hi | float | High threshold or observed maximum |

Autonomous Systems

Spatial and kinematic data for robotics, drones, and autonomous vehicles. Supports real-time environmental reasoning and low-latency decision support. Create the session with "type": "spatial".

{
  "data": [
    {"x": 37.7749, "y": -122.4194, "z": 150.0, "spd": 12.5, "hdg": 270.0},
    {"x": 37.7751, "y": -122.4190, "z": 151.2, "spd": 13.0, "hdg": 268.5}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| x | float | X position or latitude |
| y | float | Y position or longitude |
| z | float | Z position or altitude |
| spd | float | Speed |
| hdg | float | Heading in degrees (0-360) |

Security and Monitoring

Event stream data for SIEM, threat detection, and network monitoring. Enables continuous anomaly correlation and natural language alerting. Create the session with "type": "event".

{
  "data": [
    {"src": "10.0.1.5", "sev": 3, "cat": "auth_failure", "cnt": 12},
    {"src": "10.0.1.8", "sev": 1, "cat": "port_scan", "cnt": 340}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| src | string | Source identifier (IP, hostname, service) |
| sev | integer | Severity level (0 = info, 1 = low, 2 = medium, 3 = high, 4 = critical) |
| cat | string | Event category |
| cnt | integer | Event count in window |

Healthcare Monitoring

Patient vitals for ICU monitoring, wearable streams, and continuous clinical observation. Supports early warning detection and patient state reasoning. Create the session with "type": "vitals".

{
  "data": [
    {"hr": 72.0, "bp_s": 120.0, "bp_d": 80.0, "spo2": 98.5, "temp": 36.8},
    {"hr": 74.0, "bp_s": 118.0, "bp_d": 79.0, "spo2": 98.2, "temp": 36.9}
  ]
}

| Field | Type | Description |
| --- | --- | --- |
| timestamp | integer | Unix timestamp (optional) |
| hr | float | Heart rate (bpm) |
| bp_s | float | Systolic blood pressure (mmHg) |
| bp_d | float | Diastolic blood pressure (mmHg) |
| spo2 | float | Oxygen saturation (%) |
| temp | float | Body temperature (Celsius) |
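
If you build payloads in TypeScript, the five record shapes above can be written down as types so that a misspelled field is caught at compile time. The interfaces below are transcribed from the field tables; the interface names and the `buildPushBody` helper are assumptions made for this sketch and are not part of the LayerScale SDK.

```typescript
// Record shapes transcribed from the field tables above.
// `timestamp` is optional for every data type.
interface OhlcvRecord { timestamp?: number; o: number; h: number; l: number; c: number; v: number }
interface IotRecord { timestamp?: number; sid: string; val: number; lo: number; hi: number }
interface SpatialRecord { timestamp?: number; x: number; y: number; z: number; spd: number; hdg: number }
interface EventRecord { timestamp?: number; src: string; sev: 0 | 1 | 2 | 3 | 4; cat: string; cnt: number }
interface VitalsRecord { timestamp?: number; hr: number; bp_s: number; bp_d: number; spo2: number; temp: number }

type StreamRecord = OhlcvRecord | IotRecord | SpatialRecord | EventRecord | VitalsRecord;

// Hypothetical helper: wraps records in the {"data": [...]} envelope used in the examples.
function buildPushBody<T extends StreamRecord>(records: T[]): { data: T[] } {
  return { data: records };
}

const sampleEvents: EventRecord[] = [
  { src: "10.0.1.5", sev: 3, cat: "auth_failure", cnt: 12 },
];
const pushBody = JSON.stringify(buildPushBody(sampleEvents));
```

Typing `sev` as a union of the five documented levels means a value such as `7` fails to compile instead of reaching the session.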

HTTP Streaming

Setup

Create a session, then push data to it:

import { LayerScale } from "@layerscale/layerscale";

const client = new LayerScale("http://localhost:8080", { apiKey: "LS-..." });

const session = await client.sessions.create({
  type: "ohlcv",
  prompt: "You are monitoring market data. Analyze trends as data arrives.",
});

Push Data

await client.sessions.push(session.session_id, [
  { o: 185.50, h: 186.20, l: 185.10, c: 185.80, v: 12500 },
  { o: 185.80, h: 186.50, l: 185.60, c: 186.30, v: 8700 },
]);

Continuous Ingestion

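import { LayerScale } from "@layerscale/layerscale";

const client = new LayerScale("http://localhost:8080", { apiKey: "LS-..." });
const SESSION_ID = "sess_a1b2c3d4e5f6"; // session_id returned by client.sessions.create()

// Push a candle every second
setInterval(async () => {
  try {
    const candle = await fetchLatestCandle(); // your data source
    await client.sessions.push(SESSION_ID, [{
      o: candle.open,
      h: candle.high,
      l: candle.low,
      c: candle.close,
      v: candle.volume,
    }]);
  } catch (err) {
    // Log and keep the interval running instead of leaving the rejection unhandled
    console.error("Push failed:", err);
  }
}, 1000);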
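
When records arrive faster than once per second, it can help to group them before pushing. The sketch below is one way to do that. `PushBatcher` is a hypothetical helper, not an SDK class, and its `send` callback stands in for a call such as `client.sessions.push(SESSION_ID, records)`.

```typescript
// Hypothetical helper: buffers records and forwards them in batches.
// `send` stands in for a call such as client.sessions.push(sessionId, records).
class PushBatcher<T> {
  private pending: T[] = [];
  private readonly send: (records: T[]) => Promise<unknown>;
  private readonly maxBatch: number;

  constructor(send: (records: T[]) => Promise<unknown>, maxBatch: number) {
    this.send = send;
    this.maxBatch = maxBatch;
  }

  // Queue one record and flush automatically once the batch is full.
  async add(record: T): Promise<void> {
    this.pending.push(record);
    if (this.pending.length >= this.maxBatch) await this.flush();
  }

  // Send whatever is buffered. On failure the records are put back for a later retry.
  async flush(): Promise<void> {
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    try {
      await this.send(batch);
    } catch (err) {
      this.pending = batch.concat(this.pending);
      throw err;
    }
  }
}
```

Call `flush()` on shutdown or on a timer so that a partially filled batch is not left waiting.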

Check Stream Status

const status = await client.sessions.streamStatus(session.session_id);
console.log(`Queue: ${status.streaming.queue_size}/${status.streaming.queue_capacity}`);
console.log(`Processed: ${status.statistics.items_processed}`);
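
The queue fields can also drive a simple backpressure check on the producer side. This is a minimal sketch: it uses only the `queue_size` and `queue_capacity` fields shown above, and the helper names and the 80% threshold are assumptions rather than LayerScale recommendations.

```typescript
// Only the fields used in the status example; the full schema is in the API Reference.
interface StreamStatusLike {
  streaming: { queue_size: number; queue_capacity: number };
}

// Fraction of the ingest queue currently in use, from 0 to 1.
function queueUtilization(s: StreamStatusLike): number {
  const { queue_size, queue_capacity } = s.streaming;
  return queue_capacity > 0 ? queue_size / queue_capacity : 0;
}

// Assumed policy: slow the producer down once the queue is more than 80% full.
function shouldThrottle(s: StreamStatusLike, threshold = 0.8): boolean {
  return queueUtilization(s) > threshold;
}
```

A producer could poll `streamStatus` periodically and lengthen its push interval while `shouldThrottle` returns true.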

See API Reference for the full response schema.

WebSocket Streaming

WebSocket provides a persistent bidirectional connection that combines data push and event delivery. This is more efficient than HTTP when pushing data at high frequency, because each message reuses the open connection instead of paying per-request overhead.

Connect

ws://localhost:8080/v1/sessions/sess_a1b2c3d4e5f6/ws

On connect, the server sends a connected event with the current session snapshot, followed immediately by a flash_ready event for every registered query that already has a cached answer:

{
  "type": "connected",
  "data": {
    "session_id": "sess_a1b2c3d4e5f6",
    "data_version": 15,
    "streaming": true,
    "flash_queries": 3
  }
}

Push Data via WebSocket

{
  "type": "push",
  "data": [
    {"o": 185.50, "h": 186.20, "l": 185.10, "c": 185.80, "v": 12500}
  ]
}

Receive Events

Flash Query results and errors arrive on the same connection:

{
  "type": "flash_ready",
  "data": {
    "id": 1,
    "query": "Is the trend bullish?",
    "value": "Bullish",
    "data_version": 15,
    "confidence": 0.92,
    "evaluated_at": 1711000000000
  }
}
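
If you read frames from a raw WebSocket rather than through the SDK, a small typed parser keeps event handling explicit. The sketch below models only the two event shapes shown on this page, `connected` and `flash_ready`. The type and function names are assumptions, and any other event type (including errors, whose shape is not shown here) is returned as `null`.

```typescript
interface ConnectedEvent {
  type: "connected";
  data: { session_id: string; data_version: number; streaming: boolean; flash_queries: number };
}

interface FlashReadyEvent {
  type: "flash_ready";
  data: {
    id: number;
    query: string;
    value: string;
    data_version: number;
    confidence: number;
    evaluated_at: number;
  };
}

type ServerEvent = ConnectedEvent | FlashReadyEvent;

// Parse a raw text frame; returns null for event types this sketch does not model.
function parseServerEvent(raw: string): ServerEvent | null {
  const msg = JSON.parse(raw);
  if (msg && (msg.type === "connected" || msg.type === "flash_ready")) {
    return msg as ServerEvent;
  }
  return null;
}

// Produce a one-line summary, using the `type` field to pick the right shape.
function describeEvent(ev: ServerEvent): string {
  switch (ev.type) {
    case "connected":
      return `session ${ev.data.session_id} at version ${ev.data.data_version}`;
    case "flash_ready":
      return `Q${ev.data.id}: ${ev.data.value} (${ev.data.confidence.toFixed(2)})`;
  }
}
```

The parser trusts the `type` field and does not validate the rest of the payload, so add field checks if the frames come from an untrusted source.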

Full WebSocket Example

import { LayerScale } from "@layerscale/layerscale";

const client = new LayerScale("http://localhost:8080", { apiKey: "LS-..." });
const SESSION_ID = "sess_a1b2c3d4e5f6"; // session_id returned by client.sessions.create()

const socket = client.sessions.stream(SESSION_ID);

// Session snapshot sent by the server on connect
socket.on("connected", (data) => {
  console.log(`Connected: ${data.session_id}`);
});

// Flash Query results arrive after each processed data batch
socket.on("flash_ready", (data) => {
  console.log(`Flash Q${data.id}: ${data.value} (${data.confidence.toFixed(2)})`);
});

// Start pushing once the connection is open
socket.on("open", () => {
  setInterval(() => {
    socket.push([{ o: 185.5, h: 186.2, l: 185.1, c: 185.8, v: 12500 }]);
  }, 500);
});

Keepalive

Send periodic ping messages to keep the connection alive:

{"type": "ping"}
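
A keepalive timer for a raw WebSocket connection might look like the sketch below. `startKeepalive` is a hypothetical helper, `send` stands in for the socket's send method, and the 30-second default is an assumption. This page does not state the server's idle timeout, so choose an interval that suits your deployment.

```typescript
// Starts a keepalive timer and returns a function that stops it.
// `send` stands in for the socket's send method; the 30-second default is an assumption.
function startKeepalive(send: (frame: string) => void, intervalMs = 30_000): () => void {
  const timer = setInterval(() => send(JSON.stringify({ type: "ping" })), intervalMs);
  return () => clearInterval(timer);
}
```

Call the returned function when the socket closes so the timer does not keep firing against a dead connection.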

Processing Model

LayerScale maintains a live, continuously updated representation of your data inside the model’s attention state. As new data arrives, the engine incrementally updates this representation without reprocessing existing context. The model’s understanding of your data evolves in real time, not on demand.

The system prompt is compiled into a persistent prefix at session creation. This prefix is never recomputed, regardless of how much data flows through the session or how long it runs. All subsequent compute is spent exclusively on new information.

Context capacity is managed automatically through an adaptive scheduling system that balances data freshness against available compute budget. The engine determines the optimal amount of data to retain based on context size, data type, and query overhead, ensuring the model always has the most relevant data available while maintaining constant-time query performance.
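
As a mental model only, the behavior described above resembles a fixed prefix followed by a bounded window of data, where each update costs only the tokens that were just appended. The sketch below is a toy illustration under that assumption. The `LiveContext` class, the token counts, and the oldest-first eviction rule are all invented for this example and do not describe LayerScale's actual scheduler.

```typescript
// Conceptual model only: a fixed prefix plus a bounded window of data entries.
// Token counts and the eviction rule are assumptions, not LayerScale's scheduler.
interface ContextEntry { tokens: number; payload: string }

class LiveContext {
  private readonly items: ContextEntry[] = [];
  private readonly budget: number;
  private used: number;

  constructor(prefixTokens: number, budget: number) {
    this.budget = budget;
    this.used = prefixTokens; // the prefix is counted once and never evicted
  }

  // Append new data, evicting the oldest entries if the budget is exceeded.
  // Returns the tokens newly processed: only the appended entry, never the prefix.
  append(entry: ContextEntry): number {
    this.items.push(entry);
    this.used += entry.tokens;
    while (this.used > this.budget && this.items.length > 1) {
      const evicted = this.items.shift()!;
      this.used -= evicted.tokens;
    }
    return entry.tokens;
  }

  get totalTokens(): number {
    return this.used;
  }

  get entryCount(): number {
    return this.items.length;
  }
}

const ctx = new LiveContext(100, 160); // 100-token prefix, 160-token budget
ctx.append({ tokens: 30, payload: "batch 1" });
ctx.append({ tokens: 30, payload: "batch 2" });
const processed = ctx.append({ tokens: 30, payload: "batch 3" }); // evicts batch 1
```

In this toy model the work per update depends on the size of the new entry, not on how long the session has been running.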

Practical Examples

Market Data Feed

import { LayerScale } from "@layerscale/layerscale";

const client = new LayerScale("http://localhost:8080", { apiKey: "LS-..." });

// Initialize with analysis prompt and flash queries
const session = await client.sessions.create({
  type: "ohlcv",
  prompt:
    "You are a quantitative analyst monitoring real-time price data. " +
    "Identify support/resistance levels, trend changes, and volume anomalies.",
  flash: [
    { query: "Is momentum bullish or bearish?", max_tokens: 4 },
    { query: "Are there any volume anomalies?", max_tokens: 32 },
  ],
  markPrefix: true,
});

// Push candles from your data source
for await (const candle of exchangeFeed.stream("BTC/USD", "1m")) {
  await client.sessions.push(session.session_id, [{
    o: candle.open,
    h: candle.high,
    l: candle.low,
    c: candle.close,
    v: candle.volume,
  }]);
}

Sensor Data Monitoring

// Assumes a client created as in the Setup section
const session = await client.sessions.create({
  type: "iot",
  prompt:
    "You are monitoring temperature sensors in a data center. " +
    "Alert if any readings suggest cooling failure.",
  flash: [
    { query: "Are any sensors showing abnormal readings?", max_tokens: 32 },
    { query: "Is there a risk of thermal throttling?", max_tokens: 8 },
  ],
  markPrefix: true,
});

// Push readings from your data source
for await (const batch of sensorStream.read()) {
  await client.sessions.push(session.session_id, [{
    sid: batch.sensor_id,
    val: batch.current,
    lo: batch.threshold_low,
    hi: batch.threshold_high,
  }]);
}

Log Ingestion

// Assumes a client created as in the Setup section
const session = await client.sessions.create({
  type: "event",
  prompt:
    "You are analyzing application logs. Detect error patterns, " +
    "latency spikes, and unusual behavior.",
  flash: [
    { query: "Are there any error spikes?", max_tokens: 16 },
    { query: "Is latency degrading?", max_tokens: 8 },
  ],
  markPrefix: true,
});

// Summarize each 10-second log window as one event record
for await (const window of logAggregator.windows("10s")) {
  await client.sessions.push(session.session_id, [{
    src: window.service_name,
    sev: window.error_rate > 0.05 ? 3 : 1, // high severity above a 5% error rate, otherwise low
    cat: window.p99_ms > 500 ? "latency_spike" : "normal",
    cnt: window.total,
  }]);
}