From 34f9fd73408a2637a238ab4b58a7976005b09f91 Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Fri, 1 Nov 2024 14:32:39 -0700 Subject: [PATCH] Improve timeout and exit handling of child processes. fix #3 and #5 llama-swap only waited a maximum of 5 seconds for an upstream HTTP server to be available. If it took longer than that it will error out the request. Now it will wait up to the configured healthCheckTimeout or the upstream process unexpectedly exits. --- config.example.yaml | 2 +- proxy/manager.go | 42 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/config.example.yaml b/config.example.yaml index 4e51a76..315951c 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -1,6 +1,6 @@ # Seconds to wait for llama.cpp to be available to serve requests # Default (and minimum): 15 seconds -healthCheckTimeout: 60 +healthCheckTimeout: 15 models: "llama": diff --git a/proxy/manager.go b/proxy/manager.go index abafcae..db5d0e8 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -149,14 +149,28 @@ func (pm *ProxyManager) swapModel(requestedModel string) error { } pm.currentCmd = cmd - if err := pm.checkHealthEndpoint(); err != nil { + // watch for the command to exist + cmdCtx, cancel := context.WithCancelCause(context.Background()) + + // monitor the command's exist status + go func() { + err := cmd.Wait() + if err != nil { + cancel(fmt.Errorf("command [%s] %s", strings.Join(cmd.Args, " "), err.Error())) + } else { + cancel(nil) + } + }() + + // wait for checkHealthEndpoint + if err := pm.checkHealthEndpoint(cmdCtx); err != nil { return err } return nil } -func (pm *ProxyManager) checkHealthEndpoint() error { +func (pm *ProxyManager) checkHealthEndpoint(cmdCtx context.Context) error { if pm.currentConfig.Proxy == "" { return fmt.Errorf("no upstream available to check /health") @@ -179,6 +193,7 @@ func (pm *ProxyManager) checkHealthEndpoint() error { if err != nil { return fmt.Errorf("failed to create health url with with %s and path %s", proxyTo, checkEndpoint) } + client := &http.Client{} startTime := time.Now() @@ -188,24 +203,34 @@ func (pm *ProxyManager) checkHealthEndpoint() error { return err } - ctx, cancel := context.WithTimeout(req.Context(), 250*time.Millisecond) + ctx, cancel := context.WithTimeout(cmdCtx, 250*time.Millisecond) defer cancel() req = req.WithContext(ctx) resp, err := client.Do(req) + ttl := (maxDuration - time.Since(startTime)).Seconds() + if err != nil { - if time.Since(startTime) >= maxDuration { - return fmt.Errorf("failed to check health from: %s", healthURL) + // check if the context was cancelled + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: } // wait a bit longer for TCP connection issues if strings.Contains(err.Error(), "connection refused") { - fmt.Fprintf(pm.logMonitor, "Connection refused on %s\n", healthURL) + fmt.Fprintf(pm.logMonitor, "Connection refused on %s, ttl %.0fs\n", healthURL, ttl) + time.Sleep(5 * time.Second) } else { time.Sleep(time.Second) } + if ttl < 0 { + return fmt.Errorf("failed to check health from: %s", healthURL) + } + continue } @@ -213,7 +238,8 @@ func (pm *ProxyManager) checkHealthEndpoint() error { if resp.StatusCode == http.StatusOK { return nil } - if time.Since(startTime) >= maxDuration { + + if ttl < 0 { return fmt.Errorf("failed to check health from: %s", healthURL) } @@ -239,7 +265,7 @@ func (pm *ProxyManager) proxyChatRequest(w http.ResponseWriter, r *http.Request) } if err := pm.swapModel(model); err != nil { - http.Error(w, fmt.Sprintf("unable to swap to model: %s", err.Error()), http.StatusNotFound) + http.Error(w, fmt.Sprintf("unable to swap to model, %s", err.Error()), http.StatusNotFound) return }