Streaming

Stream responses token-by-token using Server-Sent Events (SSE) for faster time-to-first-token and a better user experience in interactive applications.

What you’ll learn:

  • How to enable streaming for chat completions
  • How to process streaming chunks in different languages
  • How to handle streaming with function calling
  • Error handling patterns for streams

Without streaming, the API waits until the entire response is generated before sending it back. With streaming:

  • Faster perceived response — The first token arrives in milliseconds instead of waiting seconds for the full response
  • Better UX — Users see text appear progressively, like a human typing
  • Lower memory usage — Process tokens as they arrive instead of buffering the full response
  • Early termination — Stop generation mid-stream if the output isn’t what you need (see the sketch after this list)
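
A minimal sketch of early termination, assuming the Python OpenAI SDK used throughout this page (its stream object exposes a close() method); the 200-character cutoff is an arbitrary value chosen for illustration:

from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
    model="Llama-3.3-70B-Instruct",
    messages=[{"role": "user", "content": "Write a long story"}],
    stream=True,
)
collected = []
for chunk in stream:
    if not chunk.choices:
        continue
    content = chunk.choices[0].delta.content
    if content:
        collected.append(content)
    # Arbitrary cutoff for this sketch: stop once ~200 characters have arrived
    if sum(len(c) for c in collected) > 200:
        stream.close()  # release the connection so generation stops server-side
        break
print("".join(collected))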

Enable streaming by setting stream: true in your request:

curl -X POST "$OPENAI_BASE_URL/chat/completions" \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "Llama-3.3-70B-Instruct",
    "messages": [{"role": "user", "content": "Explain quantum computing"}],
    "stream": true
  }'

The response is a sequence of SSE events, each prefixed with data: and terminated by a data: [DONE] sentinel:

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"},"index":0}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"delta":{"content":"Quantum"},"index":0}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","choices":[{"delta":{"content":" computing"},"index":0}]}
...
data: [DONE]
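
If you are not using an SDK, you can parse these lines yourself. Here is a minimal sketch assuming the requests library (any HTTP client that can stream line by line works the same way): strip the data: prefix, stop at [DONE], and parse everything else as JSON:

import json
import os
import requests  # assumption: requests is installed; any streaming HTTP client works

response = requests.post(
    f"{os.environ['OPENAI_BASE_URL']}/chat/completions",
    headers={
        "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
        "Content-Type": "application/json",
    },
    json={
        "model": "Llama-3.3-70B-Instruct",
        "messages": [{"role": "user", "content": "Explain quantum computing"}],
        "stream": True,
    },
    stream=True,
)
response.raise_for_status()
for line in response.iter_lines(decode_unicode=True):
    if not line or not line.startswith("data: "):
        continue  # skip empty keep-alive lines
    payload = line[len("data: "):]
    if payload == "[DONE]":
        break  # end-of-stream sentinel
    chunk = json.loads(payload)
    choices = chunk.get("choices") or []
    if choices and choices[0]["delta"].get("content"):
        print(choices[0]["delta"]["content"], end="", flush=True)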

If you need both streamed output and the complete text, accumulate the chunks as they arrive:

from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
    model="Llama-3.3-70B-Instruct",
    messages=[{"role": "user", "content": "List 5 EU countries"}],
    stream=True,
)
full_response = []
for chunk in stream:
    if not chunk.choices:
        continue
    content = chunk.choices[0].delta.content
    if content:
        full_response.append(content)
        print(content, end="", flush=True)
complete_text = "".join(full_response)
print(f"\n\nTotal length: {len(complete_text)} characters")

When streaming with tools/functions, tool call arguments arrive incrementally across chunks:

from openai import OpenAI
import json
client = OpenAI()
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "City name"},
                },
                "required": ["location"],
            },
        },
    }
]
stream = client.chat.completions.create(
    model="Llama-3.3-70B-Instruct",
    messages=[{"role": "user", "content": "What's the weather in Berlin?"}],
    tools=tools,
    stream=True,
)
tool_calls = {}
for chunk in stream:
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.id:
                tool_calls[tc.index] = {
                    "id": tc.id,
                    "function": {"name": tc.function.name, "arguments": ""},
                }
            if tc.function.arguments:
                tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments
# Process completed tool calls
for tc in tool_calls.values():
    args = json.loads(tc["function"]["arguments"])
    print(f"Tool: {tc['function']['name']}, Args: {args}")

Handle connection drops and errors gracefully during streaming:

from openai import OpenAI, APIError, APIConnectionError
client = OpenAI()
def stream_with_retry(messages, max_retries=3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="Llama-3.3-70B-Instruct",
                messages=messages,
                stream=True,
            )
            full_response = []
            for chunk in stream:
                if not chunk.choices:
                    continue
                content = chunk.choices[0].delta.content
                if content:
                    full_response.append(content)
                    print(content, end="", flush=True)
            print()
            return "".join(full_response)
        except APIConnectionError:
            print(f"\nConnection lost. Retry {attempt + 1}/{max_retries}...")
        except APIError as e:
            print(f"\nAPI error: {e}. Retry {attempt + 1}/{max_retries}...")
    raise Exception("Max retries exceeded")

Request usage statistics in the final chunk with stream_options:

from openai import OpenAI
client = OpenAI()
stream = client.chat.completions.create(
    model="Llama-3.3-70B-Instruct",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    stream_options={"include_usage": True},
)
for chunk in stream:
    if chunk.usage:
        print(f"\nTokens — prompt: {chunk.usage.prompt_tokens}, "
              f"completion: {chunk.usage.completion_tokens}, "
              f"total: {chunk.usage.total_tokens}")
    elif chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)