diff --git a/config.example.yaml b/config.example.yaml
index 3090799..806c500 100644
--- a/config.example.yaml
+++ b/config.example.yaml
@@ -15,6 +15,12 @@ healthCheckTimeout: 500
 #   - Valid log levels: debug, info, warn, error
 logLevel: info
 
+# metricsMaxInMemory: maximum number of metrics to keep in memory
+#   - optional, default: 1000
+#   - controls how many metrics are stored in memory before older ones are discarded
+#   - useful for limiting memory usage when processing large volumes of metrics
+metricsMaxInMemory: 1000
+
 # startPort: sets the starting port number for the automatic ${PORT} macro.
 #   - optional, default: 5800
 #   - the ${PORT} macro can be used in model.cmd and model.proxy settings
@@ -200,4 +206,4 @@ groups:
     members:
       - "forever-modelA"
       - "forever-modelB"
-      - "forever-modelc"
\ No newline at end of file
+      - "forever-modelc"
diff --git a/docker/config.example.yaml b/docker/config.example.yaml
index b1ee018..9f5f23b 100644
--- a/docker/config.example.yaml
+++ b/docker/config.example.yaml
@@ -1,5 +1,6 @@
 healthCheckTimeout: 300
 logRequests: true
+metricsMaxInMemory: 1000
 
 models:
   "qwen2.5":
diff --git a/misc/simple-responder/simple-responder.go b/misc/simple-responder/simple-responder.go
index d0198cc..228c946 100644
--- a/misc/simple-responder/simple-responder.go
+++ b/misc/simple-responder/simple-responder.go
@@ -35,20 +35,75 @@ func main() {
 
 	// Set up the handler function using the provided response message
 	r.POST("/v1/chat/completions", func(c *gin.Context) {
-		c.Header("Content-Type", "application/json")
-
-		// add a wait to simulate a slow query
-		if wait, err := time.ParseDuration(c.Query("wait")); err == nil {
-			time.Sleep(wait)
-		}
-		bodyBytes, _ := io.ReadAll(c.Request.Body)
-		c.JSON(http.StatusOK, gin.H{
-			"responseMessage":  *responseMessage,
-			"h_content_length": c.Request.Header.Get("Content-Length"),
-			"request_body":     string(bodyBytes),
-		})
+		bodyBytes, _ := io.ReadAll(c.Request.Body)
+
+		// Check if streaming is requested.
+		// The query string is checked instead of the JSON body because the
+		// event stream format conflicts with other tests.
+		isStreaming := c.Query("stream") == "true"
+
+		if isStreaming {
+			// Set headers for streaming
+			c.Header("Content-Type", "text/event-stream")
+			c.Header("Cache-Control", "no-cache")
+			c.Header("Connection", "keep-alive")
+			c.Header("Transfer-Encoding", "chunked")
+
+			// add a wait to simulate a slow query
+			if wait, err := time.ParseDuration(c.Query("wait")); err == nil {
+				time.Sleep(wait)
+			}
+
+			// Send 10 "asdf" tokens
+			for i := 0; i < 10; i++ {
+				data := gin.H{
+					"created": time.Now().Unix(),
+					"choices": []gin.H{
+						{
+							"index": 0,
+							"delta": gin.H{
+								"content": "asdf",
+							},
+							"finish_reason": nil,
+						},
+					},
+				}
+				c.SSEvent("message", data)
+				c.Writer.Flush()
+			}
+
+			// Send final data with usage info
+			finalData := gin.H{
+				"usage": gin.H{
+					"completion_tokens": 10,
+					"prompt_tokens":     25,
+					"total_tokens":      35,
+				},
+			}
+			c.SSEvent("message", finalData)
+			c.Writer.Flush()
+
+			// Send [DONE]
+			c.SSEvent("message", "[DONE]")
+			c.Writer.Flush()
+		} else {
+			c.Header("Content-Type", "application/json")
+
+			// add a wait to simulate a slow query
+			if wait, err := time.ParseDuration(c.Query("wait")); err == nil {
+				time.Sleep(wait)
+			}
+
+			c.JSON(http.StatusOK, gin.H{
+				"responseMessage":  *responseMessage,
+				"h_content_length": c.Request.Header.Get("Content-Length"),
+				"request_body":     string(bodyBytes),
+				"usage": gin.H{
+					"completion_tokens": 10,
+					"prompt_tokens":     25,
+					"total_tokens":      35,
+				},
+			})
+		}
 	})
 
 	// for issue #62 to check model name strips profile slug
@@ -74,6 +129,11 @@ func main() {
 		c.Header("Content-Type", "application/json")
 		c.JSON(http.StatusOK, gin.H{
 			"responseMessage": *responseMessage,
+			"usage": gin.H{
+				"completion_tokens": 10,
+				"prompt_tokens":     25,
+				"total_tokens":      35,
+			},
 		})
 	})
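A note on the streaming branch above: gin's `c.SSEvent("message", …)` JSON-encodes non-string payloads and frames each one as `event:`/`data:` lines, which is the format the metrics middleware later in this diff scans for. A minimal standalone sketch of the wire format (not part of this PR; the route and payload are illustrative):

```go
package main

import (
	"fmt"
	"net/http/httptest"

	"github.com/gin-gonic/gin"
)

func main() {
	gin.SetMode(gin.TestMode)
	r := gin.New()
	r.GET("/sse", func(c *gin.Context) {
		// Same call shape the simple responder uses for each token chunk.
		c.SSEvent("message", gin.H{"choices": []gin.H{{"delta": gin.H{"content": "asdf"}}}})
		c.SSEvent("message", "[DONE]") // strings are written as-is, not JSON-encoded
	})

	w := httptest.NewRecorder()
	r.ServeHTTP(w, httptest.NewRequest("GET", "/sse", nil))

	// Prints something like:
	//   event:message
	//   data:{"choices":[{"delta":{"content":"asdf"}}]}
	//
	//   event:message
	//   data:[DONE]
	fmt.Print(w.Body.String())
}
```

Because `[DONE]` also goes through `SSEvent`, it arrives as `data:[DONE]` rather than the OpenAI-style `data: [DONE]` with a space; the middleware trims whitespace after the prefix, so both spellings are handled.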
c.Header("Content-Type", "application/json") c.JSON(http.StatusOK, gin.H{ "responseMessage": *responseMessage, + "usage": gin.H{ + "completion_tokens": 10, + "prompt_tokens": 25, + "total_tokens": 35, + }, }) }) diff --git a/proxy/config.go b/proxy/config.go index e019422..ee72747 100644 --- a/proxy/config.go +++ b/proxy/config.go @@ -142,6 +142,7 @@ type Config struct { HealthCheckTimeout int `yaml:"healthCheckTimeout"` LogRequests bool `yaml:"logRequests"` LogLevel string `yaml:"logLevel"` + MetricsMaxInMemory int `yaml:"metricsMaxInMemory"` Models map[string]ModelConfig `yaml:"models"` /* key is model ID */ Profiles map[string][]string `yaml:"profiles"` Groups map[string]GroupConfig `yaml:"groups"` /* key is group ID */ @@ -194,6 +195,7 @@ func LoadConfigFromReader(r io.Reader) (Config, error) { HealthCheckTimeout: 120, StartPort: 5800, LogLevel: "info", + MetricsMaxInMemory: 1000, } err = yaml.Unmarshal(data, &config) if err != nil { diff --git a/proxy/config_posix_test.go b/proxy/config_posix_test.go index 91d12a0..da49997 100644 --- a/proxy/config_posix_test.go +++ b/proxy/config_posix_test.go @@ -196,6 +196,7 @@ groups: }, }, HealthCheckTimeout: 15, + MetricsMaxInMemory: 1000, Profiles: map[string][]string{ "test": {"model1", "model2"}, }, diff --git a/proxy/config_windows_test.go b/proxy/config_windows_test.go index 2f3fd30..6902da0 100644 --- a/proxy/config_windows_test.go +++ b/proxy/config_windows_test.go @@ -193,6 +193,7 @@ groups: }, }, HealthCheckTimeout: 15, + MetricsMaxInMemory: 1000, Profiles: map[string][]string{ "test": {"model1", "model2"}, }, diff --git a/proxy/events.go b/proxy/events.go index f6d4a61..9a4d5fb 100644 --- a/proxy/events.go +++ b/proxy/events.go @@ -6,6 +6,7 @@ const ProcessStateChangeEventID = 0x01 const ChatCompletionStatsEventID = 0x02 const ConfigFileChangedEventID = 0x03 const LogDataEventID = 0x04 +const TokenMetricsEventID = 0x05 type ProcessStateChangeEvent struct { ProcessName string diff --git a/proxy/metrics_middleware.go b/proxy/metrics_middleware.go new file mode 100644 index 0000000..7626f36 --- /dev/null +++ b/proxy/metrics_middleware.go @@ -0,0 +1,153 @@ +package proxy + +import ( + "bytes" + "fmt" + "io" + "net/http" + "time" + + "github.com/gin-gonic/gin" + "github.com/tidwall/gjson" +) + +// MetricsMiddleware sets up the MetricsResponseWriter for capturing upstream requests +func MetricsMiddleware(pm *ProxyManager) gin.HandlerFunc { + return func(c *gin.Context) { + bodyBytes, err := io.ReadAll(c.Request.Body) + if err != nil { + pm.sendErrorResponse(c, http.StatusBadRequest, "could not ready request body") + return + } + c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + requestedModel := gjson.GetBytes(bodyBytes, "model").String() + if requestedModel == "" { + pm.sendErrorResponse(c, http.StatusBadRequest, "missing or invalid 'model' key") + return + } + + realModelName, found := pm.config.RealModelName(requestedModel) + if !found { + pm.sendErrorResponse(c, http.StatusBadRequest, fmt.Sprintf("could not find real modelID for %s", requestedModel)) + return + } + c.Set("ls-real-model-name", realModelName) + + writer := &MetricsResponseWriter{ + ResponseWriter: c.Writer, + metricsRecorder: &MetricsRecorder{ + metricsMonitor: pm.metricsMonitor, + realModelName: realModelName, + isStreaming: gjson.GetBytes(bodyBytes, "stream").Bool(), + startTime: time.Now(), + }, + } + c.Writer = writer + c.Next() + + rec := writer.metricsRecorder + rec.processBody(writer.body) + } +} + +type MetricsRecorder struct { + metricsMonitor 
diff --git a/proxy/metrics_middleware.go b/proxy/metrics_middleware.go
new file mode 100644
index 0000000..7626f36
--- /dev/null
+++ b/proxy/metrics_middleware.go
@@ -0,0 +1,153 @@
+package proxy
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	"github.com/gin-gonic/gin"
+	"github.com/tidwall/gjson"
+)
+
+// MetricsMiddleware wraps the response writer so upstream responses can be
+// captured and parsed for token usage statistics
+func MetricsMiddleware(pm *ProxyManager) gin.HandlerFunc {
+	return func(c *gin.Context) {
+		bodyBytes, err := io.ReadAll(c.Request.Body)
+		if err != nil {
+			pm.sendErrorResponse(c, http.StatusBadRequest, "could not read request body")
+			return
+		}
+		c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
+
+		requestedModel := gjson.GetBytes(bodyBytes, "model").String()
+		if requestedModel == "" {
+			pm.sendErrorResponse(c, http.StatusBadRequest, "missing or invalid 'model' key")
+			return
+		}
+
+		realModelName, found := pm.config.RealModelName(requestedModel)
+		if !found {
+			pm.sendErrorResponse(c, http.StatusBadRequest, fmt.Sprintf("could not find real modelID for %s", requestedModel))
+			return
+		}
+		c.Set("ls-real-model-name", realModelName)
+
+		writer := &MetricsResponseWriter{
+			ResponseWriter: c.Writer,
+			metricsRecorder: &MetricsRecorder{
+				metricsMonitor: pm.metricsMonitor,
+				realModelName:  realModelName,
+				isStreaming:    gjson.GetBytes(bodyBytes, "stream").Bool(),
+				startTime:      time.Now(),
+			},
+		}
+		c.Writer = writer
+		c.Next()
+
+		rec := writer.metricsRecorder
+		rec.processBody(writer.body)
+	}
+}
+
+type MetricsRecorder struct {
+	metricsMonitor *MetricsMonitor
+	realModelName  string
+	isStreaming    bool
+	startTime      time.Time
+}
+
+// processBody handles response processing after the request completes
+func (rec *MetricsRecorder) processBody(body []byte) {
+	if rec.isStreaming {
+		rec.processStreamingResponse(body)
+	} else {
+		rec.processNonStreamingResponse(body)
+	}
+}
+
+func (rec *MetricsRecorder) parseAndRecordMetrics(jsonData gjson.Result) {
+	if !jsonData.Get("usage").Exists() {
+		return
+	}
+
+	outputTokens := int(jsonData.Get("usage.completion_tokens").Int())
+	inputTokens := int(jsonData.Get("usage.prompt_tokens").Int())
+
+	if outputTokens > 0 {
+		duration := time.Since(rec.startTime)
+		tokensPerSecond := float64(inputTokens+outputTokens) / duration.Seconds()
+
+		metrics := TokenMetrics{
+			Timestamp:       time.Now(),
+			Model:           rec.realModelName,
+			InputTokens:     inputTokens,
+			OutputTokens:    outputTokens,
+			TokensPerSecond: tokensPerSecond,
+			DurationMs:      int(duration.Milliseconds()),
+		}
+		rec.metricsMonitor.addMetrics(metrics)
+	}
+}
+
+func (rec *MetricsRecorder) processStreamingResponse(body []byte) {
+	lines := bytes.Split(body, []byte("\n"))
+	for _, line := range lines {
+		line = bytes.TrimSpace(line)
+		if len(line) == 0 {
+			continue
+		}
+
+		// Check for the SSE data prefix
+		if data, found := bytes.CutPrefix(line, []byte("data:")); found {
+			data = bytes.TrimSpace(data)
+			if len(data) == 0 {
+				continue
+			}
+			if bytes.Equal(data, []byte("[DONE]")) {
+				break
+			}
+
+			// Parse JSON to look for usage data
+			if gjson.ValidBytes(data) {
+				rec.parseAndRecordMetrics(gjson.ParseBytes(data))
+			}
+		}
+	}
+}
+
+func (rec *MetricsRecorder) processNonStreamingResponse(body []byte) {
+	if len(body) == 0 {
+		return
+	}
+
+	// Parse JSON to extract usage information
+	if gjson.ValidBytes(body) {
+		rec.parseAndRecordMetrics(gjson.ParseBytes(body))
+	}
+}
+
+// MetricsResponseWriter captures the entire response body, streaming or not,
+// so usage statistics can be parsed after the upstream request completes
+type MetricsResponseWriter struct {
+	gin.ResponseWriter
+	body            []byte
+	metricsRecorder *MetricsRecorder
+}
+
+func (w *MetricsResponseWriter) Write(b []byte) (int, error) {
+	n, err := w.ResponseWriter.Write(b)
+	if err != nil {
+		return n, err
+	}
+	w.body = append(w.body, b...)
+	return n, nil
+}
+
+func (w *MetricsResponseWriter) WriteHeader(statusCode int) {
+	w.ResponseWriter.WriteHeader(statusCode)
+}
+
+func (w *MetricsResponseWriter) Header() http.Header {
+	return w.ResponseWriter.Header()
+}
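The middleware buffers the whole response and only afterwards walks it for a `usage` object; for streams that means scanning SSE `data:` lines. Here is that scan in isolation as a runnable sketch (the sample body is fabricated for illustration):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	body := []byte("event:message\ndata:{\"choices\":[{\"delta\":{\"content\":\"asdf\"}}]}\n\n" +
		"event:message\ndata:{\"usage\":{\"completion_tokens\":10,\"prompt_tokens\":25}}\n\n" +
		"data: [DONE]\n")

	for _, line := range bytes.Split(body, []byte("\n")) {
		// Same prefix handling as processStreamingResponse: cut "data:" and
		// trim, so both "data:{...}" and "data: [DONE]" are recognized.
		data, found := bytes.CutPrefix(bytes.TrimSpace(line), []byte("data:"))
		if !found {
			continue
		}
		data = bytes.TrimSpace(data)
		if bytes.Equal(data, []byte("[DONE]")) {
			break
		}
		if gjson.ValidBytes(data) && gjson.GetBytes(data, "usage").Exists() {
			fmt.Println("completion_tokens:", gjson.GetBytes(data, "usage.completion_tokens").Int()) // 10
		}
	}
}
```

One consequence of this design: `TokensPerSecond` is computed over the full request wall time starting at middleware entry (including any model swap) and counts prompt plus completion tokens, so it is a throughput figure rather than a pure generation speed.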
diff --git a/proxy/metrics_monitor.go b/proxy/metrics_monitor.go
new file mode 100644
index 0000000..050b95e
--- /dev/null
+++ b/proxy/metrics_monitor.go
@@ -0,0 +1,82 @@
+package proxy
+
+import (
+	"encoding/json"
+	"sync"
+	"time"
+
+	"github.com/mostlygeek/llama-swap/event"
+)
+
+// TokenMetrics represents token usage statistics parsed from upstream responses
+type TokenMetrics struct {
+	ID              int       `json:"id"`
+	Timestamp       time.Time `json:"timestamp"`
+	Model           string    `json:"model"`
+	InputTokens     int       `json:"input_tokens"`
+	OutputTokens    int       `json:"output_tokens"`
+	TokensPerSecond float64   `json:"tokens_per_second"`
+	DurationMs      int       `json:"duration_ms"`
+}
+
+// TokenMetricsEvent represents a token metrics event
+type TokenMetricsEvent struct {
+	Metrics TokenMetrics
+}
+
+func (e TokenMetricsEvent) Type() uint32 {
+	return TokenMetricsEventID // defined in events.go
+}
+
+// MetricsMonitor keeps a bounded, in-memory history of token metrics
+type MetricsMonitor struct {
+	mu         sync.RWMutex
+	metrics    []TokenMetrics
+	maxMetrics int
+	nextID     int
+}
+
+func NewMetricsMonitor(config *Config) *MetricsMonitor {
+	maxMetrics := config.MetricsMaxInMemory
+	if maxMetrics <= 0 {
+		maxMetrics = 1000 // Default fallback
+	}
+
+	mp := &MetricsMonitor{
+		maxMetrics: maxMetrics,
+	}
+
+	return mp
+}
+
+// addMetrics adds a new metric to the collection and publishes an event
+func (mp *MetricsMonitor) addMetrics(metric TokenMetrics) {
+	mp.mu.Lock()
+	defer mp.mu.Unlock()
+
+	metric.ID = mp.nextID
+	mp.nextID++
+
+	mp.metrics = append(mp.metrics, metric)
+	if len(mp.metrics) > mp.maxMetrics {
+		mp.metrics = mp.metrics[len(mp.metrics)-mp.maxMetrics:]
+	}
+
+	event.Emit(TokenMetricsEvent{Metrics: metric})
+}
+
+// GetMetrics returns a copy of the current metrics
+func (mp *MetricsMonitor) GetMetrics() []TokenMetrics {
+	mp.mu.RLock()
+	defer mp.mu.RUnlock()
+
+	result := make([]TokenMetrics, len(mp.metrics))
+	copy(result, mp.metrics)
+	return result
+}
+
+// GetMetricsJSON returns the metrics as JSON
+func (mp *MetricsMonitor) GetMetricsJSON() ([]byte, error) {
+	mp.mu.RLock()
+	defer mp.mu.RUnlock()
+	return json.Marshal(mp.metrics)
+}
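`addMetrics` caps the history by re-slicing after each append. The idiom, as a standalone illustration:

```go
package main

import "fmt"

func main() {
	const maxMetrics = 3
	var metrics []int

	for id := 1; id <= 5; id++ {
		metrics = append(metrics, id)
		// Keep only the newest maxMetrics entries, as addMetrics does.
		if len(metrics) > maxMetrics {
			metrics = metrics[len(metrics)-maxMetrics:]
		}
	}
	fmt.Println(metrics) // [3 4 5]
}
```

Re-slicing shares the original backing array, so trimmed entries are not released immediately; for a default cap of 1000 small structs that is a reasonable trade for simplicity.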
pm.ginEngine.POST("/v1/embeddings", pm.proxyOAIHandler) @@ -360,13 +365,8 @@ func (pm *ProxyManager) proxyOAIHandler(c *gin.Context) { return } - requestedModel := gjson.GetBytes(bodyBytes, "model").String() - if requestedModel == "" { - pm.sendErrorResponse(c, http.StatusBadRequest, "missing or invalid 'model' key") - return - } - - processGroup, realModelName, err := pm.swapProcessGroup(requestedModel) + realModelName := c.GetString("ls-real-model-name") // Should be set in MetricsMiddleware + processGroup, _, err := pm.swapProcessGroup(realModelName) if err != nil { pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error swapping process group: %s", err.Error())) return diff --git a/proxy/proxymanager_api.go b/proxy/proxymanager_api.go index 991e8e2..dc96a99 100644 --- a/proxy/proxymanager_api.go +++ b/proxy/proxymanager_api.go @@ -24,6 +24,7 @@ func addApiHandlers(pm *ProxyManager) { { apiGroup.POST("/models/unload", pm.apiUnloadAllModels) apiGroup.GET("/events", pm.apiSendEvents) + apiGroup.GET("/metrics", pm.apiGetMetrics) } } @@ -85,6 +86,7 @@ type messageType string const ( msgTypeModelStatus messageType = "modelStatus" msgTypeLogData messageType = "logData" + msgTypeMetrics messageType = "metrics" ) type messageEnvelope struct { @@ -130,6 +132,18 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) { } } + sendMetrics := func(metrics TokenMetrics) { + jsonData, err := json.Marshal(metrics) + if err == nil { + select { + case sendBuffer <- messageEnvelope{Type: msgTypeMetrics, Data: string(jsonData)}: + case <-ctx.Done(): + return + default: + } + } + } + /** * Send updated models list */ @@ -150,10 +164,20 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) { sendLogData("upstream", data) })() + /** + * Send Metrics data + */ + defer event.On(func(e TokenMetricsEvent) { + sendMetrics(e.Metrics) + })() + // send initial batch of data sendLogData("proxy", pm.proxyLogger.GetHistory()) sendLogData("upstream", pm.upstreamLogger.GetHistory()) sendModels() + for _, metrics := range pm.metricsMonitor.GetMetrics() { + sendMetrics(metrics) + } for { select { @@ -169,3 +193,12 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) { } } } + +func (pm *ProxyManager) apiGetMetrics(c *gin.Context) { + jsonData, err := pm.metricsMonitor.GetMetricsJSON() + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get metrics"}) + return + } + c.Data(http.StatusOK, "application/json", jsonData) +} diff --git a/proxy/proxymanager_test.go b/proxy/proxymanager_test.go index 47397f2..363551d 100644 --- a/proxy/proxymanager_test.go +++ b/proxy/proxymanager_test.go @@ -165,9 +165,11 @@ func TestProxyManager_SwapMultiProcessParallelRequests(t *testing.T) { } mu.Lock() - var response map[string]string + var response map[string]interface{} assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) - results[key] = response["responseMessage"] + result, ok := response["responseMessage"].(string) + assert.Equal(t, ok, true) + results[key] = result mu.Unlock() }(key) @@ -644,7 +646,7 @@ func TestProxyManager_ChatContentLength(t *testing.T) { proxy.ServeHTTP(w, req) assert.Equal(t, http.StatusOK, w.Code) - var response map[string]string + var response map[string]interface{} assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response)) assert.Equal(t, "81", response["h_content_length"]) assert.Equal(t, "model1", response["responseMessage"]) @@ -672,7 +674,7 @@ func TestProxyManager_FiltersStripParams(t *testing.T) { proxy.ServeHTTP(w, req) assert.Equal(t, 
diff --git a/proxy/proxymanager_test.go b/proxy/proxymanager_test.go
index 47397f2..363551d 100644
--- a/proxy/proxymanager_test.go
+++ b/proxy/proxymanager_test.go
@@ -165,9 +165,11 @@ func TestProxyManager_SwapMultiProcessParallelRequests(t *testing.T) {
 			}
 
 			mu.Lock()
-			var response map[string]string
+			var response map[string]interface{}
 			assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))
-			results[key] = response["responseMessage"]
+			result, ok := response["responseMessage"].(string)
+			assert.True(t, ok)
+			results[key] = result
 			mu.Unlock()
 		}(key)
 
@@ -644,7 +646,7 @@ func TestProxyManager_ChatContentLength(t *testing.T) {
 	proxy.ServeHTTP(w, req)
 	assert.Equal(t, http.StatusOK, w.Code)
 
-	var response map[string]string
+	var response map[string]interface{}
 	assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))
 	assert.Equal(t, "81", response["h_content_length"])
 	assert.Equal(t, "model1", response["responseMessage"])
@@ -672,7 +674,7 @@ func TestProxyManager_FiltersStripParams(t *testing.T) {
 	proxy.ServeHTTP(w, req)
 	assert.Equal(t, http.StatusOK, w.Code)
 
-	var response map[string]string
+	var response map[string]interface{}
 	assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))
 
 	// `temperature` and `stream` are gone but model remains
@@ -683,3 +685,69 @@ func TestProxyManager_FiltersStripParams(t *testing.T) {
 	// assert.Equal(t, "abc", response["y_param"])
 	// t.Logf("%v", response)
 }
+
+func TestProxyManager_MiddlewareWritesMetrics_NonStreaming(t *testing.T) {
+	config := AddDefaultGroupToConfig(Config{
+		HealthCheckTimeout: 15,
+		Models: map[string]ModelConfig{
+			"model1": getTestSimpleResponderConfig("model1"),
+		},
+		LogLevel: "error",
+	})
+
+	proxy := New(config)
+	defer proxy.StopProcesses(StopWaitForInflightRequest)
+
+	// Make a non-streaming request
+	reqBody := `{"model":"model1", "stream": false}`
+	req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
+	w := httptest.NewRecorder()
+
+	proxy.ServeHTTP(w, req)
+	assert.Equal(t, http.StatusOK, w.Code)
+
+	// Check that metrics were recorded
+	metrics := proxy.metricsMonitor.GetMetrics()
+	assert.NotEmpty(t, metrics, "metrics should be recorded for non-streaming request")
+
+	// Verify the last metric has the correct model and token counts
+	lastMetric := metrics[len(metrics)-1]
+	assert.Equal(t, "model1", lastMetric.Model)
+	assert.Equal(t, 25, lastMetric.InputTokens, "input tokens should be 25")
+	assert.Equal(t, 10, lastMetric.OutputTokens, "output tokens should be 10")
+	assert.Greater(t, lastMetric.TokensPerSecond, 0.0, "tokens per second should be greater than 0")
+	assert.Greater(t, lastMetric.DurationMs, 0, "duration should be greater than 0")
+}
+
+func TestProxyManager_MiddlewareWritesMetrics_Streaming(t *testing.T) {
+	config := AddDefaultGroupToConfig(Config{
+		HealthCheckTimeout: 15,
+		Models: map[string]ModelConfig{
+			"model1": getTestSimpleResponderConfig("model1"),
+		},
+		LogLevel: "error",
+	})
+
+	proxy := New(config)
+	defer proxy.StopProcesses(StopWaitForInflightRequest)
+
+	// Make a streaming request
+	reqBody := `{"model":"model1", "stream": true}`
+	req := httptest.NewRequest("POST", "/v1/chat/completions?stream=true", bytes.NewBufferString(reqBody))
+	w := httptest.NewRecorder()
+
+	proxy.ServeHTTP(w, req)
+	assert.Equal(t, http.StatusOK, w.Code)
+
+	// Check that metrics were recorded
+	metrics := proxy.metricsMonitor.GetMetrics()
+	assert.NotEmpty(t, metrics, "metrics should be recorded for streaming request")
+
+	// Verify the last metric has the correct model and token counts
+	lastMetric := metrics[len(metrics)-1]
+	assert.Equal(t, "model1", lastMetric.Model)
+	assert.Equal(t, 25, lastMetric.InputTokens, "input tokens should be 25")
+	assert.Equal(t, 10, lastMetric.OutputTokens, "output tokens should be 10")
+	assert.Greater(t, lastMetric.TokensPerSecond, 0.0, "tokens per second should be greater than 0")
+	assert.Greater(t, lastMetric.DurationMs, 0, "duration should be greater than 0")
+}
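The config tests earlier in the diff pin `MetricsMaxInMemory` to 1000 when the key is omitted. The same default can also be checked in isolation; a sketch, assuming a config this minimal passes validation:

```go
package proxy

import (
	"strings"
	"testing"
)

// Hypothetical test, not part of the PR.
func TestMetricsMaxInMemoryDefault(t *testing.T) {
	cfg, err := LoadConfigFromReader(strings.NewReader("healthCheckTimeout: 15\n"))
	if err != nil {
		t.Fatal(err)
	}
	if cfg.MetricsMaxInMemory != 1000 {
		t.Fatalf("expected default of 1000, got %d", cfg.MetricsMaxInMemory)
	}
}
```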
"navlink active" : "navlink")}> + Activity + @@ -32,6 +37,7 @@ function App() { } /> } /> + } /> } /> diff --git a/ui/src/contexts/APIProvider.tsx b/ui/src/contexts/APIProvider.tsx index ec70a89..9344359 100644 --- a/ui/src/contexts/APIProvider.tsx +++ b/ui/src/contexts/APIProvider.tsx @@ -19,13 +19,25 @@ interface APIProviderType { enableAPIEvents: (enabled: boolean) => void; proxyLogs: string; upstreamLogs: string; + metrics: Metrics[]; } + +interface Metrics { + id: number; + timestamp: string; + model: string; + input_tokens: number; + output_tokens: number; + tokens_per_second: number; + duration_ms: number; +} + interface LogData { source: "upstream" | "proxy"; data: string; } interface APIEventEnvelope { - type: "modelStatus" | "logData"; + type: "modelStatus" | "logData" | "metrics"; data: string; } @@ -37,6 +49,7 @@ type APIProviderProps = { export function APIProvider({ children }: APIProviderProps) { const [proxyLogs, setProxyLogs] = useState(""); const [upstreamLogs, setUpstreamLogs] = useState(""); + const [metrics, setMetrics] = useState([]); const apiEventSource = useRef(null); const [models, setModels] = useState([]); @@ -73,7 +86,7 @@ export function APIProvider({ children }: APIProviderProps) { } break; - case "logData": { + case "logData": const logData = JSON.parse(message.data) as LogData; switch (logData.source) { case "proxy": @@ -83,7 +96,16 @@ export function APIProvider({ children }: APIProviderProps) { appendLog(logData.data, setUpstreamLogs); break; } - } + break; + + case "metrics": + { + const newMetric = JSON.parse(message.data) as Metrics; + setMetrics(prevMetrics => { + return [newMetric, ...prevMetrics]; + }); + } + break; } } catch (err) { console.error(e.data, err); @@ -159,8 +181,9 @@ export function APIProvider({ children }: APIProviderProps) { enableAPIEvents, proxyLogs, upstreamLogs, + metrics, }), - [models, listModels, unloadAllModels, loadModel, enableAPIEvents, proxyLogs, upstreamLogs] + [models, listModels, unloadAllModels, loadModel, enableAPIEvents, proxyLogs, upstreamLogs, metrics] ); return {children}; diff --git a/ui/src/pages/Activity.tsx b/ui/src/pages/Activity.tsx new file mode 100644 index 0000000..0f4832e --- /dev/null +++ b/ui/src/pages/Activity.tsx @@ -0,0 +1,108 @@ +import { useState, useEffect } from 'react'; +import { useAPI } from '../contexts/APIProvider'; + +const ActivityPage = () => { + const { metrics, enableAPIEvents } = useAPI(); + const [error, setError] = useState(null); + + useEffect(() => { + enableAPIEvents(true); + return () => { + enableAPIEvents(false); + }; + }, []); + + useEffect(() => { + if (metrics.length > 0) { + setError(null); + } + }, [metrics]); + + const formatTimestamp = (timestamp: string) => { + return new Date(timestamp).toLocaleString(); + }; + + const formatSpeed = (speed: number) => { + return speed.toFixed(2) + ' t/s'; + }; + + const formatDuration = (ms: number) => { + return (ms / 1000).toFixed(2) + 's'; + }; + + if (error) { + return ( +
diff --git a/ui/src/pages/Activity.tsx b/ui/src/pages/Activity.tsx
new file mode 100644
index 0000000..0f4832e
--- /dev/null
+++ b/ui/src/pages/Activity.tsx
@@ -0,0 +1,108 @@
+import { useState, useEffect } from 'react';
+import { useAPI } from '../contexts/APIProvider';
+
+const ActivityPage = () => {
+  const { metrics, enableAPIEvents } = useAPI();
+  const [error, setError] = useState<string | null>(null);
+
+  useEffect(() => {
+    enableAPIEvents(true);
+    return () => {
+      enableAPIEvents(false);
+    };
+  }, []);
+
+  useEffect(() => {
+    if (metrics.length > 0) {
+      setError(null);
+    }
+  }, [metrics]);
+
+  const formatTimestamp = (timestamp: string) => {
+    return new Date(timestamp).toLocaleString();
+  };
+
+  const formatSpeed = (speed: number) => {
+    return speed.toFixed(2) + ' t/s';
+  };
+
+  const formatDuration = (ms: number) => {
+    return (ms / 1000).toFixed(2) + 's';
+  };
+
+  if (error) {
+    return (
+      <div>
+        <h1>
+          Activity
+        </h1>
+        <div>
+          <p>
+            {error}
+          </p>
+        </div>
+      </div>
+    );
+  }
+
+  return (
+    <div>
+      <h1>
+        Activity
+      </h1>
+
+      {metrics.length === 0 ? (
+        <div>
+          <p>
+            No metrics data available
+          </p>
+        </div>
+      ) : (
+        <div>
+          <table>
+            <thead>
+              <tr>
+                <th>Timestamp</th>
+                <th>Model</th>
+                <th>Input Tokens</th>
+                <th>Output Tokens</th>
+                <th>Processing Speed</th>
+                <th>Duration</th>
+              </tr>
+            </thead>
+            <tbody>
+              {metrics.map((metric, index) => (
+                <tr key={index}>
+                  <td>{formatTimestamp(metric.timestamp)}</td>
+                  <td>{metric.model}</td>
+                  <td>{metric.input_tokens.toLocaleString()}</td>
+                  <td>{metric.output_tokens.toLocaleString()}</td>
+                  <td>{formatSpeed(metric.tokens_per_second)}</td>
+                  <td>{formatDuration(metric.duration_ms)}</td>
+                </tr>
+              ))}
+            </tbody>
+          </table>
+        </div>
+      )}
+    </div>
+  );
+};
+
+export default ActivityPage;