diff --git a/misc/simple-responder/simple-responder.go b/misc/simple-responder/simple-responder.go index 228c946..2a23987 100644 --- a/misc/simple-responder/simple-responder.go +++ b/misc/simple-responder/simple-responder.go @@ -78,6 +78,14 @@ func main() { "prompt_tokens": 25, "total_tokens": 35, }, + // add timings to simulate llama.cpp + "timings": gin.H{ + "prompt_n": 25, + "prompt_ms": 13, + "predicted_n": 10, + "predicted_ms": 17, + "predicted_per_second": 10, + }, } c.SSEvent("message", finalData) c.Writer.Flush() @@ -102,6 +110,13 @@ func main() { "prompt_tokens": 25, "total_tokens": 35, }, + "timings": gin.H{ + "prompt_n": 25, + "prompt_ms": 13, + "predicted_n": 10, + "predicted_ms": 17, + "predicted_per_second": 10, + }, }) } }) diff --git a/proxy/metrics_middleware.go b/proxy/metrics_middleware.go index 7626f36..1718a94 100644 --- a/proxy/metrics_middleware.go +++ b/proxy/metrics_middleware.go @@ -67,51 +67,66 @@ func (rec *MetricsRecorder) processBody(body []byte) { } } -func (rec *MetricsRecorder) parseAndRecordMetrics(jsonData gjson.Result) { - if !jsonData.Get("usage").Exists() { - return +func (rec *MetricsRecorder) parseAndRecordMetrics(jsonData gjson.Result) bool { + usage := jsonData.Get("usage") + if !usage.Exists() { + return false } + // default values outputTokens := int(jsonData.Get("usage.completion_tokens").Int()) inputTokens := int(jsonData.Get("usage.prompt_tokens").Int()) + tokensPerSecond := -1.0 + durationMs := int(time.Since(rec.startTime).Milliseconds()) - if outputTokens > 0 { - duration := time.Since(rec.startTime) - tokensPerSecond := float64(inputTokens+outputTokens) / duration.Seconds() - - metrics := TokenMetrics{ - Timestamp: time.Now(), - Model: rec.realModelName, - InputTokens: inputTokens, - OutputTokens: outputTokens, - TokensPerSecond: tokensPerSecond, - DurationMs: int(duration.Milliseconds()), - } - rec.metricsMonitor.addMetrics(metrics) + // use llama-server's timing data for tok/sec and duration as it is more accurate + if timings := jsonData.Get("timings"); timings.Exists() { + tokensPerSecond = jsonData.Get("timings.predicted_per_second").Float() + durationMs = int(jsonData.Get("timings.prompt_ms").Float() + jsonData.Get("timings.predicted_ms").Float()) } + + rec.metricsMonitor.addMetrics(TokenMetrics{ + Timestamp: time.Now(), + Model: rec.realModelName, + InputTokens: inputTokens, + OutputTokens: outputTokens, + TokensPerSecond: tokensPerSecond, + DurationMs: durationMs, + }) + + return true } func (rec *MetricsRecorder) processStreamingResponse(body []byte) { + // Iterate **backwards** through the lines looking for the data payload with + // usage data lines := bytes.Split(body, []byte("\n")) - for _, line := range lines { - line = bytes.TrimSpace(line) + + for i := len(lines) - 1; i >= 0; i-- { + line := bytes.TrimSpace(lines[i]) if len(line) == 0 { continue } - // Check for SSE data prefix - if data, found := bytes.CutPrefix(line, []byte("data:")); found { - data = bytes.TrimSpace(data) - if len(data) == 0 { - continue - } - if bytes.Equal(data, []byte("[DONE]")) { - break - } + // SSE payload always follows "data:" + prefix := []byte("data:") + if !bytes.HasPrefix(line, prefix) { + continue + } + data := bytes.TrimSpace(line[len(prefix):]) - // Parse JSON to look for usage data - if gjson.ValidBytes(data) { - rec.parseAndRecordMetrics(gjson.ParseBytes(data)) + if len(data) == 0 { + continue + } + + if bytes.Equal(data, []byte("[DONE]")) { + // [DONE] line itself contains nothing of interest. + continue + } + + if gjson.ValidBytes(data) { + if rec.parseAndRecordMetrics(gjson.ParseBytes(data)) { + return // short circuit if a metric was recorded } } } diff --git a/proxy/proxymanager_test.go b/proxy/proxymanager_test.go index 363551d..81c489b 100644 --- a/proxy/proxymanager_test.go +++ b/proxy/proxymanager_test.go @@ -708,7 +708,9 @@ func TestProxyManager_MiddlewareWritesMetrics_NonStreaming(t *testing.T) { // Check that metrics were recorded metrics := proxy.metricsMonitor.GetMetrics() - assert.NotEmpty(t, metrics, "metrics should be recorded for non-streaming request") + if !assert.NotEmpty(t, metrics, "metrics should be recorded for non-streaming request") { + return + } // Verify the last metric has the correct model lastMetric := metrics[len(metrics)-1] @@ -741,7 +743,9 @@ func TestProxyManager_MiddlewareWritesMetrics_Streaming(t *testing.T) { // Check that metrics were recorded metrics := proxy.metricsMonitor.GetMetrics() - assert.NotEmpty(t, metrics, "metrics should be recorded for streaming request") + if !assert.NotEmpty(t, metrics, "metrics should be recorded for streaming request") { + return + } // Verify the last metric has the correct model lastMetric := metrics[len(metrics)-1] diff --git a/ui/src/pages/Activity.tsx b/ui/src/pages/Activity.tsx index 890bbe9..70cae42 100644 --- a/ui/src/pages/Activity.tsx +++ b/ui/src/pages/Activity.tsx @@ -1,6 +1,18 @@ import { useState, useEffect } from "react"; import { useAPI } from "../contexts/APIProvider"; +const formatTimestamp = (timestamp: string): string => { + return new Date(timestamp).toLocaleString(); +}; + +const formatSpeed = (speed: number): string => { + return speed < 0 ? "unknown" : speed.toFixed(2) + " t/s"; +}; + +const formatDuration = (ms: number): string => { + return (ms / 1000).toFixed(2) + "s"; +}; + const ActivityPage = () => { const { metrics } = useAPI(); const [error, setError] = useState(null); @@ -11,18 +23,6 @@ const ActivityPage = () => { } }, [metrics]); - const formatTimestamp = (timestamp: string) => { - return new Date(timestamp).toLocaleString(); - }; - - const formatSpeed = (speed: number) => { - return speed.toFixed(2) + " t/s"; - }; - - const formatDuration = (ms: number) => { - return (ms / 1000).toFixed(2) + "s"; - }; - if (error) { return (
@@ -51,7 +51,7 @@ const ActivityPage = () => { Model Input Tokens Output Tokens - Processing Speed + Generation Speed Duration