Streaming
Stream responses token-by-token using Server-Sent Events (SSE) for faster time-to-first-token and a better user experience in interactive applications.
Prerequisites
- An API key (get one here)
- OpenAI SDK or HTTP client installed (Quickstart)
What you'll learn:
- How to enable streaming for chat completions
- How to process streaming chunks in different languages
- How to handle streaming with function calling
- Error handling patterns for streams
Why Streaming?
Without streaming, the API waits until the entire response is generated before sending it back. With streaming:
- Faster perceived response — The first token arrives within milliseconds rather than after the seconds it takes to generate the full response
- Better UX — Users see text appear progressively, like a human typing
- Lower memory usage — Process tokens as they arrive instead of buffering the full response
- Early termination — Stop generation mid-stream if the output isn't what you need
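Early termination in particular is easy to get wrong. A minimal sketch of the pattern, using a plain generator to stand in for the API stream (`simulated_stream` and `consume_until` are illustrative names, not part of any SDK):

```python
def simulated_stream():
    """Stand-in for an API stream: yields content deltas one at a time."""
    for token in ["Quantum ", "computing ", "uses ", "qubits ", "to ", "..."]:
        yield token

def consume_until(stream, max_chars):
    """Collect deltas, stopping as soon as max_chars is reached."""
    collected = []
    total = 0
    for delta in stream:
        collected.append(delta)
        total += len(delta)
        if total >= max_chars:
            break  # stop iterating; with the SDK, also close the stream
    return "".join(collected)

print(consume_until(simulated_stream(), 20))  # Quantum computing uses 
```

With the real SDK, breaking out of the loop stops consuming tokens; closing the underlying stream also releases the connection so the server can stop generating.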
Basic Streaming
Enable streaming by setting `stream: true` in your request:
- curl
- Python
- Node.js
```shell
curl -X POST "$OPENAI_BASE_URL/chat/completions" \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "Llama-3.3-70B-Instruct",
    "messages": [{"role": "user", "content": "Explain quantum computing"}],
    "stream": true
  }'
```
The response is a stream of `data:` lines in SSE format:
```text
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"},"index":0}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"delta":{"content":"Quantum"},"index":0}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"delta":{" computing"},"index":0}]}
...
data: [DONE]
```
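If you are not using an SDK, the `data:` lines can be parsed by hand. A minimal sketch of such a parser, written as a pure function over an iterable of lines rather than a live connection (`parse_sse_lines` is an illustrative name, not a library function):

```python
import json

def parse_sse_lines(lines):
    """Yield parsed chunk objects from an iterable of SSE lines.

    Skips blank lines and non-data fields; stops at the [DONE] sentinel.
    """
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)

sample = [
    'data: {"choices":[{"delta":{"content":"Quantum"},"index":0}]}',
    '',
    'data: {"choices":[{"delta":{"content":" computing"},"index":0}]}',
    'data: [DONE]',
]
text = "".join(
    chunk["choices"][0]["delta"].get("content", "")
    for chunk in parse_sse_lines(sample)
)
print(text)  # Quantum computing
```

The same function works on a real response by feeding it the decoded lines of the HTTP body as they arrive.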
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="Llama-3.3-70B-Instruct",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True,
)

for chunk in stream:
    if not chunk.choices:  # some chunks (e.g. a trailing usage chunk) have no choices
        continue
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
print()  # newline at end
```
```javascript
import OpenAI from "openai";

const client = new OpenAI();

const stream = await client.chat.completions.create({
  model: "Llama-3.3-70B-Instruct",
  messages: [{ role: "user", content: "Explain quantum computing" }],
  stream: true,
});

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    process.stdout.write(content);
  }
}
console.log(); // newline at end
```
Collecting the Full Response
If you need both streaming output and the complete text:
- Python
- Node.js
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="Llama-3.3-70B-Instruct",
    messages=[{"role": "user", "content": "List 5 EU countries"}],
    stream=True,
)

full_response = []
for chunk in stream:
    if not chunk.choices:
        continue
    content = chunk.choices[0].delta.content
    if content:
        full_response.append(content)
        print(content, end="", flush=True)

complete_text = "".join(full_response)
print(f"\n\nTotal length: {len(complete_text)} characters")
```
```javascript
import OpenAI from "openai";

const client = new OpenAI();

const stream = await client.chat.completions.create({
  model: "Llama-3.3-70B-Instruct",
  messages: [{ role: "user", content: "List 5 EU countries" }],
  stream: true,
});

const chunks = [];
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    chunks.push(content);
    process.stdout.write(content);
  }
}

const completeText = chunks.join("");
console.log(`\n\nTotal length: ${completeText.length} characters`);
```
Streaming with Function Calling
When streaming with tools/functions, tool call arguments arrive incrementally across chunks:
```python
from openai import OpenAI
import json

client = OpenAI()

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "City name"},
                },
                "required": ["location"],
            },
        },
    }
]

stream = client.chat.completions.create(
    model="Llama-3.3-70B-Instruct",
    messages=[{"role": "user", "content": "What's the weather in Berlin?"}],
    tools=tools,
    stream=True,
)

tool_calls = {}
for chunk in stream:
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.id:  # the first delta for a call carries its id and name
                tool_calls[tc.index] = {
                    "id": tc.id,
                    "function": {"name": tc.function.name, "arguments": ""},
                }
            if tc.function.arguments:
                tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

# Process completed tool calls
for tc in tool_calls.values():
    args = json.loads(tc["function"]["arguments"])
    print(f"Tool: {tc['function']['name']}, Args: {args}")
```
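The accumulation pattern can be factored into a standalone helper and exercised without a network call. A sketch using plain dicts shaped like the streamed deltas (`accumulate_tool_calls` and the sample deltas are illustrative, not captured API output):

```python
import json

def accumulate_tool_calls(deltas):
    """Merge incremental tool-call deltas into complete calls, keyed by index."""
    calls = {}
    for tc in deltas:
        if tc.get("id"):  # the first delta for a call carries its id and name
            calls[tc["index"]] = {
                "id": tc["id"],
                "function": {"name": tc["function"]["name"], "arguments": ""},
            }
        if tc["function"].get("arguments"):
            calls[tc["index"]]["function"]["arguments"] += tc["function"]["arguments"]
    return calls

# Simulated deltas: the JSON arguments arrive split across chunks.
deltas = [
    {"index": 0, "id": "call_1", "function": {"name": "get_weather", "arguments": ""}},
    {"index": 0, "id": None, "function": {"name": None, "arguments": '{"loc'}},
    {"index": 0, "id": None, "function": {"name": None, "arguments": 'ation": "Berlin"}'}},
]
calls = accumulate_tool_calls(deltas)
args = json.loads(calls[0]["function"]["arguments"])
print(args)  # {'location': 'Berlin'}
```

Keying by `index` rather than `id` matters because only the first delta of each call includes the id; later fragments identify the call by index alone.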
Error Handling
Handle connection drops and errors gracefully during streaming. Note that a retry restarts the request from the beginning, so any partial output already printed will appear again:
```python
from openai import OpenAI, APIError, APIConnectionError

client = OpenAI()

def stream_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="Llama-3.3-70B-Instruct",
                messages=messages,
                stream=True,
            )
            full_response = []
            for chunk in stream:
                if not chunk.choices:
                    continue
                content = chunk.choices[0].delta.content
                if content:
                    full_response.append(content)
                    print(content, end="", flush=True)
            print()
            return "".join(full_response)
        except APIConnectionError:
            print(f"\nConnection lost. Retry {attempt + 1}/{max_retries}...")
        except APIError as e:
            print(f"\nAPI error: {e}. Retry {attempt + 1}/{max_retries}...")
    raise Exception("Max retries exceeded")
```
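For transient failures, a backoff between attempts is usually worth adding. A generic sketch decoupled from the API, using a deliberately flaky generator in place of a real stream (`retry_stream`, `flaky_stream`, and the delay values are illustrative assumptions):

```python
import time

def retry_stream(make_stream, max_retries=3, base_delay=1.0):
    """Call make_stream() up to max_retries times, consuming it fully.

    Retries with exponential backoff on ConnectionError and returns the
    concatenated output of the first attempt that completes. Output from a
    failed attempt is discarded, matching the restart-from-scratch caveat.
    """
    for attempt in range(max_retries):
        try:
            return "".join(make_stream())
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...

attempts = {"n": 0}

def flaky_stream():
    """Fails on the first attempt, succeeds afterwards."""
    attempts["n"] += 1
    if attempts["n"] < 2:
        raise ConnectionError("dropped")
    yield from ["Hello", ", ", "world"]

result = retry_stream(flaky_stream, base_delay=0)
print(result)  # Hello, world
```

The same wrapper applies to the SDK by passing a closure that creates the stream and yields each chunk's content.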
Stream Options
Request token usage statistics with `stream_options={"include_usage": True}`. The final chunk then carries a usage object and an empty choices list:
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="Llama-3.3-70B-Instruct",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    stream_options={"include_usage": True},
)

for chunk in stream:
    if chunk.usage:  # the final chunk carries usage and has no choices
        print(f"\nTokens — prompt: {chunk.usage.prompt_tokens}, "
              f"completion: {chunk.usage.completion_tokens}, "
              f"total: {chunk.usage.total_tokens}")
    elif chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
Next Steps
- Chat Completions — Non-streaming chat API usage
- Function Calling — Define and use tools with the API
- Asynchronous Requests — Queue-based processing for batch workloads
- Error Codes — Handle API errors