Streaming makes AI features feel responsive instead of frozen. Here's the full pipeline — async views, SSE, JavaScript client, and the nginx tweaks that matter.
A non-streamed Claude response can take 5–60 seconds to fully generate. Showing the user a spinner that whole time = bad UX = abandoned features.
Streaming starts displaying the response as soon as the first tokens arrive (usually within ~500ms) and the rest fills in over time. The perceived latency drops from "frozen for 30 seconds" to "instant and live." Same total wait time, vastly better experience.
Browser (EventSource) ←──── SSE stream ──── nginx ←──── gunicorn ←──── Django async view ←──── Anthropic streaming API
Each link has a gotcha. We'll handle them all.
Django's StreamingHttpResponse handles this out of the box (it long predates Django 5.2; async-iterator support landed in 4.2). Combined with the Anthropic streaming API:
# myapp/views.py
import json
import logging

import anthropic
from django.conf import settings
from django.http import StreamingHttpResponse, HttpResponseBadRequest
from django.views.decorators.http import require_POST
from django.contrib.auth.decorators import login_required

logger = logging.getLogger(__name__)

client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)


def sse_format(event_type: str, data: dict) -> str:
    """Build a single SSE event."""
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"


def claude_stream_generator(prompt: str):
    """Yield SSE events as Claude generates the response."""
    try:
        with client.messages.stream(
            model=settings.ANTHROPIC_MODEL,
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            for text_delta in stream.text_stream:
                yield sse_format("delta", {"text": text_delta})
            # Final message gives us totals
            final = stream.get_final_message()
            yield sse_format("done", {
                "input_tokens": final.usage.input_tokens,
                "output_tokens": final.usage.output_tokens,
            })
    except anthropic.APIError:
        logger.exception("Streaming error")
        yield sse_format("error", {"message": "AI service unavailable"})
    except Exception:
        logger.exception("Unexpected streaming error")
        yield sse_format("error", {"message": "Something went wrong"})


@login_required
@require_POST
def stream_ask(request):
    prompt = (request.POST.get("prompt") or "").strip()
    if not prompt or len(prompt) > 5000:
        return HttpResponseBadRequest("Invalid prompt")
    response = StreamingHttpResponse(
        claude_stream_generator(prompt),
        content_type="text/event-stream",
    )
    response["Cache-Control"] = "no-cache"
    response["X-Accel-Buffering"] = "no"  # CRITICAL for nginx
    # Don't set a Connection header here: it's hop-by-hop, and
    # WSGI servers strip or reject it.
    return response
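Each yielded chunk is one complete SSE event: an event: line, a data: line carrying a JSON payload, and a blank line as the terminator. A quick REPL check of sse_format shows the wire format:

>>> sse_format("delta", {"text": "Hello"})
'event: delta\ndata: {"text": "Hello"}\n\n'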
Key points:
- text/event-stream content type — required for SSE
- X-Accel-Buffering: no — without this, nginx buffers your entire response and "streaming" doesn't work
- Cache-Control: no-cache — keeps proxies from caching

On the front end, a minimal form plus a streaming reader (note the {% csrf_token %}; the script below reads it):

<form id="ask-form">
  {% csrf_token %}
  <textarea id="prompt" required></textarea>
  <button type="submit">Ask</button>
</form>
<div id="output"></div>
<div id="meta"></div>
<script>
const form = document.getElementById('ask-form');
const output = document.getElementById('output');
const meta = document.getElementById('meta');

form.addEventListener('submit', async (e) => {
  e.preventDefault();
  output.textContent = '';
  meta.textContent = 'Thinking...';

  const prompt = document.getElementById('prompt').value;
  const csrf = document.querySelector('[name=csrfmiddlewaretoken]').value;

  const response = await fetch('/ask/stream/', {
    method: 'POST',
    body: new URLSearchParams({prompt, csrfmiddlewaretoken: csrf}),
  });
  if (!response.ok) {
    meta.textContent = 'Error: ' + response.statusText;
    return;
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, {stream: true});

    // Parse complete SSE events (each terminated by a blank line)
    let i;
    while ((i = buffer.indexOf('\n\n')) >= 0) {
      const eventBlock = buffer.slice(0, i);
      buffer = buffer.slice(i + 2);
      const lines = eventBlock.split('\n');
      const event = lines.find(l => l.startsWith('event: '))?.slice(7);
      const data = lines.find(l => l.startsWith('data: '))?.slice(6);
      if (!data) continue;

      const payload = JSON.parse(data);
      if (event === 'delta') {
        output.textContent += payload.text;
      } else if (event === 'done') {
        meta.textContent =
          `Done. ${payload.input_tokens} in / ${payload.output_tokens} out tokens.`;
      } else if (event === 'error') {
        meta.textContent = 'Error: ' + payload.message;
      }
    }
  }
});
</script>
This uses the Fetch API streaming reader (more flexible than EventSource because it lets us POST, send CSRF, and handle auth properly).
If you only need GET-based streams and cookies cover your auth, the standard EventSource is shorter (it needs a GET view variant, sketched after this snippet):
const source = new EventSource('/ask/stream/?prompt=' + encodeURIComponent(prompt));
source.addEventListener('delta', e => output.textContent += JSON.parse(e.data).text);
source.addEventListener('done', e => source.close());
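EventSource can only issue GET requests, though, so the POST-only view above won't serve it. A hypothetical GET twin (stream_ask_get is a name introduced here for illustration) reuses the same generator:

from django.views.decorators.http import require_GET

@login_required
@require_GET
def stream_ask_get(request):
    # Same validation and headers as stream_ask; the prompt moves to the query string.
    prompt = (request.GET.get("prompt") or "").strip()
    if not prompt or len(prompt) > 5000:
        return HttpResponseBadRequest("Invalid prompt")
    response = StreamingHttpResponse(
        claude_stream_generator(prompt),
        content_type="text/event-stream",
    )
    response["Cache-Control"] = "no-cache"
    response["X-Accel-Buffering"] = "no"
    return response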
Without the right nginx config, your stream gets buffered at the gateway and arrives in one big lump. Add this to your site config:
location /ask/stream/ {
    proxy_pass http://127.0.0.1:8000;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;

    # Streaming-specific
    proxy_buffering off;
    proxy_cache off;
    proxy_read_timeout 600s;  # Long timeout for slow generations
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    chunked_transfer_encoding on;
}
proxy_buffering off is the single most important line. Without it, nginx waits to receive a buffer-full of bytes from gunicorn before forwarding to the user, defeating streaming entirely.
Gunicorn's default sync workers can stream fine at low traffic. For high concurrency on streamed endpoints, use gevent workers:
gunicorn djzen.wsgi:application \
--workers 3 \
--worker-class gevent \
--worker-connections 1000 \
--timeout 600 \
--bind 127.0.0.1:8000
Without gevent, each in-flight stream blocks one worker. With it, one worker handles thousands of concurrent streams.
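To verify that rather than take it on faith, a rough smoke test: open many streams at once and confirm they all make progress. This sketch assumes the httpx library and a dev server on port 8000; it skips login and CSRF, so point it at a test-exempted endpoint:

import asyncio
import httpx

async def consume(client: httpx.AsyncClient, i: int) -> None:
    # Hold one stream open per task; with gevent (or async) workers,
    # all of them should stream concurrently instead of queueing.
    async with client.stream(
        "POST", "http://127.0.0.1:8000/ask/stream/", data={"prompt": f"test {i}"}
    ) as r:
        async for _chunk in r.aiter_text():
            pass

async def main() -> None:
    async with httpx.AsyncClient(timeout=600) as client:
        await asyncio.gather(*(consume(client, i) for i in range(50)))

asyncio.run(main())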
If you're using async views (Django has supported them since 3.1, and StreamingHttpResponse accepts async iterators as of 4.2), use uvicorn workers instead:
gunicorn djzen.asgi:application \
--workers 3 \
--worker-class uvicorn.workers.UvicornWorker \
--bind 127.0.0.1:8000
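Everything above uses the sync Anthropic client. As a minimal sketch of the async route — this assumes the SDK's AsyncAnthropic client and imports sse_format from the view module above; Django 4.2+ can stream an async generator under ASGI:

import anthropic
from django.conf import settings
from django.http import StreamingHttpResponse

from myapp.views import sse_format

async_client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)

async def claude_stream_generator_async(prompt: str):
    # Same SSE framing as the sync version, driven by the async client.
    async with async_client.messages.stream(
        model=settings.ANTHROPIC_MODEL,
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text_delta in stream.text_stream:
            yield sse_format("delta", {"text": text_delta})

async def stream_ask_async(request):
    # Hypothetical async twin of stream_ask (validation and error handling omitted).
    prompt = (request.POST.get("prompt") or "").strip()
    response = StreamingHttpResponse(
        claude_stream_generator_async(prompt),
        content_type="text/event-stream",
    )
    response["Cache-Control"] = "no-cache"
    response["X-Accel-Buffering"] = "no"
    return response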
A few problems come up again and again.

Chunks arrive in batches, not character-by-character. Almost always nginx buffering or the gunicorn worker class. Check that X-Accel-Buffering: no is set, that proxy_buffering off is in the nginx config, and that your worker class doesn't block per connection.
Streams hang at 30 seconds. A timeout somewhere. Look at gunicorn --timeout, nginx proxy_read_timeout, and any CDN/Cloudflare timeouts in front (Cloudflare Free tier kills connections after 100 seconds).
Streams work locally but break in production. Almost certainly Cloudflare or another reverse proxy stripping or buffering SSE. Either disable proxying for the streaming endpoint or upgrade to a tier that supports SSE properly.
The browser's tab freezes during long streams. You're appending to the DOM too aggressively. Batch updates with requestAnimationFrame or use a virtual scroller for very long outputs.
CSRF errors on the streaming POST. Make sure you're sending the CSRF token (the example above includes {% csrf_token %} in the form and forwards it in the body). If the view is authenticated some other way, opt that one view out with Django's @csrf_exempt decorator.
Streams are awkward to test, but doable. Patch the generator so the test never hits the live Anthropic API, and create the user the test logs in as:

import pytest
from unittest.mock import patch
from django.contrib.auth.models import User
from django.test import Client

def fake_stream(prompt):
    # Canned SSE events standing in for the real Claude stream.
    yield 'event: delta\ndata: {"text": "Hello"}\n\n'
    yield 'event: done\ndata: {"input_tokens": 1, "output_tokens": 1}\n\n'

@pytest.mark.django_db  # assumes pytest-django; a plain TestCase works too
def test_stream():
    User.objects.create_user(username="test", password="test")
    c = Client()
    c.login(username="test", password="test")
    with patch("myapp.views.claude_stream_generator", fake_stream):
        response = c.post("/ask/stream/", {"prompt": "Hi"})
    assert response.status_code == 200
    assert response["Content-Type"] == "text/event-stream"
    full = b"".join(response.streaming_content).decode()
    assert "event: delta" in full
    assert "event: done" in full
This proves the stream produces output. For end-to-end behaviour testing, Playwright works well.
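A sketch with Playwright for Python, assuming the form lives at /ask/ and uses the selectors from the markup above (login handling omitted):

from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.goto("http://localhost:8000/ask/")  # assumed page URL
    page.fill("#prompt", "Hi")
    page.click("button[type=submit]")
    # The meta line switches to 'Done. ...' only after the full stream lands.
    page.wait_for_selector('#meta:has-text("Done.")', timeout=60_000)
    assert page.text_content("#output")
    browser.close()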
Streaming AI responses takes a coordinated effort across five layers:

- a Django view that yields SSE-formatted chunks
- the Anthropic streaming API feeding it deltas as they're generated
- a JavaScript client that parses events as they arrive
- nginx passing bytes through without buffering
- a gunicorn worker class that doesn't block per connection
When all five line up, you get a smooth, low-latency AI experience that feels native. When any one is wrong, you get the dreaded 30-second spinner. Worth the engineering.