improve error handling

This commit is contained in:
Benson Wong
2024-10-04 10:55:02 -07:00
parent 2d387cf373
commit bfdba43bd8
3 changed files with 86 additions and 24 deletions

View File

@@ -1,7 +1,20 @@
# Seconds to wait for llama.cpp to be available to serve requests
# Default (and minimum): 15 seconds
healthCheckTimeout: 60
models: models:
"llama": "llama":
cmd: "models/llama-server-osx --port 8999 -m models/Llama-3.2-1B-Instruct-Q4_K_M.gguf" cmd: "models/llama-server-osx --port 8999 -m models/Llama-3.2-1B-Instruct-Q4_K_M.gguf"
proxy: "http://127.0.0.1:8999" proxy: "http://127.0.0.1:8999"
# list of model name aliases this llama.cpp instance can serve
aliases:
- "gpt-4o-mini"
"qwen": "qwen":
cmd: "models/llama-server-osx --port 8999 -m models/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf " cmd: "models/llama-server-osx --port 8999 -m models/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf "
proxy: "http://127.0.0.1:8999" proxy: "http://127.0.0.1:8999"
aliases:
- "gpt-3.5-turbo"
"broken":
cmd: "models/llama-server-osx --port 8999 -m models/doesnotexist.gguf "
proxy: "http://127.0.0.1:8999"

View File

@@ -7,12 +7,32 @@ import (
) )
// ModelConfig describes one llama.cpp instance: how to launch it and
// where to reach it once it is running.
type ModelConfig struct {
	Cmd     string   `yaml:"cmd"`     // shell command that starts the llama.cpp server
	Proxy   string   `yaml:"proxy"`   // base URL the server listens on
	Aliases []string `yaml:"aliases"` // alternative model names this instance can serve
}

// Config is the top-level proxy configuration loaded from YAML.
type Config struct {
	Models             map[string]ModelConfig `yaml:"models"`
	HealthCheckTimeout int                    `yaml:"healthCheckTimeout"` // seconds; raised to a minimum of 15 at load time
}

// FindConfig returns the ModelConfig serving modelName, matching either
// a key in Models or one of a model's declared aliases. The second
// return value reports whether a match was found.
func (c *Config) FindConfig(modelName string) (ModelConfig, bool) {
	// Fast path: the requested name is a model key.
	if cfg, ok := c.Models[modelName]; ok {
		return cfg, true
	}

	// Slow path: scan every model's alias list for a match.
	for _, cfg := range c.Models {
		for _, alias := range cfg.Aliases {
			if alias == modelName {
				return cfg, true
			}
		}
	}

	return ModelConfig{}, false
}
func LoadConfig(path string) (*Config, error) { func LoadConfig(path string) (*Config, error) {
@@ -27,5 +47,9 @@ func LoadConfig(path string) (*Config, error) {
return nil, err return nil, err
} }
if config.HealthCheckTimeout < 15 {
config.HealthCheckTimeout = 15
}
return &config, nil return &config, nil
} }

View File

