Add /logs endpoint to monitor upstream processes

- outputs last 10KB of logs from upstream processes
- supports streaming
This commit is contained in:
Benson Wong
2024-10-30 21:02:30 -07:00
parent 1510b3fbd9
commit 0f133f5b74
3 changed files with 155 additions and 5 deletions

View File

@@ -59,6 +59,26 @@ models:
* _Note: Windows currently untested._
1. Run the binary with `llama-swap --config path/to/config.yaml`
## Monitoring Logs
The `/logs` endpoint is available to monitor what llama-swap is doing. It will send the last 10KB of logs. Useful for monitoring the output of llama-server. It also supports streaming of logs.
Usage:
```
# basic, sends up to the last 10KB of logs
curl 'http://host/logs'
# add `stream` to stream new logs as they come in
curl -Ns 'http://host/logs?stream'
# add `skip` to skip history (only useful if used with stream)
curl -Ns 'http://host/logs?stream&skip'
# will output nothing :)
curl -Ns 'http://host/logs?skip'
```
## Systemd Unit Files
Use this unit file to start llama-swap on boot. This is only tested on Ubuntu.

83
proxy/logMonitor.go Normal file
View File

@@ -0,0 +1,83 @@
package proxy
import (
	"container/ring"
	"os"
	"strings"
	"sync"
)
// LogMonitor is an io.Writer that tees output to stdout while keeping
// a ring buffer of recent log chunks and broadcasting new output to
// subscribed clients.
type LogMonitor struct {
	clients map[chan string]bool // active subscriber channels
	mu      sync.RWMutex         // guards clients

	buffer   *ring.Ring   // ring of recent Write chunks, oldest overwritten first
	bufferMu sync.RWMutex // guards buffer
}
// NewLogMonitor returns a LogMonitor ready for use as an io.Writer
// and for client subscription.
func NewLogMonitor() *LogMonitor {
	return &LogMonitor{
		clients: make(map[chan string]bool),
		// NOTE(review): ring.New sizes by entry count, not bytes —
		// this retains the last 10240 Write chunks, which is only
		// "10KB of logs" if each Write is ~1 byte. Confirm intent.
		buffer: ring.New(10 * 1024),
	}
}
// Write implements io.Writer. The bytes are forwarded to stdout first;
// on success the chunk is appended to the history ring and broadcast
// to every subscriber.
func (w *LogMonitor) Write(p []byte) (n int, err error) {
	if n, err = os.Stdout.Write(p); err != nil {
		return n, err
	}

	msg := string(p)

	// Record the chunk in the ring; the oldest entry is overwritten.
	w.bufferMu.Lock()
	w.buffer.Value = msg
	w.buffer = w.buffer.Next()
	w.bufferMu.Unlock()

	w.Broadcast(msg)
	return n, nil
}
// getHistory returns the retained log output, oldest chunk first,
// concatenated into one string. It holds the buffer read lock for the
// duration of the walk.
func (w *LogMonitor) getHistory() string {
	w.bufferMu.RLock()
	defer w.bufferMu.RUnlock()

	// strings.Builder avoids the quadratic cost of += concatenation
	// across up to 10240 ring entries. The type assertion alone also
	// covers the nil case: asserting a nil interface yields ok=false.
	var sb strings.Builder
	w.buffer.Do(func(p interface{}) {
		if content, ok := p.(string); ok {
			sb.WriteString(content)
		}
	})
	return sb.String()
}
// Subscribe registers a new log consumer and returns its channel.
// The channel is buffered (100 messages); Broadcast drops messages
// for consumers that fall behind rather than blocking.
func (w *LogMonitor) Subscribe() chan string {
	ch := make(chan string, 100)

	w.mu.Lock()
	w.clients[ch] = true
	w.mu.Unlock()

	return ch
}
// Unsubscribe removes a consumer registered via Subscribe and closes
// its channel so any receive loop on it terminates.
func (w *LogMonitor) Unsubscribe(ch chan string) {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Remove first so Broadcast can no longer select this channel,
	// then close it to signal the reader.
	delete(w.clients, ch)
	close(ch)
}
// Broadcast delivers msg to every subscriber without blocking. A
// subscriber whose channel buffer is full simply misses this message.
func (w *LogMonitor) Broadcast(msg string) {
	w.mu.RLock()
	defer w.mu.RUnlock()

	for ch := range w.clients {
		select {
		case ch <- msg:
		default:
			// Slow consumer: drop rather than stall the writer.
		}
	}
}

View File

@@ -8,7 +8,6 @@ import (
"io"
"net/http"
"net/url"
"os"
"os/exec"
"strings"
"sync"
@@ -22,10 +21,11 @@ type ProxyManager struct {
config *Config
currentCmd *exec.Cmd
currentConfig ModelConfig
logMonitor *LogMonitor
}
func New(config *Config) *ProxyManager {
return &ProxyManager{config: config}
return &ProxyManager{config: config, logMonitor: NewLogMonitor()}
}
func (pm *ProxyManager) HandleFunc(w http.ResponseWriter, r *http.Request) {
@@ -36,12 +36,55 @@ func (pm *ProxyManager) HandleFunc(w http.ResponseWriter, r *http.Request) {
pm.proxyChatRequest(w, r)
} else if r.URL.Path == "/v1/models" {
pm.listModels(w, r)
} else if r.URL.Path == "/logs" {
pm.streamLogs(w, r)
} else {
pm.proxyRequest(w, r)
}
}
func (pm *ProxyManager) listModels(w http.ResponseWriter, r *http.Request) {
// streamLogs serves /logs: it sends the retained log history (unless
// the `skip` query flag is present) and, when the `stream` flag is
// present, keeps streaming new log output until the client disconnects.
func (pm *ProxyManager) streamLogs(w http.ResponseWriter, r *http.Request) {
	// Check streaming capability before setting headers or subscribing,
	// so the error path emits a clean response and we never subscribe
	// a client we cannot serve.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "text/plain")
	w.Header().Set("Transfer-Encoding", "chunked")
	w.Header().Set("X-Content-Type-Options", "nosniff")

	// Subscribe before emitting history so no log line written between
	// the history snapshot and the stream loop is lost.
	ch := pm.logMonitor.Subscribe()
	defer pm.logMonitor.Unsubscribe(ch)

	if !r.URL.Query().Has("skip") {
		if history := pm.logMonitor.getHistory(); history != "" {
			fmt.Fprint(w, history)
			flusher.Flush()
		}
	}

	if !r.URL.Query().Has("stream") {
		return
	}

	done := r.Context().Done()
	for {
		select {
		case msg := <-ch:
			fmt.Fprint(w, msg)
			flusher.Flush()
		case <-done:
			// Client disconnected or request cancelled.
			return
		}
	}
}
func (pm *ProxyManager) listModels(w http.ResponseWriter, _ *http.Request) {
data := []interface{}{}
for id := range pm.config.Models {
data = append(data, map[string]interface{}{
@@ -92,8 +135,12 @@ func (pm *ProxyManager) swapModel(requestedModel string) error {
return fmt.Errorf("unable to get sanitized command: %v", err)
}
cmd := exec.Command(args[0], args[1:]...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// logMonitor only writes to stdout
// so the upstream's stderr will go to os.Stdout
cmd.Stdout = pm.logMonitor
cmd.Stderr = pm.logMonitor
cmd.Env = modelConfig.Env
err = cmd.Start()