From 83415430ba6b8f1d5a1ce11bc7f0300dd1446574 Mon Sep 17 00:00:00 2001
From: Benson Wong
Date: Thu, 3 Oct 2024 21:35:33 -0700
Subject: [PATCH] move proxy logic into the proxy package

---
Review note: proxy/proxy.go in this patch was amended during review, so the
blob id on its index line still names the submitted revision. Changes: the
swapModel error is no longer ignored, a failed swap is retried on the next
request, the old process is reaped before the new one starts, the health
check no longer defers inside its loop, and proxyRequest reads currentProxy
under the lock.

 llama-proxy.go | 189 +----------------------------------------
 proxy/proxy.go | 221 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 224 insertions(+), 186 deletions(-)
 create mode 100644 proxy/proxy.go

diff --git a/llama-proxy.go b/llama-proxy.go
index 445cf99..a073e28 100644
--- a/llama-proxy.go
+++ b/llama-proxy.go
@@ -1,191 +1,15 @@
 package main
 
 import (
-	"bytes"
-	"context"
-	"encoding/json"
 	"flag"
 	"fmt"
-	"io"
 	"net/http"
 	"os"
-	"os/exec"
-	"strings"
-	"sync"
-	"syscall"
-	"time"
 
 	"github.com/mostlygeek/go-llama-cpp-proxy/config"
+	"github.com/mostlygeek/go-llama-cpp-proxy/proxy"
 )
 
