From 0f133f5b74a7cb9ae78e1a4162899617cfd2d195 Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Wed, 30 Oct 2024 21:02:30 -0700 Subject: [PATCH] Add /logs endpoint to monitor upstream processes - outputs last 10KB of logs from upstream processes - supports streaming --- README.md | 20 +++++++++++ proxy/logMonitor.go | 83 +++++++++++++++++++++++++++++++++++++++++++++ proxy/manager.go | 57 ++++++++++++++++++++++++++++--- 3 files changed, 155 insertions(+), 5 deletions(-) create mode 100644 proxy/logMonitor.go diff --git a/README.md b/README.md index 5c173a3..f6b705e 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,26 @@ models: * _Note: Windows currently untested._ 1. Run the binary with `llama-swap --config path/to/config.yaml` +## Monitoring Logs + +The `/logs` endpoint is available to monitor what llama-swap is doing. It will send the last 10KB of logs. Useful for monitoring the output of llama-server. It also supports streaming of logs. + +Usage: + +``` +# basic, sends up to the last 10KB of logs +curl 'http://host/logs' + +# add `stream` to stream new logs as they come in +curl -Ns 'http://host/logs?stream' + +# add `skip` to skip history (only useful if used with stream) +curl -Ns 'http://host/logs?stream&skip' + +# will output nothing :) +curl -Ns 'http://host/logs?skip' +``` + ## Systemd Unit Files Use this unit file to start llama-swap on boot. This is only tested on Ubuntu. 
diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go new file mode 100644 index 0000000..ff163a0 --- /dev/null +++ b/proxy/logMonitor.go @@ -0,0 +1,83 @@ +package proxy + +import ( + "container/ring" + "os" + "sync" +) + +type LogMonitor struct { + clients map[chan string]bool + mu sync.RWMutex + buffer *ring.Ring + bufferMu sync.RWMutex +} + +func NewLogMonitor() *LogMonitor { + return &LogMonitor{ + clients: make(map[chan string]bool), + buffer: ring.New(10 * 1024), // keep 10KB of buffered logs + } +} + +func (w *LogMonitor) Write(p []byte) (n int, err error) { + n, err = os.Stdout.Write(p) + if err != nil { + return n, err + } + + content := string(p) + + w.bufferMu.Lock() + w.buffer.Value = content + w.buffer = w.buffer.Next() + w.bufferMu.Unlock() + + w.Broadcast(content) + return n, nil +} + +func (w *LogMonitor) getHistory() string { + w.bufferMu.RLock() + defer w.bufferMu.RUnlock() + + var history string + w.buffer.Do(func(p interface{}) { + if p != nil { + if content, ok := p.(string); ok { + history += content + } + } + }) + return history +} + +func (w *LogMonitor) Subscribe() chan string { + w.mu.Lock() + defer w.mu.Unlock() + + ch := make(chan string, 100) + w.clients[ch] = true + return ch +} + +func (w *LogMonitor) Unsubscribe(ch chan string) { + w.mu.Lock() + defer w.mu.Unlock() + + delete(w.clients, ch) + close(ch) +} + +func (w *LogMonitor) Broadcast(msg string) { + w.mu.RLock() + defer w.mu.RUnlock() + + for client := range w.clients { + select { + case client <- msg: + default: + // If client buffer is full, skip + } + } +} diff --git a/proxy/manager.go b/proxy/manager.go index 5c67507..c3a5dbe 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -8,7 +8,6 @@ import ( "io" "net/http" "net/url" - "os" "os/exec" "strings" "sync" @@ -22,10 +21,11 @@ type ProxyManager struct { config *Config currentCmd *exec.Cmd currentConfig ModelConfig + logMonitor *LogMonitor } func New(config *Config) *ProxyManager { - return &ProxyManager{config: config} 
+ return &ProxyManager{config: config, logMonitor: NewLogMonitor()} } func (pm *ProxyManager) HandleFunc(w http.ResponseWriter, r *http.Request) { @@ -36,12 +36,55 @@ func (pm *ProxyManager) HandleFunc(w http.ResponseWriter, r *http.Request) { pm.proxyChatRequest(w, r) } else if r.URL.Path == "/v1/models" { pm.listModels(w, r) + } else if r.URL.Path == "/logs" { + pm.streamLogs(w, r) } else { pm.proxyRequest(w, r) } } -func (pm *ProxyManager) listModels(w http.ResponseWriter, r *http.Request) { +func (pm *ProxyManager) streamLogs(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + w.Header().Set("Transfer-Encoding", "chunked") + w.Header().Set("X-Content-Type-Options", "nosniff") + + ch := pm.logMonitor.Subscribe() + defer pm.logMonitor.Unsubscribe(ch) + + notify := r.Context().Done() + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported", http.StatusInternalServerError) + return + } + + skipHistory := r.URL.Query().Has("skip") + if !skipHistory { + // Send history first + history := pm.logMonitor.getHistory() + if history != "" { + fmt.Fprint(w, history) + flusher.Flush() + } + } + + if !r.URL.Query().Has("stream") { + return + } + + // Stream new logs + for { + select { + case msg := <-ch: + fmt.Fprint(w, msg) + flusher.Flush() + case <-notify: + return + } + } +} + +func (pm *ProxyManager) listModels(w http.ResponseWriter, _ *http.Request) { data := []interface{}{} for id := range pm.config.Models { data = append(data, map[string]interface{}{ @@ -92,8 +135,12 @@ func (pm *ProxyManager) swapModel(requestedModel string) error { return fmt.Errorf("unable to get sanitized command: %v", err) } cmd := exec.Command(args[0], args[1:]...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr + + // logMonitor only writes to stdout + // so the upstream's stderr will go to os.Stdout + cmd.Stdout = pm.logMonitor + cmd.Stderr = pm.logMonitor + cmd.Env = modelConfig.Env err = cmd.Start()