@@ -18,10 +18,9 @@ import (
type ProxyManager struct { type ProxyManager struct {
sync.Mutex sync.Mutex
config *Config config *Config
currentCmd *exec.Cmd currentCmd *exec.Cmd
currentModel string currentConfig ModelConfig
currentProxy string
} }
func New(config *Config) *ProxyManager { func New(config *Config) *ProxyManager {
@@ -32,29 +31,31 @@ func (pm *ProxyManager) HandleFunc(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/v1/chat/completions" { if r.URL.Path == "/v1/chat/completions" {
pm.proxyChatRequest(w, r) pm.proxyChatRequest(w, r)
} else { } else {
http.Error(w, "Endpoint not supported", http.StatusNotFound) http.Error(w, "endpoint not supported", http.StatusNotFound)
} }
} }
func (pm *ProxyManager) swapModel(model string) error { func (pm *ProxyManager) swapModel(requestedModel string) error {
pm.Lock() pm.Lock()
defer pm.Unlock() defer pm.Unlock()
if model == pm.currentModel { // find the model configuration matching requestedModel
modelConfig, found := pm.config.FindConfig(requestedModel)
if !found {
return fmt.Errorf("could not find configuration for %s", requestedModel)
}
// no need to swap llama.cpp instances
if pm.currentConfig.Cmd == modelConfig.Cmd {
return nil return nil
} }
modelConfig, ok := pm.config.Models[model] // kill the current running one to swap it
if !ok {
return fmt.Errorf("unknown model %s", model)
}
if pm.currentCmd != nil { if pm.currentCmd != nil {
pm.currentCmd.Process.Signal(syscall.SIGTERM) pm.currentCmd.Process.Signal(syscall.SIGTERM)
} }
pm.currentModel = model pm.currentConfig = modelConfig
pm.currentProxy = modelConfig.Proxy
args := strings.Fields(modelConfig.Cmd) args := strings.Fields(modelConfig.Cmd)
cmd := exec.Command(args[0], args[1:]...) cmd := exec.Command(args[0], args[1:]...)
@@ -66,20 +67,24 @@ func (pm *ProxyManager) swapModel(model string) error {
} }
pm.currentCmd = cmd pm.currentCmd = cmd
if err := pm.checkHealthEndpoint(60 * time.Second); err != nil { if err := pm.checkHealthEndpoint(); err != nil {
return err return err
} }
return nil return nil
} }
func (pm *ProxyManager) checkHealthEndpoint(maxDuration time.Duration) error { func (pm *ProxyManager) checkHealthEndpoint() error {
if pm.currentProxy == "" { if pm.currentConfig.Proxy == "" {
return fmt.Errorf("no upstream available to check /health") return fmt.Errorf("no upstream available to check /health")
} }
healthURL := pm.currentProxy + "/health" proxyTo := pm.currentConfig.Proxy
maxDuration := time.Second * time.Duration(pm.config.HealthCheckTimeout)
healthURL := proxyTo + "/health"
client := &http.Client{} client := &http.Client{}
startTime := time.Now() startTime := time.Now()
@@ -93,6 +98,15 @@ func (pm *ProxyManager) checkHealthEndpoint(maxDuration time.Duration) error {
req = req.WithContext(ctx) req = req.WithContext(ctx)
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// llama.cpp's /health endpoint comes up fast, so give it 5 seconds.
// A refused connection usually means llama.cpp exited; bailing out here
// keeps the code simple when TCP dial cannot reach the proxy endpoint
if time.Since(startTime) > 5*time.Second {
return fmt.Errorf("/healthy endpoint took more than 5 seconds to respond")
}
}
if time.Since(startTime) >= maxDuration { if time.Since(startTime) >= maxDuration {
return fmt.Errorf("failed to check /healthy from: %s", healthURL) return fmt.Errorf("failed to check /healthy from: %s", healthURL)
} }
@@ -127,14 +141,25 @@ func (pm *ProxyManager) proxyChatRequest(w http.ResponseWriter, r *http.Request)
return return
} }
pm.swapModel(model) if err := pm.swapModel(model); err != nil {
http.Error(w, fmt.Sprintf("unable to swap to model: %s", err.Error()), http.StatusNotFound)
return
}
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
pm.proxyRequest(w, r) pm.proxyRequest(w, r)
} }
func (pm *ProxyManager) proxyRequest(w http.ResponseWriter, r *http.Request) { func (pm *ProxyManager) proxyRequest(w http.ResponseWriter, r *http.Request) {
if pm.currentConfig.Proxy == "" {
http.Error(w, "No upstream proxy", http.StatusInternalServerError)
return
}
proxyTo := pm.currentConfig.Proxy
client := &http.Client{} client := &http.Client{}
req, err := http.NewRequest(r.Method, pm.currentProxy+r.URL.String(), r.Body) req, err := http.NewRequest(r.Method, proxyTo+r.URL.String(), r.Body)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return