Streaming APIs: SSE and WebSocket Patterns 2026
More APIs stream data instead of returning it all at once. AI APIs stream token-by-token. Financial APIs stream price updates. Chat APIs stream messages in real-time. Consuming streaming APIs requires different patterns than traditional request-response.
Three Types of Streaming
The core decision in any real-time architecture is which streaming primitive to use. SSE and WebSockets handle the majority of real-world use cases, but each has a distinct programming model and tradeoff profile. Streaming HTTP (chunked transfer encoding) is essentially what SSE is built on — the distinction matters mainly when you're implementing the server side.
Server-Sent Events reconnect automatically, work over standard HTTP, and are easier to authenticate than WebSockets: an SSE stream is an ordinary HTTP response, so a fetch-based consumer can attach standard Authorization headers (note that the native EventSource API cannot set custom headers — which is why the fetch-based pattern below matters). Any infrastructure that handles HTTP (load balancers, CDNs, proxies) handles SSE without special configuration, which is why SSE is the de facto standard for AI token streaming — the stream just looks like a slow HTTP response to every intermediary.
WebSockets establish a persistent full-duplex TCP connection. The handshake begins as HTTP and upgrades to the WS protocol, which some older proxies and corporate firewalls do not support. If you're building for enterprise users behind restrictive networks, test WebSocket connectivity — SSE-based solutions are often more reliable in constrained network environments.
| Type | Direction | Protocol | Best For |
|---|---|---|---|
| Server-Sent Events (SSE) | Server → Client | HTTP | AI streaming, live feeds, notifications |
| WebSockets | Bidirectional | WS/WSS | Chat, gaming, collaborative editing |
| Streaming HTTP | Server → Client | HTTP (chunked) | File downloads, large responses |
Server-Sent Events (SSE)
How SSE Works
Client Server
│ │
│── GET /stream ──────────────→ │
│ Accept: text/event-stream │
│ │
│←── HTTP 200 ──────────────── │
│ Content-Type: text/event-stream
│ │
│←── data: {"token": "Hello"} │
│←── data: {"token": " world"} │
│←── data: {"token": "!"} │
│←── data: [DONE] │
│ │
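The data field shown above is only one of the fields the WHATWG spec defines. A hypothetical event using the full set: event names the event and routes it to addEventListener('price_update', ...) instead of onmessage; id is remembered by the browser and echoed back as a Last-Event-ID request header on reconnect so the server can resume; retry sets the reconnection delay in milliseconds; and a line starting with a colon is a comment, commonly used as a keep-alive ping.

```text
event: price_update
id: 42
retry: 5000
data: {"symbol": "BTC-USD", "price": 97000}

: keep-alive
```

A blank line terminates and dispatches the event; multi-line payloads use one data: line per line, which the client concatenates with newlines.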
Consuming SSE in the Browser
// Native EventSource API
const source = new EventSource('https://api.example.com/stream');
source.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Received:', data);
};
source.onerror = (error) => {
console.error('SSE error:', error);
// EventSource automatically reconnects
};
// Close when done
source.close();
Consuming SSE with fetch (More Control)
// fetch-based SSE — better for auth headers and error handling
async function consumeSSE(url: string, onEvent: (data: any) => void) {
const response = await fetch(url, {
headers: {
'Authorization': `Bearer ${API_KEY}`,
'Accept': 'text/event-stream',
},
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse SSE format: "data: {...}\n\n"
const lines = buffer.split('\n\n');
buffer = lines.pop() || ''; // Keep incomplete chunk
for (const chunk of lines) {
for (const line of chunk.split('\n')) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
onEvent(JSON.parse(data));
}
}
}
}
}
// Usage — note consumeSSE issues a plain GET; POST-based streaming APIs
// (like chat completions) would also need a method and body in the fetch
await consumeSSE('https://api.example.com/stream', (data) => {
process.stdout.write(data.token ?? ''); // Payload shape depends on the API
});
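The buffering-and-splitting logic above is easy to get wrong when a data: line is split across network chunks. It can be factored into a small reusable parser — a sketch under the same "data: ...\n\n" framing with a [DONE] sentinel, which not every API uses:

```typescript
// Incremental SSE parser: feed() accepts arbitrary network chunks and
// returns the JSON payloads of any complete "data: ..." events they finish.
class SSEParser {
  private buffer = '';
  done = false;

  feed(chunk: string): any[] {
    this.buffer += chunk;
    const events: any[] = [];
    // Events are terminated by a blank line; the last split element is
    // an incomplete event, so keep it buffered for the next chunk.
    const parts = this.buffer.split('\n\n');
    this.buffer = parts.pop() ?? '';
    for (const part of parts) {
      for (const line of part.split('\n')) {
        if (!line.startsWith('data: ')) continue;
        const payload = line.slice(6);
        if (payload === '[DONE]') { this.done = true; return events; }
        events.push(JSON.parse(payload));
      }
    }
    return events;
  }
}
```

consumeSSE can then simply call parser.feed(decoder.decode(value, { stream: true })) on each chunk and forward every returned event, with the chunk-boundary handling tested in isolation.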
AI API Streaming Pattern
// Stream AI responses token-by-token
async function streamAIResponse(
prompt: string,
onToken: (token: string) => void,
onComplete: (fullResponse: string) => void
): Promise<void> {
const response = await fetch('https://api.anthropic.com/v1/messages', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-api-key': API_KEY,
'anthropic-version': '2023-06-01',
},
body: JSON.stringify({
model: 'claude-sonnet-4-20250514',
max_tokens: 1024,
stream: true,
messages: [{ role: 'user', content: prompt }],
}),
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = '';
let fullResponse = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6);
const event = JSON.parse(data);
// Anthropic signals the end of a stream with a message_stop event;
// the "data: [DONE]" sentinel is an OpenAI convention, not used here
if (event.type === 'message_stop') {
onComplete(fullResponse);
return;
}
if (event.type === 'content_block_delta' && event.delta?.type === 'text_delta') {
const token = event.delta.text;
fullResponse += token;
onToken(token);
}
}
}
onComplete(fullResponse);
}
// React hook for streaming (assumes: import { useState } from 'react')
function useStreamingAI() {
const [response, setResponse] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const stream = async (prompt: string) => {
setIsStreaming(true);
setResponse('');
await streamAIResponse(
prompt,
(token) => setResponse(prev => prev + token),
() => setIsStreaming(false)
);
};
return { response, isStreaming, stream };
}
WebSockets
Basic WebSocket Client
class WebSocketClient {
private ws: WebSocket | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private handlers: Map<string, ((data: any) => void)[]> = new Map();
connect(url: string) {
this.ws = new WebSocket(url);
this.ws.onopen = () => {
console.log('Connected');
this.reconnectAttempts = 0;
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
const handlers = this.handlers.get(message.type) || [];
handlers.forEach(handler => handler(message.data));
};
this.ws.onclose = (event) => {
if (!event.wasClean && this.reconnectAttempts < this.maxReconnectAttempts) {
const delay = Math.pow(2, this.reconnectAttempts) * 1000;
console.log(`Reconnecting in ${delay}ms...`);
setTimeout(() => {
this.reconnectAttempts++;
this.connect(url);
}, delay);
}
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
}
on(type: string, handler: (data: any) => void) {
if (!this.handlers.has(type)) {
this.handlers.set(type, []);
}
this.handlers.get(type)!.push(handler);
}
send(type: string, data: any) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type, data }));
}
}
close() {
this.maxReconnectAttempts = 0; // Prevent reconnection
this.ws?.close();
}
}
// Usage
const ws = new WebSocketClient();
ws.connect('wss://api.example.com/ws');
ws.on('chat_message', (data) => {
console.log(`${data.sender}: ${data.text}`);
});
ws.on('price_update', (data) => {
console.log(`${data.symbol}: $${data.price}`);
});
ws.send('subscribe', { channel: 'BTC-USD' });
WebSocket with Authentication
// Method 1: Token in query string — simple but the token can leak into
// server logs, proxy logs, and browser history
const ws = new WebSocket(`wss://api.example.com/ws?token=${TOKEN}`);
// Method 2: Auth message after connection
const ws = new WebSocket('wss://api.example.com/ws');
ws.onopen = () => {
ws.send(JSON.stringify({
type: 'auth',
token: TOKEN,
}));
};
// Method 3: Subprotocol — the token rides in the Sec-WebSocket-Protocol
// header; the server must echo the accepted protocol back in the handshake
const ws = new WebSocket('wss://api.example.com/ws', [TOKEN]);
Choosing SSE vs WebSocket
| Factor | SSE | WebSocket |
|---|---|---|
| Direction | Server → Client only | Bidirectional |
| Protocol | HTTP | WS (separate protocol) |
| Auto-reconnect | Built-in | Manual |
| Auth headers | Via fetch | Token in URL or auth message |
| Binary data | No (text only) | Yes |
| Proxy support | Good (standard HTTP) | Some proxies block |
| Browser support | All modern browsers | All modern browsers |
| Use case | AI streaming, notifications, feeds | Chat, gaming, collaboration |
Decision Guide
Do you need to send data FROM the client?
YES → WebSocket
NO ↓
Is it AI token streaming?
YES → SSE (standard for AI APIs)
NO ↓
Do you need binary data?
YES → WebSocket
NO → SSE (simpler, auto-reconnect, HTTP-native)
Error Handling for Streams
// Robust streaming with error handling
async function robustStream(
url: string,
onData: (data: any) => void,
options: {
maxRetries?: number;
timeout?: number;
onError?: (error: Error) => void;
onReconnect?: (attempt: number) => void;
} = {}
) {
const { maxRetries = 3, timeout = 30000 } = options;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
const controller = new AbortController();
// Note: this timeout only guards the connection and response headers —
// it's cleared as soon as headers arrive, so it won't catch a stream
// that stalls mid-flight (that needs a per-chunk read timeout)
const timeoutId = setTimeout(() => controller.abort(), timeout);
const response = await fetch(url, {
headers: { 'Authorization': `Bearer ${API_KEY}` },
signal: controller.signal,
});
clearTimeout(timeoutId);
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body!.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) return; // Stream completed successfully
const text = decoder.decode(value, { stream: true });
// Parse the SSE framing here as in consumeSSE above;
// raw text is passed through for brevity
onData(text);
}
} catch (error) {
if (attempt < maxRetries) {
options.onReconnect?.(attempt + 1);
await new Promise(r => setTimeout(r, Math.pow(2, attempt) * 1000));
continue;
}
options.onError?.(error as Error);
throw error;
}
}
}
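One refinement: the retry delays in robustStream (and in the WebSocket client earlier) are deterministic exponential backoff, so when a server restart disconnects thousands of clients at once, they all retry in lockstep and hammer the server in synchronized waves. "Full jitter" backoff spreads the retries out — a sketch:

```typescript
// Full-jitter backoff: pick a uniformly random delay between 0 and an
// exponentially growing cap. Avoids synchronized reconnect storms.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30_000): number {
  const cap = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(Math.random() * cap);
}
```

Swap it into the retry loop as await new Promise(r => setTimeout(r, backoffDelay(attempt))). The maxMs cap also matters on its own: unbounded exponential growth eventually produces multi-minute delays that look like a dead client.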
Common Mistakes
| Mistake | Impact | Fix |
|---|---|---|
| No reconnection logic | Lost data on disconnect | Auto-reconnect with backoff |
| Not handling partial chunks | Corrupted data | Buffer and parse complete messages |
| Opening too many connections | Server resource exhaustion | Multiplex through single connection |
| No timeout on streams | Hanging connections | Set read timeout, abort stale streams |
| Ignoring backpressure | Memory overflow on slow consumers | Pause stream when buffer is full |
| Not closing connections | Resource leaks | Always close on unmount/cleanup |
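The "no timeout on streams" row deserves a sketch. The connection timeout in robustStream above is cleared once headers arrive, so a stream that stalls mid-flight still hangs forever. An idle watchdog aborts when no chunk arrives within a window; wire its callback to the AbortController guarding your fetch:

```typescript
// Aborts a stalled stream: call feed() on every received chunk to reset
// the window, and stop() when the stream ends cleanly.
class IdleWatchdog {
  private timer: ReturnType<typeof setTimeout>;

  constructor(private idleMs: number, private onIdle: () => void) {
    this.timer = setTimeout(onIdle, idleMs);
  }

  feed() {
    clearTimeout(this.timer);
    this.timer = setTimeout(this.onIdle, this.idleMs);
  }

  stop() { clearTimeout(this.timer); }
}
```

Usage: const dog = new IdleWatchdog(30_000, () => controller.abort()); then dog.feed() after each reader.read() resolves and dog.stop() when done is true.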
Building SSE Endpoints in Node.js
Consuming streams is only half the picture. When you build your own API, you'll often need to proxy an upstream AI stream to your frontend, or generate your own event stream. Here's how to implement SSE correctly on the server side.
The critical requirements for an SSE endpoint are: set Content-Type: text/event-stream, set Cache-Control: no-cache and Connection: keep-alive, and flush data as it arrives rather than buffering it. In Express:
import express from 'express';
const app = express();
app.use(express.json()); // Required so req.body below is populated
// SSE endpoint that proxies an upstream AI stream
// (assumes an OpenAI client initialized elsewhere, e.g. const openai = new OpenAI()):
app.post('/api/chat', async (req, res) => {
const { messages } = req.body;
// Set SSE headers before writing anything:
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // Send headers immediately
// Handle client disconnect to abort the upstream request:
const controller = new AbortController();
req.on('close', () => controller.abort());
try {
const stream = await openai.chat.completions.create({
model: 'gpt-4o',
messages,
stream: true,
}, { signal: controller.signal });
for await (const chunk of stream) {
const token = chunk.choices[0]?.delta?.content;
if (token) {
res.write(`data: ${JSON.stringify({ token })}\n\n`);
}
}
res.write('data: [DONE]\n\n');
} catch (err: any) {
if (err.name !== 'AbortError') {
res.write(`data: ${JSON.stringify({ error: err.message })}\n\n`);
}
} finally {
res.end();
}
});
The req.on('close') handler is essential. Without it, when the user navigates away or closes the browser tab, your server continues consuming the upstream API and burning tokens — the HTTP connection is closed but Node.js keeps running the async iterator. Always wire up client disconnect to abort the upstream request.
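Two more server-side details are easy to get wrong: the frame format itself (multi-line data must be split into one data: line per line), and idle timeouts (a comment line starting with a colon makes a cheap keep-alive ping that stops proxies from closing quiet streams). A small serializer sketch:

```typescript
// Serialize a payload to a spec-compliant SSE frame. Multi-line data gets
// one "data:" line per line; optional event name and id fields included.
function formatSSE(data: string, opts: { event?: string; id?: string } = {}): string {
  let frame = '';
  if (opts.event) frame += `event: ${opts.event}\n`;
  if (opts.id) frame += `id: ${opts.id}\n`;
  for (const line of data.split('\n')) {
    frame += `data: ${line}\n`;
  }
  return frame + '\n'; // Blank line dispatches the event
}

// Comment frame — ignored by EventSource, but resets proxy idle timers:
const KEEP_ALIVE = ': ping\n\n';
```

In the Express handler above you could then write res.write(formatSSE(JSON.stringify({ token }))) and run setInterval(() => res.write(KEEP_ALIVE), 15_000), clearing the interval in the close handler.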
For Hono (used in Cloudflare Workers and Bun), use streamSSE from hono/streaming:
import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
const app = new Hono();
app.post('/api/chat', async (c) => {
const { messages } = await c.req.json();
return streamSSE(c, async (stream) => {
const aiStream = await openai.chat.completions.create({
model: 'gpt-4o', messages, stream: true,
});
for await (const chunk of aiStream) {
const token = chunk.choices[0]?.delta?.content;
if (token) await stream.writeSSE({ data: JSON.stringify({ token }) });
}
await stream.writeSSE({ data: '[DONE]' });
});
});
Backpressure: When Your Consumer Can't Keep Up
Backpressure is what happens when a stream producer generates data faster than the consumer can process it. For AI token streams or financial price feeds, the producer (an AI API, an exchange WebSocket) delivers data at its own pace regardless of how fast your code processes it. If your processing is slow — database writes, external API calls, complex transformations — data accumulates in memory buffers.
In Node.js's WHATWG Streams API (used by fetch), backpressure is handled through the ReadableStream controller's desiredSize. When you call reader.read() in a loop, you naturally apply backpressure because the next chunk isn't requested until the await resolves. This is the correct pattern for sequential processing. Problems arise when you call multiple reads in parallel without waiting:
// WRONG — fires off reads without awaiting them, so the loop never
// yields and nothing throttles the producer:
while (true) {
const promise = reader.read(); // Not awaited — no backpressure
processAsync(promise); // Hypothetical handler running uncoordinated
}
// CORRECT — each read awaits the previous:
while (true) {
const { done, value } = await reader.read();
if (done) break;
await processChunk(value); // Process before reading next
}
For real-time price feeds and live data where you want to drop stale data rather than queue it, implement a "latest value only" pattern: store the latest received value in a variable and process it on a timer, rather than processing every event:
let latestPrice: number | null = null;
ws.on('message', (data) => {
const { price } = JSON.parse(data.toString());
latestPrice = price; // Always overwrite with latest
});
// Process latest value at a controlled rate:
setInterval(() => {
if (latestPrice !== null) {
updateUI(latestPrice);
latestPrice = null;
}
}, 16); // ~60fps, or 100ms for less frequent updates
For WebSocket streams where you cannot drop messages (chat, collaboration, order book updates), buffer incoming messages in a queue and process them with a rate-limited worker. Libraries like p-queue handle concurrency limits cleanly and prevent unbounded queue growth.
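A minimal version of that queue-plus-worker pattern looks like the sketch below; p-queue adds concurrency limits, priorities, and timeouts on top of the same idea. The high-water callback is the hook for your "queue growing without bound" alarm:

```typescript
// Sequential worker queue: messages are processed one at a time in arrival
// order; a high-water mark flags when the producer outruns the consumer.
class WorkerQueue<T> {
  private items: T[] = [];
  private running = false;

  constructor(
    private worker: (item: T) => Promise<void>,
    private highWater = 1000,
    private onHighWater: (size: number) => void = () => {},
  ) {}

  push(item: T) {
    this.items.push(item);
    if (this.items.length > this.highWater) this.onHighWater(this.items.length);
    void this.drain();
  }

  private async drain() {
    if (this.running) return; // A drain loop is already active
    this.running = true;
    while (this.items.length > 0) {
      await this.worker(this.items.shift()!);
    }
    this.running = false;
  }
}
```

Hook it to a socket with const queue = new WorkerQueue(saveMessage); ws.on('message', d => queue.push(JSON.parse(d.toString()))) — where saveMessage is whatever slow async work you do per message.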
Scaling WebSockets Horizontally
WebSockets are stateful — the TCP connection persists between client and a specific server process. When you scale your backend to multiple instances behind a load balancer, a message from client A (connected to server 1) cannot natively reach client B (connected to server 2). You need a pub/sub layer to fan out messages across instances.
Redis pub/sub is the standard solution: each server instance subscribes to channels relevant to its connected clients. When server 1 receives a message that should go to client B, it publishes to a Redis channel. Server 2 (which has client B's connection) is subscribed to that channel and forwards the message:
import { createClient } from 'redis';
import { WebSocketServer, WebSocket } from 'ws';
const redisPub = createClient({ url: process.env.REDIS_URL });
const redisSub = createClient({ url: process.env.REDIS_URL });
await Promise.all([redisPub.connect(), redisSub.connect()]);
const wss = new WebSocketServer({ port: 8080 });
const clients = new Map<string, WebSocket>(); // userId → ws
// Subscribe to messages for this server's connected clients:
await redisSub.subscribe('messages', (payload) => {
const { toUserId, message } = JSON.parse(payload);
const ws = clients.get(toUserId);
if (ws?.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
});
// When server receives a message, publish to Redis for all instances:
wss.on('connection', (ws, req) => {
const userId = getUserId(req); // getUserId: your auth lookup on the upgrade request
clients.set(userId, ws);
ws.on('message', async (data) => {
const { toUserId, text } = JSON.parse(data.toString());
await redisPub.publish('messages', JSON.stringify({ toUserId, message: { from: userId, text } }));
});
ws.on('close', () => clients.delete(userId));
});
Sticky sessions are an alternative: configure your load balancer to route all requests from the same client to the same server instance (based on IP or cookie). This avoids the Redis pub/sub complexity for read-heavy workloads where most messages are self-contained. The downside is reduced load distribution — a power user with many connections pins a disproportionate load to one server. For most applications, Redis pub/sub is the more scalable architecture once you're running more than 2-3 server instances.
WebSocket heartbeats prevent silent connection drops. Many load balancers, corporate firewalls, and mobile carriers close idle connections after 30-90 seconds of inactivity. Implement a ping/pong mechanism: the server sends a ping every 30 seconds; if the client doesn't respond within 10 seconds, close and let it reconnect:
const HEARTBEAT_INTERVAL = 30_000;
const HEARTBEAT_TIMEOUT = 10_000;
wss.on('connection', (ws) => {
let isAlive = true;
ws.on('pong', () => { isAlive = true; });
const interval = setInterval(() => {
if (!isAlive) return ws.terminate();
isAlive = false;
ws.ping();
}, HEARTBEAT_INTERVAL);
ws.on('close', () => clearInterval(interval));
});
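One caveat for browser clients: the browser WebSocket API exposes no ping() or pong events — protocol-level control frames are handled by the browser, invisible to JavaScript. Detecting a dead connection from the browser therefore needs an application-level heartbeat. A sketch — the "ping" message type is an assumption; match whatever your server speaks:

```typescript
// Minimal structural interface so the same code works with the browser
// WebSocket or any test double:
interface HeartbeatSocket {
  send(data: string): void;
  close(): void;
  addEventListener(type: 'message' | 'close', listener: () => void): void;
}

// App-level heartbeat: send a JSON "ping" periodically; if nothing comes
// back within timeoutMs, assume the connection is dead and close it so
// the client's reconnect logic can take over.
function startHeartbeat(ws: HeartbeatSocket, intervalMs = 30_000, timeoutMs = 10_000) {
  let pongTimer: ReturnType<typeof setTimeout> | null = null;

  const pinger = setInterval(() => {
    ws.send(JSON.stringify({ type: 'ping' })); // Message shape is an assumption
    if (!pongTimer) pongTimer = setTimeout(() => ws.close(), timeoutMs);
  }, intervalMs);

  // Any inbound message (an explicit pong or real data) proves liveness
  ws.addEventListener('message', () => {
    if (pongTimer) { clearTimeout(pongTimer); pongTimer = null; }
  });

  ws.addEventListener('close', () => {
    clearInterval(pinger);
    if (pongTimer) clearTimeout(pongTimer);
  });
}
```

Closing (rather than silently retrying) is deliberate: it lets the existing reconnection-with-backoff logic handle recovery through one code path.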
Methodology
Choosing a streaming library vs. rolling your own depends on scale and requirements. For AI token streaming in a Next.js application, the Vercel AI SDK's useChat hook handles SSE consumption, reconnection, abort on navigation, and React state updates — use it instead of manually implementing the fetch-based SSE consumer shown here. For custom streaming protocols or non-AI use cases, the manual patterns shown here give you full control. For production WebSocket servers handling thousands of concurrent connections, Socket.IO or uWebSockets.js (native C++ bindings with substantially better connection density than the pure-JavaScript ws package) are worth evaluating before building a custom implementation.
Monitoring streaming endpoints in production requires different metrics than request-response endpoints. Track p50/p95 time-to-first-token (for AI streams), stream abandonment rate (clients who disconnect before the stream completes), and stream error rate. Standard APM tools like Datadog and New Relic support SSE and WebSocket instrumentation, but you need to configure time-to-first-byte and total stream duration as separate measurements — the default HTTP request duration metric captures the full stream duration, which hides the time-to-first-token latency users actually perceive.
SSE specification follows the WHATWG Server-Sent Events standard; browser EventSource behavior described in this article applies to all modern browsers (Chrome 6+, Firefox 6+, Safari 5+; no native support in IE/Edge Legacy, which require polyfills). WebSocket framing and ping/pong behavior described per RFC 6455. Node.js streaming code examples tested with Node.js 22.x LTS using the WHATWG Streams API. Redis pub/sub for WebSocket scaling uses redis npm package v4.x; ioredis is an alternative with similar API. Hono SSE examples based on Hono v4.x streamSSE utility. Backpressure behavior of ReadableStream described per the WHATWG Streams specification; actual behavior depends on runtime implementation (V8, Bun, Deno handle backpressure differently). All AI streaming examples use openai npm package v4.x with the stream: true option on chat completions. WebSocket heartbeat intervals shown (30 seconds ping, 10 seconds pong timeout) are conservative defaults; adjust based on your load balancer's idle connection timeout, which is typically between 60 and 300 seconds depending on the provider — AWS ALB defaults to 60 seconds, Nginx defaults to 75 seconds. Set your heartbeat interval to roughly half the load balancer timeout to ensure connections are kept alive reliably across all infrastructure layers.
Find APIs with streaming support on APIScout — SSE, WebSocket, and real-time capabilities compared across providers.
Related: Building Real-Time APIs, SSE vs WebSockets vs Long Polling: Real-Time APIs 2026, How AI Is Transforming API Design and Documentation