Raven supports streaming responses using Server-Sent Events (SSE), matching the OpenAI streaming format. Streaming delivers tokens to your application as they are generated, reducing perceived latency.
## Enabling Streaming

Set `stream: true` in your request body:
```typescript
import OpenAI from "openai";

const client = new OpenAI({
  apiKey: "rk_live_...",
  baseURL: "http://localhost:4000/v1"
});

const stream = await client.chat.completions.create({
  model: "gpt-4o",
  messages: [{ role: "user", content: "Write a poem about coding" }],
  stream: true
});

for await (const chunk of stream) {
  process.stdout.write(chunk.choices[0]?.delta?.content || "");
}
```
Streaming responses use the Server-Sent Events wire format. Each chunk is prefixed with `data:` and separated by a double newline:

```
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
```

The stream terminates with `data: [DONE]`.
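If you are consuming the raw HTTP response without an SDK, the framing above can be parsed by splitting on double newlines and stripping the `data:` prefix. The following is a minimal sketch under those assumptions; `parseSSE` is a hypothetical helper, not part of any SDK:

```typescript
// Minimal SSE chunk parser: splits a raw SSE body into parsed JSON chunks.
// Assumes OpenAI-style framing: each event is "data: <json>" separated by
// blank lines, terminated by the "data: [DONE]" sentinel.
function parseSSE(body: string): any[] {
  const chunks: any[] = [];
  for (const event of body.split("\n\n")) {
    const line = event.trim();
    if (!line.startsWith("data:")) continue;
    const payload = line.slice("data:".length).trim();
    if (payload === "[DONE]") break; // terminal sentinel, not JSON
    chunks.push(JSON.parse(payload));
  }
  return chunks;
}

const raw =
  'data: {"choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}\n\n' +
  'data: {"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}\n\n' +
  "data: [DONE]\n\n";

const chunks = parseSSE(raw);
console.log(chunks.length);                      // 2
console.log(chunks[0].choices[0].delta.content); // "Hello"
```

In practice a real parser must also buffer partial events across network reads; the sketch assumes the whole body is already in memory.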
## How Streaming Works in Raven
When a streaming request arrives:

1. **Authentication and Gate Checks.** Raven authenticates the virtual key, checks rate limits, evaluates guardrails, and verifies budgets, the same as for a non-streaming request.
2. **Provider Forwarding.** The request is forwarded to the upstream provider with streaming enabled.
3. **Stream Passthrough.** As the provider generates tokens, Raven pipes each SSE chunk through a `TransformStream` to your application in real time.
4. **Token Accumulation.** In parallel, a `StreamTokenAccumulator` processes each chunk to track input tokens, output tokens, reasoning tokens, and cached tokens.
5. **Post-Stream Logging.** When the stream completes (the `TransformStream` flush fires), Raven logs the full request with accurate token counts, cost calculation, and latency.
Guardrails and policies are evaluated before the stream begins. If a guardrail blocks the request, you receive a synchronous error response instead of a stream.
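The passthrough-with-side-accounting pattern can be sketched with the Web Streams API (global in Node 18+). The accounting below is a deliberately crude stand-in for Raven's internal accumulator, not its actual implementation:

```typescript
// Sketch: forward each chunk unchanged while accumulating usage on the side,
// then log once the stream flushes. Illustrative only, not Raven's code.
const usage = { outputChars: 0, chunks: 0 };

const passthrough = new TransformStream<string, string>({
  transform(chunk, controller) {
    usage.chunks += 1;
    usage.outputChars += chunk.length; // side-channel accounting
    controller.enqueue(chunk);         // forward unchanged to the client
  },
  flush() {
    // Fires when the upstream provider closes its stream:
    // this is where post-stream logging would happen.
    console.log("stream done:", usage);
  },
});

async function run(): Promise<string> {
  const writer = passthrough.writable.getWriter();
  // Producer runs concurrently: with a zero-high-water-mark readable side,
  // awaiting writes serially before reading would deadlock on backpressure.
  const producer = (async () => {
    await writer.write("Hello");
    await writer.write("!");
    await writer.close();
  })();

  let out = "";
  const reader = passthrough.readable.getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    out += value;
  }
  await producer;
  return out;
}

const received = await run();
console.log("client received:", received); // "Hello!"
```

The key property, as in Raven, is that accounting never delays delivery: `controller.enqueue` forwards each chunk immediately, and the totals are only acted on in `flush`.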
## Provider-Specific Normalization
Different providers use different streaming formats. Raven normalizes all of them to the OpenAI SSE format:
| Provider | Native Format | Raven Output |
|---|---|---|
| OpenAI | OpenAI SSE | Passed through unchanged |
| Anthropic | Anthropic SSE (`content_block_delta`) | Normalized to OpenAI format |
This normalization is handled by each provider’s `normalizeStreamChunk` adapter method. Your application always receives the same OpenAI-compatible format regardless of which provider serves the request.
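As a sketch of what such an adapter does, the hypothetical `normalizeAnthropicChunk` below maps an Anthropic `content_block_delta` event onto the OpenAI chunk shape. The Anthropic event shape follows its public streaming API; the mapping itself is illustrative, not Raven's actual adapter:

```typescript
// Illustrative normalizer: convert an Anthropic content_block_delta event
// into an OpenAI-style chat.completion.chunk.
interface AnthropicDeltaEvent {
  type: "content_block_delta";
  index: number;
  delta: { type: "text_delta"; text: string };
}

function normalizeAnthropicChunk(event: AnthropicDeltaEvent, id: string) {
  return {
    id,
    object: "chat.completion.chunk",
    choices: [
      {
        index: event.index,
        delta: { content: event.delta.text }, // text delta -> delta.content
        finish_reason: null,
      },
    ],
  };
}

const normalized = normalizeAnthropicChunk(
  { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "Hello" } },
  "chatcmpl-abc123"
);
console.log(normalized.choices[0].delta.content); // "Hello"
```

A real adapter also has to handle the other Anthropic event types (`message_start`, `message_delta`, `message_stop`) to produce the role-bearing first chunk and the final `finish_reason`; this sketch covers only the text-delta case.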
## Token Tracking with Streaming
Even with streaming, Raven accurately tracks all token usage:
| Metric | How It Is Tracked |
|---|---|
| Input tokens | Extracted from the final stream chunk’s `usage` field |
| Output tokens | Accumulated from content deltas or the final `usage` field |
| Reasoning tokens | Extracted from provider-specific fields |
| Cached tokens | Reported by providers that support prompt caching |
| Cost | Calculated from final token counts using the provider’s pricing |
| Latency | Measured from request start to stream completion |
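The accumulation side of the table above can be sketched as a small class that folds each chunk into running totals. This is a simplified stand-in for the `StreamTokenAccumulator`, assuming OpenAI-style chunks where authoritative counts arrive on the final chunk's `usage` field:

```typescript
// Simplified accumulator sketch: folds OpenAI-style chunks into totals.
// Until the final usage-bearing chunk arrives, only text is accumulated.
class SimpleTokenAccumulator {
  inputTokens = 0;
  outputTokens = 0;
  text = "";
  finishReason: string | null = null;

  addChunk(chunk: any): void {
    const choice = chunk.choices?.[0];
    if (choice?.delta?.content) this.text += choice.delta.content;
    if (choice?.finish_reason) this.finishReason = choice.finish_reason;
    if (chunk.usage) {
      // Authoritative counts from the final chunk's usage field.
      this.inputTokens = chunk.usage.prompt_tokens ?? 0;
      this.outputTokens = chunk.usage.completion_tokens ?? 0;
    }
  }
}

const acc = new SimpleTokenAccumulator();
acc.addChunk({ choices: [{ index: 0, delta: { content: "Hello" }, finish_reason: null }] });
acc.addChunk({
  choices: [{ index: 0, delta: {}, finish_reason: "stop" }],
  usage: { prompt_tokens: 9, completion_tokens: 2 },
});
console.log(acc.text, acc.finishReason, acc.inputTokens, acc.outputTokens);
// "Hello" "stop" 9 2
```

The real accumulator additionally tracks reasoning and cached tokens from provider-specific fields, which this sketch omits.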
## Consuming Streams

### Text Deltas Only
The simplest approach — iterate over text content:
```typescript
const stream = await client.chat.completions.create({
  model: "gpt-4o",
  messages: [{ role: "user", content: "Hello" }],
  stream: true
});

let fullText = "";
for await (const chunk of stream) {
  const delta = chunk.choices[0]?.delta?.content || "";
  fullText += delta;
  process.stdout.write(delta);
}

console.log("\n\nFull response:", fullText);
```
### Full Chunk Access
For more control, access the complete chunk object:
```typescript
for await (const chunk of stream) {
  const choice = chunk.choices[0];

  if (choice?.delta?.content) {
    process.stdout.write(choice.delta.content);
  }

  if (choice?.delta?.tool_calls) {
    for (const tc of choice.delta.tool_calls) {
      console.log("Tool call:", tc.function?.name, tc.function?.arguments);
    }
  }

  if (choice?.finish_reason) {
    console.log("\nFinish reason:", choice.finish_reason);
  }

  if (chunk.usage) {
    console.log("Usage:", chunk.usage);
  }
}
```
## Streaming with the Raven SDK

The `@raven/sdk` provides a `streamText` method with built-in token accumulation:
```typescript
import { RavenClient } from "@raven/sdk";

const raven = new RavenClient({
  apiKey: "rk_live_...",
  baseUrl: "http://localhost:4000"
});

const stream = await raven.streamText({
  model: "gpt-4o",
  provider: "openai",
  messages: [{ role: "user", content: "Hello" }]
});

// Iterate over text deltas
for await (const text of stream) {
  process.stdout.write(text);
}

// After iteration, get accumulated results
const fullText = await stream.text;
const usage = await stream.usage;
const finishReason = await stream.finishReason;
```
See TypeScript SDK for more details.
## Error Handling
If an error occurs during streaming:
- Before the stream starts — You receive a standard JSON error response
- During the stream — The stream is terminated and the connection is closed
```typescript
try {
  const stream = await client.chat.completions.create({
    model: "gpt-4o",
    messages: [{ role: "user", content: "Hello" }],
    stream: true
  });

  for await (const chunk of stream) {
    process.stdout.write(chunk.choices[0]?.delta?.content || "");
  }
} catch (error) {
  // The caught value is `unknown` in TypeScript, so narrow before use.
  console.error("Stream error:", error instanceof Error ? error.message : error);
}
```