Fix nginx proxy buffering for streaming endpoints (#295)

* Fix nginx proxy buffering for streaming endpoints

- Add X-Accel-Buffering: no header to SSE endpoints (/api/events, /logs/stream)
- Add X-Accel-Buffering: no header to proxied text/event-stream responses
- Add nginx reverse proxy configuration section to README
- Add tests for X-Accel-Buffering header on streaming endpoints

Fixes #236

* Fix goroutine cleanup in streaming endpoints test

Add context cancellation to TestProxyManager_StreamingEndpointsReturnNoBufferingHeader
to ensure the goroutine is properly cleaned up when the test completes.
This commit is contained in:
Artur Podsiadły
2025-09-10 01:07:46 +02:00
committed by GitHub
parent b21dee27c1
commit 558801db1a
5 changed files with 97 additions and 0 deletions

View File

@@ -73,6 +73,30 @@ However, there are many more capabilities that llama-swap supports:
See the [configuration documentation](https://github.com/mostlygeek/llama-swap/wiki/Configuration) in the wiki all options and examples. See the [configuration documentation](https://github.com/mostlygeek/llama-swap/wiki/Configuration) in the wiki all options and examples.
## Reverse Proxy Configuration (nginx)
If you deploy llama-swap behind nginx, disable response buffering for streaming endpoints. By default, nginx buffers responses which breaks ServerSent Events (SSE) and streaming chat completion. ([#236](https://github.com/mostlygeek/llama-swap/issues/236))
Recommended nginx configuration snippets:
```nginx
# SSE for UI events/logs
location /api/events {
proxy_pass http://your-llama-swap-backend;
proxy_buffering off;
proxy_cache off;
}
# Streaming chat completions (stream=true)
location /v1/chat/completions {
proxy_pass http://your-llama-swap-backend;
proxy_buffering off;
proxy_cache off;
}
```
As a safeguard, llama-swap also sets `X-Accel-Buffering: no` on SSE responses. However, explicitly disabling `proxy_buffering` at your reverse proxy is still recommended for reliable streaming behavior.
## Web UI ## Web UI
llama-swap includes a real time web interface for monitoring logs and models: llama-swap includes a real time web interface for monitoring logs and models:

View File

@@ -458,6 +458,10 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
w.Header().Add(k, v) w.Header().Add(k, v)
} }
} }
// prevent nginx from buffering streaming responses (e.g., SSE)
if strings.Contains(strings.ToLower(resp.Header.Get("Content-Type")), "text/event-stream") {
w.Header().Set("X-Accel-Buffering", "no")
}
w.WriteHeader(resp.StatusCode) w.WriteHeader(resp.StatusCode)
// faster than io.Copy when streaming // faster than io.Copy when streaming

View File

@@ -100,6 +100,8 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
c.Header("Cache-Control", "no-cache") c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive") c.Header("Connection", "keep-alive")
c.Header("X-Content-Type-Options", "nosniff") c.Header("X-Content-Type-Options", "nosniff")
// prevent nginx from buffering SSE
c.Header("X-Accel-Buffering", "no")
sendBuffer := make(chan messageEnvelope, 25) sendBuffer := make(chan messageEnvelope, 25)
ctx, cancel := context.WithCancel(c.Request.Context()) ctx, cancel := context.WithCancel(c.Request.Context())

View File

@@ -28,6 +28,8 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
c.Header("Content-Type", "text/plain") c.Header("Content-Type", "text/plain")
c.Header("Transfer-Encoding", "chunked") c.Header("Transfer-Encoding", "chunked")
c.Header("X-Content-Type-Options", "nosniff") c.Header("X-Content-Type-Options", "nosniff")
// prevent nginx from buffering streamed logs
c.Header("X-Accel-Buffering", "no")
logMonitorId := c.Param("logMonitorID") logMonitorId := c.Param("logMonitorID")
logger, err := pm.getLogger(logMonitorId) logger, err := pm.getLogger(logMonitorId)

View File

@@ -2,6 +2,7 @@ package proxy
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/rand" "math/rand"
@@ -913,3 +914,67 @@ models:
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model1"].CurrentState()) assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model1"].CurrentState())
assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model2"].CurrentState()) assert.Equal(t, StateReady, proxy.processGroups["preloadTestGroup"].processes["model2"].CurrentState())
} }
func TestProxyManager_StreamingEndpointsReturnNoBufferingHeader(t *testing.T) {
config := AddDefaultGroupToConfig(Config{
HealthCheckTimeout: 15,
Models: map[string]ModelConfig{
"model1": getTestSimpleResponderConfig("model1"),
},
LogLevel: "error",
})
proxy := New(config)
defer proxy.StopProcesses(StopWaitForInflightRequest)
endpoints := []string{
"/api/events",
"/logs/stream",
"/logs/stream/proxy",
"/logs/stream/upstream",
}
for _, endpoint := range endpoints {
t.Run(endpoint, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := httptest.NewRequest("GET", endpoint, nil)
req = req.WithContext(ctx)
rec := httptest.NewRecorder()
// We don't need the handler to fully complete, just to set the headers
// so run it in a goroutine and check the headers after a short delay
go proxy.ServeHTTP(rec, req)
time.Sleep(10 * time.Millisecond) // give it time to start and write headers
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering"))
})
}
}
func TestProxyManager_ProxiedStreamingEndpointReturnsNoBufferingHeader(t *testing.T) {
config := AddDefaultGroupToConfig(Config{
HealthCheckTimeout: 15,
Models: map[string]ModelConfig{
"streaming-model": getTestSimpleResponderConfig("streaming-model"),
},
LogLevel: "error",
})
proxy := New(config)
defer proxy.StopProcesses(StopWaitForInflightRequest)
// Make a streaming request
reqBody := `{"model":"streaming-model"}`
// simple-responder will return text/event-stream when stream=true is in the query
req := httptest.NewRequest("POST", "/v1/chat/completions?stream=true", bytes.NewBufferString(reqBody))
rec := httptest.NewRecorder()
proxy.ServeHTTP(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "no", rec.Header().Get("X-Accel-Buffering"))
assert.Contains(t, rec.Header().Get("Content-Type"), "text/event-stream")
}