-type ServiceState struct {
-	sync.Mutex
-	currentCmd   *exec.Cmd
-	currentModel string
-}
-
-func startService(command string) (*exec.Cmd, error) {
-	args := strings.Fields(command)
-	cmd := exec.Command(args[0], args[1:]...)
-
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-
-	err := cmd.Start()
-	if err != nil {
-		return nil, err
-	}
-
-	return cmd, nil
-}
-
-func checkHealthEndpoint(healthURL string, maxDuration time.Duration) error {
-	client := &http.Client{}
-
-	startTime := time.Now()
-	for {
-		req, err := http.NewRequest("GET", healthURL, nil)
-		if err != nil {
-			return err
-		}
-
-		// Set request timeout
-		ctx, cancel := context.WithTimeout(req.Context(), 250*time.Millisecond)
-		defer cancel()
-
-		// Execute the request with the context
-		req = req.WithContext(ctx)
-		resp, err := client.Do(req)
-		if err != nil {
-			// Log error and check elapsed time before retrying
-			if time.Since(startTime) >= maxDuration {
-				return fmt.Errorf("failed to get a healthy response from: %s", healthURL)
-			}
-
-			// Wait a second before retrying
-			time.Sleep(time.Second)
-			continue
-		}
-
-		// Close response body
-		defer resp.Body.Close()
-
-		// Check if we got a 200 OK response
-		if resp.StatusCode == http.StatusOK {
-			return nil // Health check succeeded
-		}
-
-		// Check elapsed time before retrying
-		if time.Since(startTime) >= maxDuration {
-			return fmt.Errorf("failed to get a healthy response from: %s", healthURL)
-		}
-
-		// Wait a second before retrying
-		time.Sleep(time.Second)
-	}
-}
-
-func proxyChatRequest(w http.ResponseWriter, r *http.Request, config *config.Config, state *ServiceState) {
-	// Read the original request body
-	bodyBytes, err := io.ReadAll(r.Body)
-	if err != nil {
-		http.Error(w, "Invalid JSON", http.StatusBadRequest)
-		return
-	}
-
-	var requestBody map[string]interface{}
-	if err := json.Unmarshal(bodyBytes, &requestBody); err != nil {
-		http.Error(w, "Invalid JSON", http.StatusBadRequest)
-		return
-	}
-
-	model, ok := requestBody["model"].(string)
-	if !ok {
-		http.Error(w, "Missing or invalid 'model' key", http.StatusBadRequest)
-		return
-	}
-
-	modelConfig, ok := config.Models[model]
-	if !ok {
-		http.Error(w, "Model not found in configuration", http.StatusNotFound)
-		return
-	}
-
-	err = error(nil)
-	state.Lock()
-	defer state.Unlock()
-
-	if state.currentModel != model {
-		if state.currentCmd != nil {
-			state.currentCmd.Process.Signal(syscall.SIGTERM)
-		}
-		state.currentCmd, err = startService(modelConfig.Cmd)
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-		state.currentModel = model
-
-		// Check the /health endpoint
-		healthURL := modelConfig.Proxy + "/health"
-		err = checkHealthEndpoint(healthURL, 30*time.Second)
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusServiceUnavailable)
-			return
-		}
-	}
-
-	// replace r.Body so it can be read again
-	r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
-	proxyRequest(modelConfig.Proxy, w, r)
-}
-
-func proxyRequest(proxyHost string, w http.ResponseWriter, r *http.Request) {
-	client := &http.Client{}
-	req, err := http.NewRequest(r.Method, proxyHost+r.URL.String(), r.Body)
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-
-	req.Header = r.Header
-	resp, err := client.Do(req)
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusBadGateway)
-		return
-	}
-	defer resp.Body.Close()
-
-	for k, vv := range resp.Header {
-		for _, v := range vv {
-			w.Header().Add(k, v)
-		}
-	}
-	w.WriteHeader(resp.StatusCode)
-
-	buf := make([]byte, 32*1024) // Buffer size set to 32KB
-	for {
-		n, err := resp.Body.Read(buf)
-		if n > 0 {
-			if _, writeErr := w.Write(buf[:n]); writeErr != nil {
-				http.Error(w, writeErr.Error(), http.StatusInternalServerError)
-				return
-			}
-			// Flush the buffer to the client
-			if flusher, ok := w.(http.Flusher); ok {
-				flusher.Flush()
-			}
-		}
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusBadGateway)
-			return
-		}
-	}
-}
-
 func main() {
 	// Define a command-line flag for the port
 	configPath := flag.String("config", "config.yaml", "config file name")
@@ -199,15 +23,8 @@ func main() {
 		os.Exit(1)
 	}
 
-	serviceState := &ServiceState{}
-
-	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-		if r.URL.Path == "/v1/chat/completions" {
-			proxyChatRequest(w, r, config, serviceState)
-		} else {
-			http.Error(w, "Endpoint not supported", http.StatusNotFound)
-		}
-	})
+	proxyManager := proxy.New(config)
+	http.HandleFunc("/", proxyManager.HandleFunc)
 
 	fmt.Println("Proxy server started on :8080")
 	if err := http.ListenAndServe(*listenStr, nil); err != nil {
diff --git a/proxy/proxy.go b/proxy/proxy.go
new file mode 100644
index 0000000..84738a4
--- /dev/null
+++ b/proxy/proxy.go
@@ -0,0 +1,221 @@
+package proxy
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"os/exec"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/mostlygeek/go-llama-cpp-proxy/config"
+)
+
+// ProxyManager runs one upstream model server at a time and forwards chat
+// completion requests to it. The embedded mutex guards the current* fields.
+type ProxyManager struct {
+	sync.Mutex
+
+	config       *config.Config
+	currentCmd   *exec.Cmd
+	currentModel string
+	currentProxy string
+}
+
+// New returns a ProxyManager for the models defined in config.
+func New(config *config.Config) *ProxyManager {
+	return &ProxyManager{config: config}
+}
+
+// HandleFunc is the HTTP entry point. Only /v1/chat/completions is proxied;
+// every other path is answered with 404.
+func (pm *ProxyManager) HandleFunc(w http.ResponseWriter, r *http.Request) {
+	if r.URL.Path == "/v1/chat/completions" {
+		pm.proxyChatRequest(w, r)
+	} else {
+		http.Error(w, "Endpoint not supported", http.StatusNotFound)
+	}
+}
+
+// swapModel makes model the running upstream. A different running upstream is
+// sent SIGTERM and reaped first. The call holds the lock and blocks until the
+// new upstream answers /health with 200 or the health check gives up.
+func (pm *ProxyManager) swapModel(model string) error {
+	pm.Lock()
+	defer pm.Unlock()
+
+	if model == pm.currentModel {
+		return nil
+	}
+
+	modelConfig, ok := pm.config.Models[model]
+	if !ok {
+		return fmt.Errorf("unknown model %s", model)
+	}
+
+	// Validate before stopping anything: args[0] would panic on an empty cmd.
+	args := strings.Fields(modelConfig.Cmd)
+	if len(args) == 0 {
+		return fmt.Errorf("empty cmd for model %s", model)
+	}
+
+	if pm.currentCmd != nil {
+		if err := pm.currentCmd.Process.Signal(syscall.SIGTERM); err == nil {
+			// Reap the old process so it is not left behind as a zombie and
+			// is gone before the replacement starts. Its exit status is
+			// irrelevant, so the error is deliberately dropped.
+			_ = pm.currentCmd.Wait()
+		}
+		pm.currentCmd = nil
+	}
+
+	// currentModel stays empty until the new upstream is healthy, so a failed
+	// swap is retried by the next request instead of looking like a success.
+	pm.currentModel = ""
+	pm.currentProxy = modelConfig.Proxy
+
+	cmd := exec.Command(args[0], args[1:]...)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	if err := cmd.Start(); err != nil {
+		return err
+	}
+	pm.currentCmd = cmd
+
+	if err := pm.checkHealthEndpoint(60 * time.Second); err != nil {
+		return err
+	}
+
+	pm.currentModel = model
+	return nil
+}
+
+// checkHealthEndpoint requests currentProxy's /health, waiting one second
+// between attempts, until it returns 200 OK or maxDuration has elapsed. Each
+// attempt times out after 250ms. The caller must hold the lock.
+func (pm *ProxyManager) checkHealthEndpoint(maxDuration time.Duration) error {
+	if pm.currentProxy == "" {
+		return fmt.Errorf("no upstream available to check /health")
+	}
+
+	healthURL := pm.currentProxy + "/health"
+	client := &http.Client{}
+	startTime := time.Now()
+
+	for {
+		req, err := http.NewRequest("GET", healthURL, nil)
+		if err != nil {
+			return err
+		}
+		ctx, cancel := context.WithTimeout(req.Context(), 250*time.Millisecond)
+		resp, err := client.Do(req.WithContext(ctx))
+		if err == nil {
+			resp.Body.Close()
+		}
+		// Release per attempt. A defer inside this loop would keep every
+		// context and response body alive until the function returns.
+		cancel()
+
+		if err == nil && resp.StatusCode == http.StatusOK {
+			return nil
+		}
+		if time.Since(startTime) >= maxDuration {
+			return fmt.Errorf("failed to check /health from: %s", healthURL)
+		}
+		time.Sleep(time.Second)
+	}
+}
+
+// proxyChatRequest reads the "model" key from the JSON request body, swaps to
+// that model if needed and forwards the request to its upstream.
+func (pm *ProxyManager) proxyChatRequest(w http.ResponseWriter, r *http.Request) {
+	bodyBytes, err := io.ReadAll(r.Body)
+	if err != nil {
+		http.Error(w, "Invalid JSON", http.StatusBadRequest)
+		return
+	}
+	var requestBody map[string]interface{}
+	if err := json.Unmarshal(bodyBytes, &requestBody); err != nil {
+		http.Error(w, "Invalid JSON", http.StatusBadRequest)
+		return
+	}
+	model, ok := requestBody["model"].(string)
+	if !ok {
+		http.Error(w, "Missing or invalid 'model' key", http.StatusBadRequest)
+		return
+	}
+	if _, ok := pm.config.Models[model]; !ok {
+		http.Error(w, "Model not found in configuration", http.StatusNotFound)
+		return
+	}
+
+	// Never forward after a failed swap: the upstream is not running or not
+	// healthy, and the client should see that instead of a proxy error.
+	if err := pm.swapModel(model); err != nil {
+		http.Error(w, err.Error(), http.StatusServiceUnavailable)
+		return
+	}
+
+	// The body was consumed above; replace it so it can be read again.
+	r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
+	pm.proxyRequest(w, r)
+}
+
+// proxyRequest forwards r to the current upstream and streams the response
+// back to w, flushing after every chunk.
+func (pm *ProxyManager) proxyRequest(w http.ResponseWriter, r *http.Request) {
+	// swapModel writes currentProxy under the lock, so read it the same way.
+	pm.Lock()
+	upstream := pm.currentProxy
+	pm.Unlock()
+
+	client := &http.Client{}
+	// Tie the upstream call to the client's context so it is cancelled when
+	// the client disconnects.
+	req, err := http.NewRequestWithContext(r.Context(), r.Method, upstream+r.URL.String(), r.Body)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	req.Header = r.Header
+	resp, err := client.Do(req)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadGateway)
+		return
+	}
+	defer resp.Body.Close()
+	for k, vv := range resp.Header {
+		for _, v := range vv {
+			w.Header().Add(k, v)
+		}
+	}
+	w.WriteHeader(resp.StatusCode)
+
+	// Copy by hand rather than with io.Copy so each chunk can be flushed to
+	// the client as soon as it arrives. The status line is already sent, so a
+	// failure from here on can only end the stream; http.Error would just
+	// append text to the partial body.
+	flusher, canFlush := w.(http.Flusher)
+	buf := make([]byte, 32*1024)
+	for {
+		n, err := resp.Body.Read(buf)
+		if n > 0 {
+			if _, writeErr := w.Write(buf[:n]); writeErr != nil {
+				return
+			}
+			if canFlush {
+				flusher.Flush()
+			}
+		}
+		if err != nil {
+			// io.EOF is the normal end of the stream.
+			return
+		}
+	}
+}