llama-swap/proxy/proxymanager_api.go
Benson Wong e250e71e59 Include metrics from upstream chat requests (#361)
* proxy: refactor metrics recording

- remove metrics_middleware.go as this wrapper is no longer needed. This
  also eliminates double body parsing for the modelID
- move metrics parsing to be part of MetricsMonitor
- refactor how metrics are recorded in ProxyManager
- add MetricsMonitor tests
- improve memory efficiency of processStreamingResponse
- add benchmarks for MetricsMonitor.addMetrics (see the sketch below)
- proxy: refactor MetricsMonitor to handle errors more safely
2025-10-25 17:38:18 -07:00
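
The benchmark file itself is not part of this page. Purely as an illustration, a Go benchmark for MetricsMonitor.addMetrics might take the following shape in a _test.go file in the same package; the zero-value MetricsMonitor setup and the addMetrics signature here are assumptions, not the project's actual API.

package proxy

import "testing"

// Hypothetical sketch only: the real MetricsMonitor setup and the
// addMetrics signature are not shown on this page.
func BenchmarkAddMetrics(b *testing.B) {
	mm := &MetricsMonitor{}  // assumption: zero value is usable here
	sample := TokenMetrics{} // assumption: fields omitted
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		mm.addMetrics(sample) // assumption: accepts a TokenMetrics value
	}
}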


package proxy

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sort"
	"strings"

	"github.com/gin-gonic/gin"
	"github.com/mostlygeek/llama-swap/event"
)
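
// Model describes a configured model and its current process state as
// reported to the web UI.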
type Model struct {
	Id          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description"`
	State       string `json:"state"`
	Unlisted    bool   `json:"unlisted"`
}
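
// addApiHandlers registers the HTTP endpoints consumed by the web UI.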
func addApiHandlers(pm *ProxyManager) {
	// Add API endpoints for React to consume
	apiGroup := pm.ginEngine.Group("/api")
	{
		apiGroup.POST("/models/unload", pm.apiUnloadAllModels)
		apiGroup.POST("/models/unload/*model", pm.apiUnloadSingleModelHandler)
		apiGroup.GET("/events", pm.apiSendEvents)
		apiGroup.GET("/metrics", pm.apiGetMetrics)
	}
}
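
// apiUnloadAllModels immediately stops every running process.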
func (pm *ProxyManager) apiUnloadAllModels(c *gin.Context) {
	pm.StopProcesses(StopImmediately)
	c.JSON(http.StatusOK, gin.H{"msg": "ok"})
}
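
// getModelStatus returns every configured model with its current process
// state, sorted by model ID.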
func (pm *ProxyManager) getModelStatus() []Model {
	// Extract keys and sort them
	models := []Model{}
	modelIDs := make([]string, 0, len(pm.config.Models))
	for modelID := range pm.config.Models {
		modelIDs = append(modelIDs, modelID)
	}
	sort.Strings(modelIDs)

	// Iterate over sorted keys
	for _, modelID := range modelIDs {
		// Get process state
		processGroup := pm.findGroupByModelName(modelID)
		state := "unknown"
		if processGroup != nil {
			process := processGroup.processes[modelID]
			if process != nil {
				var stateStr string
				switch process.CurrentState() {
				case StateReady:
					stateStr = "ready"
				case StateStarting:
					stateStr = "starting"
				case StateStopping:
					stateStr = "stopping"
				case StateShutdown:
					stateStr = "shutdown"
				case StateStopped:
					stateStr = "stopped"
				default:
					stateStr = "unknown"
				}
				state = stateStr
			}
		}
		models = append(models, Model{
			Id:          modelID,
			Name:        pm.config.Models[modelID].Name,
			Description: pm.config.Models[modelID].Description,
			State:       state,
			Unlisted:    pm.config.Models[modelID].Unlisted,
		})
	}
	return models
}
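
// messageType tags the payload carried in a messageEnvelope on the
// /api/events SSE stream.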
type messageType string

const (
	msgTypeModelStatus messageType = "modelStatus"
	msgTypeLogData     messageType = "logData"
	msgTypeMetrics     messageType = "metrics"
)

type messageEnvelope struct {
	Type messageType `json:"type"`
	Data string      `json:"data"`
}

// apiSendEvents streams server-side events (model status, log data, token
// metrics) to the client as SSE messages.
func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Header("X-Content-Type-Options", "nosniff")

	// prevent nginx from buffering SSE
	c.Header("X-Accel-Buffering", "no")

	sendBuffer := make(chan messageEnvelope, 25)
	ctx, cancel := context.WithCancel(c.Request.Context())
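
	// Each helper below enqueues without blocking: if sendBuffer is full the
	// message is dropped (the empty default case), and if the request context
	// is done the helper returns without sending.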
	sendModels := func() {
		data, err := json.Marshal(pm.getModelStatus())
		if err == nil {
			msg := messageEnvelope{Type: msgTypeModelStatus, Data: string(data)}
			select {
			case sendBuffer <- msg:
			case <-ctx.Done():
				return
			default:
			}
		}
	}

	sendLogData := func(source string, data []byte) {
		data, err := json.Marshal(gin.H{
			"source": source,
			"data":   string(data),
		})
		if err == nil {
			select {
			case sendBuffer <- messageEnvelope{Type: msgTypeLogData, Data: string(data)}:
			case <-ctx.Done():
				return
			default:
			}
		}
	}

	sendMetrics := func(metrics []TokenMetrics) {
		jsonData, err := json.Marshal(metrics)
		if err == nil {
			select {
			case sendBuffer <- messageEnvelope{Type: msgTypeMetrics, Data: string(jsonData)}:
			case <-ctx.Done():
				return
			default:
			}
		}
	}
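
	// event.On and OnLogData return unsubscribe functions; deferring them
	// removes each listener when this handler returns (client disconnect or
	// server shutdown).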
	/**
	 * Send updated models list
	 */
	defer event.On(func(e ProcessStateChangeEvent) {
		sendModels()
	})()
	defer event.On(func(e ConfigFileChangedEvent) {
		sendModels()
	})()

	/**
	 * Send Log data
	 */
	defer pm.proxyLogger.OnLogData(func(data []byte) {
		sendLogData("proxy", data)
	})()
	defer pm.upstreamLogger.OnLogData(func(data []byte) {
		sendLogData("upstream", data)
	})()

	/**
	 * Send Metrics data
	 */
	defer event.On(func(e TokenMetricsEvent) {
		sendMetrics([]TokenMetrics{e.Metrics})
	})()
	// send initial batch of data
	sendLogData("proxy", pm.proxyLogger.GetHistory())
	sendLogData("upstream", pm.upstreamLogger.GetHistory())
	sendModels()
	sendMetrics(pm.metricsMonitor.getMetrics())

	for {
		select {
		case <-c.Request.Context().Done():
			cancel()
			return
		case <-pm.shutdownCtx.Done():
			cancel()
			return
		case msg := <-sendBuffer:
			c.SSEvent("message", msg)
			c.Writer.Flush()
		}
	}
}
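
// apiGetMetrics returns the currently recorded token metrics as JSON.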
func (pm *ProxyManager) apiGetMetrics(c *gin.Context) {
	jsonData, err := pm.metricsMonitor.getMetricsJSON()
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get metrics"})
		return
	}
	c.Data(http.StatusOK, "application/json", jsonData)
}
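
// apiUnloadSingleModelHandler stops the process backing a single model,
// resolving the requested name to its real model name first.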
func (pm *ProxyManager) apiUnloadSingleModelHandler(c *gin.Context) {
	requestedModel := strings.TrimPrefix(c.Param("model"), "/")
	realModelName, found := pm.config.RealModelName(requestedModel)
	if !found {
		pm.sendErrorResponse(c, http.StatusNotFound, "Model not found")
		return
	}

	processGroup := pm.findGroupByModelName(realModelName)
	if processGroup == nil {
		pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("process group not found for model %s", requestedModel))
		return
	}

	if err := processGroup.StopProcess(realModelName, StopImmediately); err != nil {
		pm.sendErrorResponse(c, http.StatusInternalServerError, fmt.Sprintf("error stopping process: %s", err.Error()))
		return
	}

	c.String(http.StatusOK, "OK")
}
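
For reference, a minimal sketch of a Go client for the /api/events stream above. The listen address is an assumption; the envelope mirrors messageEnvelope, whose Data field is itself a JSON document that must be decoded a second time. Since each message is emitted with c.SSEvent("message", msg), the JSON-encoded envelope arrives on the SSE "data:" line.

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

type envelope struct {
	Type string `json:"type"`
	Data string `json:"data"`
}

func main() {
	// the listen address is an assumption; llama-swap's is configurable
	resp, err := http.Get("http://localhost:8080/api/events")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	// the initial log-history payload can exceed bufio.Scanner's 64 KiB
	// default token size, so allow up to 1 MiB per line
	scanner.Buffer(make([]byte, 0, 1<<20), 1<<20)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data:") {
			continue // skip "event:" lines and blank separators
		}
		var env envelope
		if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data:")), &env); err != nil {
			continue
		}
		fmt.Printf("%s: %s\n", env.Type, env.Data)
	}
}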