From 04fc67354a748935e56b3660749fd9f3405eff3b Mon Sep 17 00:00:00 2001
From: Benson Wong
Date: Fri, 15 Aug 2025 21:44:08 -0700
Subject: [PATCH] Improve Activity event handling in the UI (#254)

Improve Activity event handling in the UI

- fixes #252, which found that the Activity page showed activity
  inconsistent with /api/metrics
- Change the data structure for event metrics to an array.
- Add an event stream connection status indicator
---
 misc/benchmark-chatcompletion/main.go  | 159 +++++++++++++++++++++++++
 proxy/proxymanager_api.go              |   8 +-
 ui/src/App.tsx                         |   2 +
 ui/src/components/ConnectionStatus.tsx |  36 ++++++
 ui/src/contexts/APIProvider.tsx        |  24 +++-
 ui/src/pages/Activity.tsx              |  27 ++---
 6 files changed, 229 insertions(+), 27 deletions(-)
 create mode 100644 misc/benchmark-chatcompletion/main.go
 create mode 100644 ui/src/components/ConnectionStatus.tsx

diff --git a/misc/benchmark-chatcompletion/main.go b/misc/benchmark-chatcompletion/main.go
new file mode 100644
index 0000000..807a906
--- /dev/null
+++ b/misc/benchmark-chatcompletion/main.go
@@ -0,0 +1,159 @@
+package main
+
+// created for issue #252: https://github.com/mostlygeek/llama-swap/issues/252
+// This simple benchmark tool sends a lot of small chat completion requests to llama-swap
+// to make sure all of the requests are accounted for.
+//
+// Requests can be sent in parallel, and the tool reports the results.
+// usage: go run main.go -baseurl http://localhost:8080/v1 -model llama3 -requests 1000 -par 5
+
+import (
+    "bytes"
+    "flag"
+    "fmt"
+    "io"
+    "log"
+    "net/http"
+    "os"
+    "sync"
+    "time"
+)
+
+func main() {
+    // ----- CLI arguments ----------------------------------------------------
+    var (
+        baseurl         string
+        modelName       string
+        totalRequests   int
+        parallelization int
+    )
+
+    flag.StringVar(&baseurl, "baseurl", "http://localhost:8080/v1", "Base URL of the API (e.g., https://api.example.com)")
+    flag.StringVar(&modelName, "model", "", "Model name to use")
+    flag.IntVar(&totalRequests, "requests", 1, "Total number of requests to send")
+    flag.IntVar(&parallelization, "par", 1, "Maximum number of concurrent requests")
+    flag.Parse()
+
+    if baseurl == "" || modelName == "" {
+        fmt.Println("Error: both -baseurl and -model are required.")
+        flag.Usage()
+        os.Exit(1)
+    }
+    if totalRequests <= 0 {
+        fmt.Println("Error: -requests must be greater than 0.")
+        os.Exit(1)
+    }
+    if parallelization <= 0 {
+        fmt.Println("Error: -par must be greater than 0.")
+        os.Exit(1)
+    }
+
+    // ----- HTTP client -------------------------------------------------------
+    client := &http.Client{
+        Timeout: 30 * time.Second,
+    }
+
+    // ----- Tracking response codes -------------------------------------------
+    statusCounts := make(map[int]int) // map[statusCode]count
+    var mu sync.Mutex                 // protects statusCounts
+
+    // ----- Request queue (buffered channel) ----------------------------------
+    requests := make(chan int, 10) // Buffered channel with capacity 10
+
+    // Goroutine to fill the request queue
+    go func() {
+        for i := 0; i < totalRequests; i++ {
+            requests <- i + 1
+        }
+        close(requests)
+    }()
+
+    // ----- Worker pool -------------------------------------------------------
+    var wg sync.WaitGroup
+    for i := 0; i < parallelization; i++ {
+        wg.Add(1)
+        go func(workerID int) {
+            defer wg.Done()
+
+            for reqID := range requests {
+                // Build the request payload as a single-line JSON string
+                payload := `{"model":"` + modelName + `","max_tokens":100,"stream":false,"messages":[{"role":"user","content":"write a snake game in python"}]}`
+
+                // Send POST request
+                req, err := http.NewRequest(http.MethodPost,
+                    fmt.Sprintf("%s/chat/completions", baseurl),
+                    bytes.NewReader([]byte(payload)))
+                if err != nil {
+                    log.Printf("[worker %d][req %d] request creation error: %v", workerID, reqID, err)
+                    mu.Lock()
+                    statusCounts[-1]++
+                    mu.Unlock()
+                    continue
+                }
+                req.Header.Set("Content-Type", "application/json")
+
+                resp, err := client.Do(req)
+                if err != nil {
+                    log.Printf("[worker %d][req %d] HTTP request error: %v", workerID, reqID, err)
+                    mu.Lock()
+                    statusCounts[-1]++
+                    mu.Unlock()
+                    continue
+                }
+                io.Copy(io.Discard, resp.Body)
+                resp.Body.Close()
+
+                // Record status code
+                mu.Lock()
+                statusCounts[resp.StatusCode]++
+                mu.Unlock()
+            }
+        }(i + 1)
+    }
+
+    // ----- Status ticker (prints every second) -------------------------------
+    done := make(chan struct{})
+    tickerDone := make(chan struct{})
+    go func() {
+        ticker := time.NewTicker(1 * time.Second)
+        startTime := time.Now()
+        for {
+            select {
+            case <-ticker.C:
+                mu.Lock()
+                // Compute how many requests have completed so far
+                completed := 0
+                for _, cnt := range statusCounts {
+                    completed += cnt
+                }
+                // Calculate duration and progress
+                duration := time.Since(startTime)
+                progress := completed * 100 / totalRequests
+                fmt.Printf("Duration: %v, Completed: %d%% of requests\n", duration, progress)
+                mu.Unlock()
+            case <-done:
+                duration := time.Since(startTime)
+                fmt.Printf("Duration: %v, Completed: %d%% of requests\n", duration, 100)
+                close(tickerDone)
+                return
+            }
+        }
+    }()
+
+    // Wait for all workers to finish
+    wg.Wait()
+    close(done)  // stops the status-update goroutine
+    <-tickerDone // give the ticker time to finish / print
+
+    // ----- Summary ------------------------------------------------------------
+    fmt.Println("\n\n=== HTTP response code summary ===")
+    mu.Lock()
+    for code, cnt := range statusCounts {
+        if code == -1 {
+            fmt.Printf("Client-side errors (no HTTP response): %d\n", cnt)
+        } else {
+            fmt.Printf("%d : %d\n", code, cnt)
+        }
+    }
+    mu.Unlock()
+}
diff --git a/proxy/proxymanager_api.go b/proxy/proxymanager_api.go
index dc96a99..f133e4c 100644
--- a/proxy/proxymanager_api.go
+++ b/proxy/proxymanager_api.go
@@ -132,7 +132,7 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
 		}
 	}
 
-	sendMetrics := func(metrics TokenMetrics) {
+	sendMetrics := func(metrics []TokenMetrics) {
 		jsonData, err := json.Marshal(metrics)
 		if err == nil {
 			select {
@@ -168,16 +168,14 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
 	 * Send Metrics data
 	 */
 	defer event.On(func(e TokenMetricsEvent) {
-		sendMetrics(e.Metrics)
+		sendMetrics([]TokenMetrics{e.Metrics})
 	})()
 
 	// send initial batch of data
 	sendLogData("proxy", pm.proxyLogger.GetHistory())
 	sendLogData("upstream", pm.upstreamLogger.GetHistory())
 	sendModels()
-	for _, metrics := range pm.metricsMonitor.GetMetrics() {
-		sendMetrics(metrics)
-	}
+	sendMetrics(pm.metricsMonitor.GetMetrics())
 
 	for {
 		select {
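Note on the event shape above: every "metrics" message on the /api/events stream now carries a JSON array of TokenMetrics, both for the initial batch from metricsMonitor.GetMetrics() and for each live TokenMetricsEvent (wrapped in a one-element slice). The following is a minimal consumer sketch, not part of the patch; it assumes the envelope is a `type` tag plus a JSON-encoded `data` string (matching how the UI parses APIEventEnvelope below), and it only spells out the metric fields the UI actually reads (id, timestamp, model, input_tokens).

    // Sketch only: consume /api/events and accumulate metrics keyed by id.
    // The envelope tag name and any fields beyond id/timestamp/model/input_tokens
    // are assumptions, not confirmed by this patch.
    interface MetricsEntry {
      id: number;
      timestamp: string;
      model: string;
      input_tokens: number;
    }

    const seen = new Map<number, MetricsEntry>();
    const source = new EventSource("/api/events");

    source.onmessage = (e: MessageEvent) => {
      const envelope = JSON.parse(e.data) as { type: string; data: string };
      if (envelope.type !== "metrics") return;

      // The payload is always an array now, never a single object.
      const batch = JSON.parse(envelope.data) as MetricsEntry[];
      for (const m of batch) {
        seen.set(m.id, m); // keying by id de-duplicates across reconnects
      }
      console.log(`tracking ${seen.size} completed requests`);
    };

Comparing a count like `seen.size` against the totals printed by the Go benchmark above is the kind of accounting check that tool was written for.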
diff --git a/ui/src/App.tsx b/ui/src/App.tsx
index 206d399..85050db 100644
--- a/ui/src/App.tsx
+++ b/ui/src/App.tsx
@@ -4,6 +4,7 @@ import { APIProvider } from "./contexts/APIProvider";
 import LogViewerPage from "./pages/LogViewer";
 import ModelPage from "./pages/Models";
 import ActivityPage from "./pages/Activity";
+import ConnectionStatus from "./components/ConnectionStatus";
 import { RiSunFill, RiMoonFill } from "react-icons/ri";
 
 function App() {
@@ -31,6 +32,7 @@ function App() {
+          <ConnectionStatus />
diff --git a/ui/src/components/ConnectionStatus.tsx b/ui/src/components/ConnectionStatus.tsx
new file mode 100644
index 0000000..56a684f
--- /dev/null
+++ b/ui/src/components/ConnectionStatus.tsx
@@ -0,0 +1,36 @@
+import { useAPI } from "../contexts/APIProvider";
+import { useEffect, useState, useMemo } from "react";
+
+type ConnectionStatus = "disconnected" | "connecting" | "connected";
+
+const ConnectionStatus = () => {
+  const { getConnectionStatus } = useAPI();
+  const [eventStreamStatus, setEventStreamStatus] = useState<ConnectionStatus>("disconnected");
+
+  useEffect(() => {
+    const interval = setInterval(() => {
+      setEventStreamStatus(getConnectionStatus());
+    }, 1000);
+    return () => clearInterval(interval);
+  });
+
+  const eventStatusColor = useMemo(() => {
+    switch (eventStreamStatus) {
+      case "connected":
+        return "bg-green-500";
+      case "connecting":
+        return "bg-yellow-500";
+      case "disconnected":
+      default:
+        return "bg-red-500";
+    }
+  }, [eventStreamStatus]);
+
+  return (
+    // approximate markup: a small status dot whose colour follows eventStatusColor
+    <div className={`w-3 h-3 rounded-full ${eventStatusColor}`} title={`event stream: ${eventStreamStatus}`} />
+  );
+};
+
+export default ConnectionStatus;
diff --git a/ui/src/contexts/APIProvider.tsx b/ui/src/contexts/APIProvider.tsx
index d2a8a7f..20bd81b 100644
--- a/ui/src/contexts/APIProvider.tsx
+++ b/ui/src/contexts/APIProvider.tsx
@@ -20,6 +20,7 @@ interface APIProviderType {
   proxyLogs: string;
   upstreamLogs: string;
   metrics: Metrics[];
+  getConnectionStatus: () => "connected" | "connecting" | "disconnected";
 }
 
 interface Metrics {
@@ -63,6 +64,16 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
     });
   }, []);
 
+  const getConnectionStatus = useCallback(() => {
+    if (apiEventSource.current?.readyState === EventSource.OPEN) {
+      return "connected";
+    } else if (apiEventSource.current?.readyState === EventSource.CONNECTING) {
+      return "connecting";
+    } else {
+      return "disconnected";
+    }
+  }, []);
+
   const enableAPIEvents = useCallback((enabled: boolean) => {
     if (!enabled) {
       apiEventSource.current?.close();
@@ -77,6 +88,14 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
     const connect = () => {
       const eventSource = new EventSource("/api/events");
 
+      eventSource.onopen = () => {
+        // clear everything out on connect to keep things in sync
+        setProxyLogs("");
+        setUpstreamLogs("");
+        setMetrics([]); // clear metrics on reconnect
+        setModels([]); // clear models on reconnect
+      };
+
       eventSource.onmessage = (e: MessageEvent) => {
         try {
           const message = JSON.parse(e.data) as APIEventEnvelope;
@@ -108,9 +127,9 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
 
           case "metrics":
             {
-              const newMetric = JSON.parse(message.data) as Metrics;
+              const newMetrics = JSON.parse(message.data) as Metrics[];
               setMetrics((prevMetrics) => {
-                return [newMetric, ...prevMetrics];
+                return [...newMetrics, ...prevMetrics];
               });
             }
             break;
@@ -194,6 +213,7 @@ export function APIProvider({ children, autoStartAPIEvents = true }: APIProvider
       proxyLogs,
       upstreamLogs,
       metrics,
+      getConnectionStatus,
     }),
     [models, listModels, unloadAllModels, loadModel, enableAPIEvents, proxyLogs, upstreamLogs, metrics]
   );
diff --git a/ui/src/pages/Activity.tsx b/ui/src/pages/Activity.tsx
index f8aa996..de8ab3e 100644
--- a/ui/src/pages/Activity.tsx
+++ b/ui/src/pages/Activity.tsx
@@ -1,4 +1,4 @@
-import { useState, useEffect } from "react";
+import { useMemo } from "react";
 import { useAPI } from "../contexts/APIProvider";
 
 const formatTimestamp = (timestamp: string): string => {
@@ -15,25 +15,10 @@ const formatDuration = (ms: number): string => {
 
 const ActivityPage = () => {
   const { metrics } = useAPI();
-  const [error, setError] = useState(null);
-
-  useEffect(() => {
-    if (metrics.length > 0) {
-      setError(null);
-    }
+  const sortedMetrics = useMemo(() => {
+    return [...metrics].sort((a, b) => b.id - a.id);
   }, [metrics]);
 
-  if (error) {
-    return (
-      <div className="...">
-        <h1 className="...">Activity</h1>
-        <div className="...">
-          <div className="...">{error}</div>
-        </div>
-      </div>
-    );
-  }
-
   return (
     <div className="...">
       <h1 className="...">Activity</h1>
@@ -47,6 +32,7 @@ const ActivityPage = () => {
         <table className="...">
           <thead>
             <tr>
+              <th className="...">Id</th>
               <th className="...">Timestamp</th>
               <th className="...">Model</th>
               <th className="...">Input Tokens</th>
@@ -57,8 +43,9 @@ const ActivityPage = () => {
             </tr>
           </thead>
           <tbody>
-            {metrics.map((metric, index) => (
-              <tr key={index}>
+            {sortedMetrics.map((metric) => (
+              <tr key={metric.id}>
+                <td className="...">{metric.id + 1 /* un-zero index */}</td>
                 <td className="...">{formatTimestamp(metric.timestamp)}</td>
                 <td className="...">{metric.model}</td>
                 <td className="...">{metric.input_tokens.toLocaleString()}</td>
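A closing note on the ordering rule the Activity page above relies on: the provider prepends each incoming metrics array, and a reconnect clears state and replays the server's history, so arrival order is not stable. Sorting a copy by id, newest first, is what keeps the table in line with /api/metrics. The sketch below illustrates this with made-up ids and a hypothetical entry shape; it is not part of the patch.

    // Toy data: a live event arrives first, then the replayed initial batch.
    interface ActivityEntry {
      id: number;
      model: string;
    }

    const arrived: ActivityEntry[] = [
      { id: 3, model: "llama3" }, // live event, prepended by the provider
      { id: 0, model: "llama3" }, // replayed initial batch
      { id: 1, model: "llama3" },
      { id: 2, model: "llama3" },
    ];

    // Same rule the Activity page uses: sort a copy by id, descending.
    const newestFirst = [...arrived].sort((a, b) => b.id - a.id);
    console.log(newestFirst.map((m) => m.id)); // [3, 2, 1, 0]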