From 558801db1ad1c4c5dfec393db9f01302c8216c8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Artur=20Podsiad=C5=82y?= <84812539+artpods56@users.noreply.github.com> Date: Wed, 10 Sep 2025 01:07:46 +0200 Subject: [PATCH] Fix nginx proxy buffering for streaming endpoints (#295) * Fix nginx proxy buffering for streaming endpoints - Add X-Accel-Buffering: no header to SSE endpoints (/api/events, /logs/stream) - Add X-Accel-Buffering: no header to proxied text/event-stream responses - Add nginx reverse proxy configuration section to README - Add tests for X-Accel-Buffering header on streaming endpoints Fixes #236 * Fix goroutine cleanup in streaming endpoints test Add context cancellation to TestProxyManager_StreamingEndpointsReturnNoBufferingHeader to ensure the goroutine is properly cleaned up when the test completes. --- README.md | 24 ++++++++++++ proxy/process.go | 4 ++ proxy/proxymanager_api.go | 2 + proxy/proxymanager_loghandlers.go | 2 + proxy/proxymanager_test.go | 65 +++++++++++++++++++++++++++++++ 5 files changed, 97 insertions(+) diff --git a/README.md b/README.md index 77afe24..8ea52ca 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,30 @@ However, there are many more capabilities that llama-swap supports: See the [configuration documentation](https://github.com/mostlygeek/llama-swap/wiki/Configuration) in the wiki all options and examples. +## Reverse Proxy Configuration (nginx) + +If you deploy llama-swap behind nginx, disable response buffering for streaming endpoints. By default, nginx buffers responses which breaks Server‑Sent Events (SSE) and streaming chat completion. ([#236](https://github.com/mostlygeek/llama-swap/issues/236)) + +Recommended nginx configuration snippets: + +```nginx +# SSE for UI events/logs +location /api/events { + proxy_pass http://your-llama-swap-backend; + proxy_buffering off; + proxy_cache off; +} + +# Streaming chat completions (stream=true) +location /v1/chat/completions { + proxy_pass http://your-llama-swap-backend; + proxy_buffering off; + proxy_cache off; +} +``` + +As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. However, explicitly disabling `proxy_buffering` at your reverse proxy is still recommended for reliable streaming behavior. + ## Web UI llama-swap includes a real time web interface for monitoring logs and models: diff --git a/proxy/process.go b/proxy/process.go index d4b1c81..94c004b 100644 --- a/proxy/process.go +++ b/proxy/process.go @@ -458,6 +458,10 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) { w.Header().Add(k, v) } } + // prevent nginx from buffering streaming responses (e.g., SSE) + if strings.Contains(strings.ToLower(resp.Header.Get("Content-Type")), "text/event-stream") { + w.Header().Set("X-Accel-Buffering", "no") + } w.WriteHeader(resp.StatusCode) // faster than io.Copy when streaming diff --git a/proxy/proxymanager_api.go b/proxy/proxymanager_api.go index f133e4c..19460ea 100644 --- a/proxy/proxymanager_api.go +++ b/proxy/proxymanager_api.go @@ -100,6 +100,8 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) { c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") c.Header("X-Content-Type-Options", "nosniff") + // prevent nginx from buffering SSE + c.Header("X-Accel-Buffering", "no") sendBuffer := make(chan messageEnvelope, 25) ctx, cancel := context.WithCancel(c.Request.Context()) diff --git a/proxy/proxymanager_loghandlers.go b/proxy/proxymanager_loghandlers.go index d466672..a3de806 100644 --- a/proxy/proxymanager_loghandlers.go +++ b/proxy/proxymanager_loghandlers.go @@ -28,6 +28,8 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { c.Header("Content-Type", "text/plain") c.Header("Transfer-Encoding", "chunked") c.Header("X-Content-Type-Options", "nosniff") + // prevent nginx from buffering streamed logs + c.Header("X-Accel-Buffering", "no") logMonitorId := c.Param("logMonitorID") logger, err := pm.getLogger(logMonitorId) diff --git a/proxy/proxymanager_test.go b/proxy/proxymanager_test.go index 7de9b66..e5cea5e 100644 --- a/proxy/proxymanager_test.go +++ b/proxy/proxymanager_test.go @@ -2,6 +2,7 @@ package proxy import ( "bytes" + "context" "encoding/json" "fmt" "math/rand" @@ -913,3 +914,67 @@ models: assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model1"].CurrentState()) assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model2"].CurrentState()) } + +func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) { + config := AddDefaultGroupToConfig(Config{ + HealthCheckTimeout: 15, + Models: map[string]ModelConfig{ + "model1": getTestSimpleResponderConfig("model1"), + }, + LogLevel: "error", + }) + + proxy := New(config) + defer proxy.StopProcesses(StopWaitForInflightRequest) + + endpoints := []string{ + "/api/events", + "/logs/stream", + "/logs/stream/proxy", + "/logs/stream/upstream", + } + + for _, endpoint := range endpoints { + t.Run(endpoint, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + req := httptest.NewRequest("GET", endpoint, nil) + req = req.WithContext(ctx) + rec := httptest.NewRecorder() + + // We don't need the handler to fully complete, just to set the headers + // so run it in a goroutine and check the headers after a short delay + go proxy.ServeHTTP(rec, req) + time.Sleep(10 * time.Millisecond) // give it time to start and write headers + + assert.Equal(t, http.StatusOK, rec.Code) + assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering")) + }) + } +} + +func TestProxyManager_ProxiedStreamingEndpointReturnsNoBufferingHeader(t *testing.T) { + config := AddDefaultGroupToConfig(Config{ + HealthCheckTimeout: 15, + Models: map[string]ModelConfig{ + "streaming-model": getTestSimpleResponderConfig("streaming-model"), + }, + LogLevel: "error", + }) + + proxy := New(config) + defer proxy.StopProcesses(StopWaitForInflightRequest) + + // Make a streaming request + reqBody := `{"model":"streaming-model"}` + // simple-responder will return text/event-stream when stream=true is in the query + req := httptest.NewRequest("POST", "/v1/chat/completions?stream=true", bytes.NewBufferString(reqBody)) + rec := httptest.NewRecorder() + + proxy.ServeHTTP(rec, req) + + assert.Equal(t, http.StatusOK, rec.Code) + assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering")) + assert.Contains(t, rec.Header().Get("Content-Type"), "text/event-stream") +}