Implement Multi-Process Handling (#7)

Refactor code to support starting of multiple back end llama.cpp servers. This functionality is exposed as `profiles` to create a simple configuration format. 

Changes: 

* refactor proxy tests to get ready for multi-process support
* update proxy/ProxyManager to support multiple processes (#7)
* Add support for Groups in configuration
* improve handling of Model alias configs
* implement multi-model swapping
* improve code clarity for swapModel
* improve docs, rename groups to profiles in config
This commit is contained in:
Benson Wong
2024-11-23 19:45:13 -08:00
committed by GitHub
parent 533162ce6a
commit 73ad85ea69
10 changed files with 361 additions and 124 deletions

View File

@@ -7,6 +7,7 @@ import (
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
@@ -16,18 +17,18 @@ import (
type ProxyManager struct {
sync.Mutex
config *Config
currentProcess *Process
logMonitor *LogMonitor
ginEngine *gin.Engine
config *Config
currentProcesses map[string]*Process
logMonitor *LogMonitor
ginEngine *gin.Engine
}
func New(config *Config) *ProxyManager {
pm := &ProxyManager{
config: config,
currentProcess: nil,
logMonitor: NewLogMonitor(),
ginEngine: gin.New(),
config: config,
currentProcesses: make(map[string]*Process),
logMonitor: NewLogMonitor(),
ginEngine: gin.New(),
}
// Set up routes using the Gin engine
@@ -43,7 +44,7 @@ func New(config *Config) *ProxyManager {
pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler)
pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE)
pm.ginEngine.NoRoute(pm.proxyRequestHandler)
pm.ginEngine.NoRoute(pm.proxyNoRouteHandler)
// Disable console color for testing
gin.DisableConsoleColor()
@@ -59,6 +60,21 @@ func (pm *ProxyManager) HandlerFunc(w http.ResponseWriter, r *http.Request) {
pm.ginEngine.ServeHTTP(w, r)
}
func (pm *ProxyManager) StopProcesses() {
pm.Lock()
defer pm.Unlock()
pm.stopProcesses()
}
// for internal usage
func (pm *ProxyManager) stopProcesses() {
for _, process := range pm.currentProcesses {
process.Stop()
}
pm.currentProcesses = make(map[string]*Process)
}
func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
data := []interface{}{}
for id := range pm.config.Models {
@@ -80,27 +96,64 @@ func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
}
}
func (pm *ProxyManager) swapModel(requestedModel string) error {
func (pm *ProxyManager) swapModel(requestedModel string) (*Process, error) {
pm.Lock()
defer pm.Unlock()
// find the model configuration matching requestedModel
modelConfig, modelID, found := pm.config.FindConfig(requestedModel)
if !found {
return fmt.Errorf("could not find configuration for %s", requestedModel)
// Check if requestedModel contains a /
groupName, modelName := "", requestedModel
if idx := strings.Index(requestedModel, "/"); idx != -1 {
groupName = requestedModel[:idx]
modelName = requestedModel[idx+1:]
}
// do nothing as it's already the correct process
if pm.currentProcess != nil {
if pm.currentProcess.ID == modelID {
return nil
} else {
pm.currentProcess.Stop()
if groupName != "" {
if _, found := pm.config.Profiles[groupName]; !found {
return nil, fmt.Errorf("model group not found %s", groupName)
}
}
pm.currentProcess = NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.logMonitor)
return nil
// de-alias the real model name and get a real one
realModelName, found := pm.config.RealModelName(modelName)
if !found {
return nil, fmt.Errorf("could not find modelID for %s", requestedModel)
}
// exit early when already running, otherwise stop everything and swap
requestedProcessKey := groupName + "/" + realModelName
if process, found := pm.currentProcesses[requestedProcessKey]; found {
return process, nil
}
// stop all running models
pm.stopProcesses()
if groupName == "" {
modelConfig, modelID, found := pm.config.FindConfig(realModelName)
if !found {
return nil, fmt.Errorf("could not find configuration for %s", realModelName)
}
process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.logMonitor)
processKey := groupName + "/" + modelID
pm.currentProcesses[processKey] = process
} else {
for _, modelName := range pm.config.Profiles[groupName] {
if realModelName, found := pm.config.RealModelName(modelName); found {
modelConfig, modelID, found := pm.config.FindConfig(realModelName)
if !found {
return nil, fmt.Errorf("could not find configuration for %s in group %s", realModelName, groupName)
}
process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.logMonitor)
processKey := groupName + "/" + modelID
pm.currentProcesses[processKey] = process
}
}
}
// requestedProcessKey should exist due to swap
return pm.currentProcesses[requestedProcessKey], nil
}
func (pm *ProxyManager) proxyChatRequestHandler(c *gin.Context) {
@@ -120,24 +173,27 @@ func (pm *ProxyManager) proxyChatRequestHandler(c *gin.Context) {
return
}
if err := pm.swapModel(model); err != nil {
if process, err := pm.swapModel(model); err != nil {
c.AbortWithError(http.StatusNotFound, fmt.Errorf("unable to swap to model, %s", err.Error()))
return
} else {
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
// dechunk it as we already have all the body bytes see issue #11
c.Request.Header.Del("transfer-encoding")
c.Request.Header.Add("content-length", strconv.Itoa(len(bodyBytes)))
process.ProxyRequest(c.Writer, c.Request)
}
}
func (pm *ProxyManager) proxyNoRouteHandler(c *gin.Context) {
// since maps are unordered, just use the first available process if one exists
for _, process := range pm.currentProcesses {
process.ProxyRequest(c.Writer, c.Request)
return
}
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
// dechunk it as we already have all the body bytes see issue #11
c.Request.Header.Del("transfer-encoding")
c.Request.Header.Add("content-length", strconv.Itoa(len(bodyBytes)))
pm.currentProcess.ProxyRequest(c.Writer, c.Request)
}
func (pm *ProxyManager) proxyRequestHandler(c *gin.Context) {
if pm.currentProcess != nil {
pm.currentProcess.ProxyRequest(c.Writer, c.Request)
} else {
c.AbortWithError(http.StatusBadRequest, fmt.Errorf("no strategy to handle request"))
}
c.AbortWithError(http.StatusBadRequest, fmt.Errorf("no strategy to handle request"))
}