* proxy: refactor metrics recording - remove metrics_middleware.go as this wrapper is no longer needed. This also eliminates double body parsing for the modelID - move metrics parsing to be part of MetricsMonitor - refactor how metrics are recorded in ProxyManager - add MetricsMonitor tests - improve mem efficiency of processStreamingResponse - add benchmarks for MetricsMonitor.addMetrics - proxy: refactor MetricsMonitor to be more safe handling errors
268 lines
7.3 KiB
Go
268 lines
7.3 KiB
Go
package proxy
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/mostlygeek/llama-swap/event"
|
|
"github.com/tidwall/gjson"
|
|
)
|
|
|
|
// TokenMetrics represents parsed token statistics from llama-server logs.
// Instances are produced by parseMetrics and stored/published by
// metricsMonitor.addMetrics.
type TokenMetrics struct {
	// ID is a monotonically increasing sequence number assigned by
	// metricsMonitor.addMetrics.
	ID int `json:"id"`
	// Timestamp is when the metric was parsed (time.Now at parse time).
	Timestamp time.Time `json:"timestamp"`
	// Model is the model ID the request was routed to.
	Model string `json:"model"`
	// CachedTokens is taken from timings.cache_n; -1 means unknown/missing.
	CachedTokens int `json:"cache_tokens"`
	// InputTokens comes from timings.prompt_n, falling back to
	// usage.prompt_tokens when timings are absent.
	InputTokens int `json:"input_tokens"`
	// OutputTokens comes from timings.predicted_n, falling back to
	// usage.completion_tokens when timings are absent.
	OutputTokens int `json:"output_tokens"`
	// PromptPerSecond is timings.prompt_per_second; -1 when unavailable.
	PromptPerSecond float64 `json:"prompt_per_second"`
	// TokensPerSecond is timings.predicted_per_second; -1 when unavailable.
	TokensPerSecond float64 `json:"tokens_per_second"`
	// DurationMs is prompt_ms + predicted_ms from timings, or wall-clock
	// time since the first response byte when timings are absent.
	DurationMs int `json:"duration_ms"`
}
|
|
|
|
// TokenMetricsEvent represents a token metrics event published via the
// event package whenever a new TokenMetrics is recorded.
type TokenMetricsEvent struct {
	// Metrics is the newly recorded metric, including its assigned ID.
	Metrics TokenMetrics
}
|
|
|
|
// Type returns the event type identifier, satisfying the event
// package's event interface.
func (e TokenMetricsEvent) Type() uint32 {
	return TokenMetricsEventID // defined in events.go
}
|
|
|
|
// metricsMonitor parses llama-server output for token statistics.
// It keeps a bounded, in-memory ring of the most recent TokenMetrics
// and publishes each new metric as a TokenMetricsEvent.
// Contains a mutex; must not be copied — use *metricsMonitor.
type metricsMonitor struct {
	mu sync.RWMutex
	// metrics holds at most maxMetrics entries, oldest first; guarded by mu.
	metrics []TokenMetrics
	// maxMetrics caps len(metrics); older entries are dropped.
	maxMetrics int
	// nextID is the sequence number assigned to the next metric; guarded by mu.
	nextID int
	// logger receives warnings about skipped/unparseable responses.
	logger *LogMonitor
}
|
|
|
|
func newMetricsMonitor(logger *LogMonitor, maxMetrics int) *metricsMonitor {
|
|
mp := &metricsMonitor{
|
|
logger: logger,
|
|
maxMetrics: maxMetrics,
|
|
}
|
|
|
|
return mp
|
|
}
|
|
|
|
// addMetrics adds a new metric to the collection and publishes an event
|
|
func (mp *metricsMonitor) addMetrics(metric TokenMetrics) {
|
|
mp.mu.Lock()
|
|
defer mp.mu.Unlock()
|
|
|
|
metric.ID = mp.nextID
|
|
mp.nextID++
|
|
mp.metrics = append(mp.metrics, metric)
|
|
if len(mp.metrics) > mp.maxMetrics {
|
|
mp.metrics = mp.metrics[len(mp.metrics)-mp.maxMetrics:]
|
|
}
|
|
event.Emit(TokenMetricsEvent{Metrics: metric})
|
|
}
|
|
|
|
// getMetrics returns a copy of the current metrics
|
|
func (mp *metricsMonitor) getMetrics() []TokenMetrics {
|
|
mp.mu.RLock()
|
|
defer mp.mu.RUnlock()
|
|
|
|
result := make([]TokenMetrics, len(mp.metrics))
|
|
copy(result, mp.metrics)
|
|
return result
|
|
}
|
|
|
|
// getMetricsJSON returns metrics as JSON
|
|
func (mp *metricsMonitor) getMetricsJSON() ([]byte, error) {
|
|
mp.mu.RLock()
|
|
defer mp.mu.RUnlock()
|
|
return json.Marshal(mp.metrics)
|
|
}
|
|
|
|
// wrapHandler wraps the proxy handler to extract token metrics
|
|
// if wrapHandler returns an error it is safe to assume that no
|
|
// data was sent to the client
|
|
func (mp *metricsMonitor) wrapHandler(
|
|
modelID string,
|
|
writer gin.ResponseWriter,
|
|
request *http.Request,
|
|
next func(modelID string, w http.ResponseWriter, r *http.Request) error,
|
|
) error {
|
|
recorder := newBodyCopier(writer)
|
|
if err := next(modelID, recorder, request); err != nil {
|
|
return err
|
|
}
|
|
|
|
// after this point we have to assume that data was sent to the client
|
|
// and we can only log errors but not send them to clients
|
|
|
|
if recorder.Status() != http.StatusOK {
|
|
mp.logger.Warnf("metrics skipped, HTTP status=%d, path=%s", recorder.Status(), request.URL.Path)
|
|
return nil
|
|
}
|
|
|
|
body := recorder.body.Bytes()
|
|
if len(body) == 0 {
|
|
mp.logger.Warn("metrics skipped, empty body")
|
|
return nil
|
|
}
|
|
|
|
if strings.Contains(recorder.Header().Get("Content-Type"), "text/event-stream") {
|
|
if tm, err := processStreamingResponse(modelID, recorder.StartTime(), body); err != nil {
|
|
mp.logger.Warnf("error processing streaming response: %v, path=%s", err, request.URL.Path)
|
|
} else {
|
|
mp.addMetrics(tm)
|
|
}
|
|
} else {
|
|
if gjson.ValidBytes(body) {
|
|
if tm, err := parseMetrics(modelID, recorder.StartTime(), gjson.ParseBytes(body)); err != nil {
|
|
mp.logger.Warnf("error parsing metrics: %v, path=%s", err, request.URL.Path)
|
|
} else {
|
|
mp.addMetrics(tm)
|
|
}
|
|
} else {
|
|
mp.logger.Warnf("metrics skipped, invalid JSON in response body path=%s", request.URL.Path)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func processStreamingResponse(modelID string, start time.Time, body []byte) (TokenMetrics, error) {
|
|
// Iterate **backwards** through the body looking for the data payload with
|
|
// usage data. This avoids allocating a slice of all lines via bytes.Split.
|
|
|
|
// Start from the end of the body and scan backwards for newlines
|
|
pos := len(body)
|
|
for pos > 0 {
|
|
// Find the previous newline (or start of body)
|
|
lineStart := bytes.LastIndexByte(body[:pos], '\n')
|
|
if lineStart == -1 {
|
|
lineStart = 0
|
|
} else {
|
|
lineStart++ // Move past the newline
|
|
}
|
|
|
|
line := bytes.TrimSpace(body[lineStart:pos])
|
|
pos = lineStart - 1 // Move position before the newline for next iteration
|
|
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
|
|
// SSE payload always follows "data:"
|
|
prefix := []byte("data:")
|
|
if !bytes.HasPrefix(line, prefix) {
|
|
continue
|
|
}
|
|
data := bytes.TrimSpace(line[len(prefix):])
|
|
|
|
if len(data) == 0 {
|
|
continue
|
|
}
|
|
|
|
if bytes.Equal(data, []byte("[DONE]")) {
|
|
// [DONE] line itself contains nothing of interest.
|
|
continue
|
|
}
|
|
|
|
if gjson.ValidBytes(data) {
|
|
return parseMetrics(modelID, start, gjson.ParseBytes(data))
|
|
}
|
|
}
|
|
|
|
return TokenMetrics{}, fmt.Errorf("no valid JSON data found in stream")
|
|
}
|
|
|
|
func parseMetrics(modelID string, start time.Time, jsonData gjson.Result) (TokenMetrics, error) {
|
|
usage := jsonData.Get("usage")
|
|
timings := jsonData.Get("timings")
|
|
if !usage.Exists() && !timings.Exists() {
|
|
return TokenMetrics{}, fmt.Errorf("no usage or timings data found")
|
|
}
|
|
// default values
|
|
cachedTokens := -1 // unknown or missing data
|
|
outputTokens := 0
|
|
inputTokens := 0
|
|
|
|
// timings data
|
|
tokensPerSecond := -1.0
|
|
promptPerSecond := -1.0
|
|
durationMs := int(time.Since(start).Milliseconds())
|
|
|
|
if usage.Exists() {
|
|
outputTokens = int(jsonData.Get("usage.completion_tokens").Int())
|
|
inputTokens = int(jsonData.Get("usage.prompt_tokens").Int())
|
|
}
|
|
|
|
// use llama-server's timing data for tok/sec and duration as it is more accurate
|
|
if timings.Exists() {
|
|
inputTokens = int(jsonData.Get("timings.prompt_n").Int())
|
|
outputTokens = int(jsonData.Get("timings.predicted_n").Int())
|
|
promptPerSecond = jsonData.Get("timings.prompt_per_second").Float()
|
|
tokensPerSecond = jsonData.Get("timings.predicted_per_second").Float()
|
|
durationMs = int(jsonData.Get("timings.prompt_ms").Float() + jsonData.Get("timings.predicted_ms").Float())
|
|
|
|
if cachedValue := jsonData.Get("timings.cache_n"); cachedValue.Exists() {
|
|
cachedTokens = int(cachedValue.Int())
|
|
}
|
|
}
|
|
|
|
return TokenMetrics{
|
|
Timestamp: time.Now(),
|
|
Model: modelID,
|
|
CachedTokens: cachedTokens,
|
|
InputTokens: inputTokens,
|
|
OutputTokens: outputTokens,
|
|
PromptPerSecond: promptPerSecond,
|
|
TokensPerSecond: tokensPerSecond,
|
|
DurationMs: durationMs,
|
|
}, nil
|
|
}
|
|
|
|
// responseBodyCopier records the response body and writes to the original
// response writer while also capturing it in a buffer for later processing
// (token-metrics extraction in wrapHandler).
type responseBodyCopier struct {
	gin.ResponseWriter
	// body accumulates every byte written through Write.
	body *bytes.Buffer
	// tee fans each Write out to the underlying writer and body.
	tee io.Writer
	// start is set on the first Write; zero if nothing was written.
	start time.Time
}
|
|
|
|
func newBodyCopier(w gin.ResponseWriter) *responseBodyCopier {
|
|
bodyBuffer := &bytes.Buffer{}
|
|
return &responseBodyCopier{
|
|
ResponseWriter: w,
|
|
body: bodyBuffer,
|
|
tee: io.MultiWriter(w, bodyBuffer),
|
|
}
|
|
}
|
|
|
|
func (w *responseBodyCopier) Write(b []byte) (int, error) {
|
|
if w.start.IsZero() {
|
|
w.start = time.Now()
|
|
}
|
|
|
|
// Single write operation that writes to both the response and buffer
|
|
return w.tee.Write(b)
|
|
}
|
|
|
|
// WriteHeader forwards the status code to the underlying gin writer.
// NOTE(review): this is a pure pass-through the embedded ResponseWriter
// already provides; presumably kept explicit for clarity.
func (w *responseBodyCopier) WriteHeader(statusCode int) {
	w.ResponseWriter.WriteHeader(statusCode)
}
|
|
|
|
// Header forwards to the underlying gin writer's header map.
// NOTE(review): pure pass-through of the embedded ResponseWriter method.
func (w *responseBodyCopier) Header() http.Header {
	return w.ResponseWriter.Header()
}
|
|
|
|
// StartTime returns when the first response byte was written, or the
// zero time if Write was never called.
func (w *responseBodyCopier) StartTime() time.Time {
	return w.start
}
|