package proxy

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/mostlygeek/llama-swap/event"
	"github.com/tidwall/gjson"
)

// TokenMetrics represents token statistics parsed from llama-server responses
type TokenMetrics struct {
	ID              int       `json:"id"`
	Timestamp       time.Time `json:"timestamp"`
	Model           string    `json:"model"`
	CachedTokens    int       `json:"cache_tokens"`
	InputTokens     int       `json:"input_tokens"`
	OutputTokens    int       `json:"output_tokens"`
	PromptPerSecond float64   `json:"prompt_per_second"`
	TokensPerSecond float64   `json:"tokens_per_second"`
	DurationMs      int       `json:"duration_ms"`
}

// TokenMetricsEvent represents a token metrics event
type TokenMetricsEvent struct {
	Metrics TokenMetrics
}

func (e TokenMetricsEvent) Type() uint32 {
	return TokenMetricsEventID // defined in events.go
}

// metricsMonitor parses llama-server responses for token statistics
type metricsMonitor struct {
	mu         sync.RWMutex
	metrics    []TokenMetrics
	maxMetrics int
	nextID     int
	logger     *LogMonitor
}

func newMetricsMonitor(logger *LogMonitor, maxMetrics int) *metricsMonitor {
	mp := &metricsMonitor{
		logger:     logger,
		maxMetrics: maxMetrics,
	}
	return mp
}

// addMetrics adds a new metric to the collection and publishes an event
func (mp *metricsMonitor) addMetrics(metric TokenMetrics) {
	mp.mu.Lock()
	defer mp.mu.Unlock()

	metric.ID = mp.nextID
	mp.nextID++

	mp.metrics = append(mp.metrics, metric)
	if len(mp.metrics) > mp.maxMetrics {
		mp.metrics = mp.metrics[len(mp.metrics)-mp.maxMetrics:]
	}
	event.Emit(TokenMetricsEvent{Metrics: metric})
}

// getMetrics returns a copy of the current metrics
func (mp *metricsMonitor) getMetrics() []TokenMetrics {
	mp.mu.RLock()
	defer mp.mu.RUnlock()

	result := make([]TokenMetrics, len(mp.metrics))
	copy(result, mp.metrics)
	return result
}

// getMetricsJSON returns metrics as JSON
func (mp *metricsMonitor) getMetricsJSON() ([]byte, error) {
	mp.mu.RLock()
	defer mp.mu.RUnlock()
	return json.Marshal(mp.metrics)
}

// wrapHandler wraps the proxy handler to extract token metrics.
// If wrapHandler returns an error it is safe to assume that no
// data was sent to the client.
func (mp *metricsMonitor) wrapHandler(
	modelID string,
	writer gin.ResponseWriter,
	request *http.Request,
	next func(modelID string, w http.ResponseWriter, r *http.Request) error,
) error {
	recorder := newBodyCopier(writer)
	if err := next(modelID, recorder, request); err != nil {
		return err
	}

	// After this point we have to assume that data was sent to the client,
	// so errors can only be logged, not returned to the client.
	if recorder.Status() != http.StatusOK {
		mp.logger.Warnf("metrics skipped, HTTP status=%d, path=%s", recorder.Status(), request.URL.Path)
		return nil
	}

	body := recorder.body.Bytes()
	if len(body) == 0 {
		mp.logger.Warn("metrics skipped, empty body")
		return nil
	}

	if strings.Contains(recorder.Header().Get("Content-Type"), "text/event-stream") {
		if tm, err := processStreamingResponse(modelID, recorder.StartTime(), body); err != nil {
			mp.logger.Warnf("error processing streaming response: %v, path=%s", err, request.URL.Path)
		} else {
			mp.addMetrics(tm)
		}
	} else {
		if gjson.ValidBytes(body) {
			if tm, err := parseMetrics(modelID, recorder.StartTime(), gjson.ParseBytes(body)); err != nil {
				mp.logger.Warnf("error parsing metrics: %v, path=%s", err, request.URL.Path)
			} else {
				mp.addMetrics(tm)
			}
		} else {
			mp.logger.Warnf("metrics skipped, invalid JSON in response body, path=%s", request.URL.Path)
		}
	}

	return nil
}

func processStreamingResponse(modelID string, start time.Time, body []byte) (TokenMetrics, error) {
	// Iterate backwards through the body looking for the data payload with
	// usage data. This avoids allocating a slice of all lines via bytes.Split.

	// Start from the end of the body and scan backwards for newlines
	pos := len(body)
	for pos > 0 {
		// Find the previous newline (or start of body)
		lineStart := bytes.LastIndexByte(body[:pos], '\n')
		if lineStart == -1 {
			lineStart = 0
		} else {
			lineStart++ // Move past the newline
		}

		line := bytes.TrimSpace(body[lineStart:pos])
		pos = lineStart - 1 // Move position before the newline for next iteration

		if len(line) == 0 {
			continue
		}

		// SSE payload always follows "data:"
		prefix := []byte("data:")
		if !bytes.HasPrefix(line, prefix) {
			continue
		}

		data := bytes.TrimSpace(line[len(prefix):])
		if len(data) == 0 {
			continue
		}
		if bytes.Equal(data, []byte("[DONE]")) {
			// The [DONE] line itself contains nothing of interest.
			continue
		}

		if gjson.ValidBytes(data) {
			return parseMetrics(modelID, start, gjson.ParseBytes(data))
		}
	}

	return TokenMetrics{}, fmt.Errorf("no valid JSON data found in stream")
}

func parseMetrics(modelID string, start time.Time, jsonData gjson.Result) (TokenMetrics, error) {
	usage := jsonData.Get("usage")
	timings := jsonData.Get("timings")
	if !usage.Exists() && !timings.Exists() {
		return TokenMetrics{}, fmt.Errorf("no usage or timings data found")
	}

	// default values
	cachedTokens := -1 // unknown or missing data
	outputTokens := 0
	inputTokens := 0

	// timings data
	tokensPerSecond := -1.0
	promptPerSecond := -1.0
	durationMs := int(time.Since(start).Milliseconds())

	if usage.Exists() {
		outputTokens = int(jsonData.Get("usage.completion_tokens").Int())
		inputTokens = int(jsonData.Get("usage.prompt_tokens").Int())
	}

	// Use llama-server's timing data for tok/sec and duration as it is more accurate.
	if timings.Exists() {
		inputTokens = int(jsonData.Get("timings.prompt_n").Int())
		outputTokens = int(jsonData.Get("timings.predicted_n").Int())
		promptPerSecond = jsonData.Get("timings.prompt_per_second").Float()
		tokensPerSecond = jsonData.Get("timings.predicted_per_second").Float()
		durationMs = int(jsonData.Get("timings.prompt_ms").Float() + jsonData.Get("timings.predicted_ms").Float())
		if cachedValue := jsonData.Get("timings.cache_n"); cachedValue.Exists() {
			cachedTokens = int(cachedValue.Int())
		}
	}

	return TokenMetrics{
		Timestamp:       time.Now(),
		Model:           modelID,
		CachedTokens:    cachedTokens,
		InputTokens:     inputTokens,
		OutputTokens:    outputTokens,
		PromptPerSecond: promptPerSecond,
		TokensPerSecond: tokensPerSecond,
		DurationMs:      durationMs,
	}, nil
}

// responseBodyCopier records the response body and writes to the original response writer
// while also capturing it in a buffer for later processing
type responseBodyCopier struct {
	gin.ResponseWriter
	body  *bytes.Buffer
	tee   io.Writer
	start time.Time
}

func newBodyCopier(w gin.ResponseWriter) *responseBodyCopier {
	bodyBuffer := &bytes.Buffer{}
	return &responseBodyCopier{
		ResponseWriter: w,
		body:           bodyBuffer,
		tee:            io.MultiWriter(w, bodyBuffer),
	}
}

func (w *responseBodyCopier) Write(b []byte) (int, error) {
	if w.start.IsZero() {
		w.start = time.Now()
	}
	// Single write operation that writes to both the response and the buffer
	return w.tee.Write(b)
}

func (w *responseBodyCopier) WriteHeader(statusCode int) {
	w.ResponseWriter.WriteHeader(statusCode)
}

func (w *responseBodyCopier) Header() http.Header {
	return w.ResponseWriter.Header()
}

func (w *responseBodyCopier) StartTime() time.Time {
	return w.start
}
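
// Example wiring (a minimal sketch, kept in comment form so this file compiles
// unchanged): the names pm, metricsMonitor field access, and proxyToUpstream
// below are hypothetical and only illustrate the intended call shape of
// wrapHandler. Because wrapHandler returns an error only before any bytes
// reach the client, the caller can still send an error response in that case.
//
//	func (pm *ProxyManager) handleChatCompletion(c *gin.Context, modelID string) {
//		err := pm.metricsMonitor.wrapHandler(modelID, c.Writer, c.Request, pm.proxyToUpstream)
//		if err != nil {
//			c.AbortWithStatusJSON(http.StatusBadGateway, gin.H{"error": err.Error()})
//		}
//	}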