Add config hot-reload (#106)

Introduce the --watch-config command-line option to reload the ProxyManager when the configuration changes.
This commit is contained in:
Sam
2025-05-12 10:37:00 +10:00
committed by GitHub
parent 9548931258
commit bc652709a5
7 changed files with 196 additions and 72 deletions

View File

@@ -185,7 +185,7 @@ func (p *Process) start() error {
return fmt.Errorf("start() failed: %v", err)
}
// Capture the exit error for later signaling
// Capture the exit error for later signalling
go func() {
exitErr := p.cmd.Wait()
p.proxyLogger.Debugf("<%s> cmd.Wait() returned error: %v", p.ID, exitErr)
@@ -260,9 +260,9 @@ func (p *Process) start() error {
if strings.Contains(err.Error(), "connection refused") {
endTime, _ := checkDeadline.Deadline()
ttl := time.Until(endTime)
p.proxyLogger.Infof("<%s> Connection refused on %s, giving up in %.0fs", p.ID, healthURL, ttl.Seconds())
p.proxyLogger.Debugf("<%s> Connection refused on %s, giving up in %.0fs (normal during startup)", p.ID, healthURL, ttl.Seconds())
} else {
p.proxyLogger.Infof("<%s> Health check error on %s, %v", p.ID, healthURL, err)
p.proxyLogger.Debugf("<%s> Health check error on %s, %v (normal during startup)", p.ID, healthURL, err)
}
}
}
@@ -345,31 +345,33 @@ func (p *Process) stopCommand(sigtermTTL time.Duration) {
defer cancelTimeout()
if p.cmd == nil || p.cmd.Process == nil {
p.proxyLogger.Warnf("<%s> cmd or cmd.Process is nil", p.ID)
p.proxyLogger.Debugf("<%s> cmd or cmd.Process is nil (normal during config reload)", p.ID)
return
}
if err := p.terminateProcess(); err != nil {
p.proxyLogger.Infof("<%s> Failed to gracefully terminate process: %v", p.ID, err)
p.proxyLogger.Debugf("<%s> Process already terminated: %v (normal during shutdown)", p.ID, err)
}
select {
case <-sigtermTimeout.Done():
p.proxyLogger.Infof("<%s> Process timed out waiting to stop, sending KILL signal", p.ID)
p.cmd.Process.Kill()
p.proxyLogger.Debugf("<%s> Process timed out waiting to stop, sending KILL signal (normal during shutdown)", p.ID)
if err := p.cmd.Process.Kill(); err != nil {
p.proxyLogger.Errorf("<%s> Failed to kill process: %v", p.ID, err)
}
case err := <-p.cmdWaitChan:
// Note: in start(), p.cmdWaitChan also has a select { ... }. That should be OK
// because if we make it here then the cmd has been successfully running and made it
// through the health check. There is a possibility that ithe cmd crashed after the health check
// through the health check. There is a possibility that the cmd crashed after the health check
// succeeded but that's not a case llama-swap is handling for now.
if err != nil {
if errno, ok := err.(syscall.Errno); ok {
p.proxyLogger.Errorf("<%s> errno >> %v", p.ID, errno)
} else if exitError, ok := err.(*exec.ExitError); ok {
if strings.Contains(exitError.String(), "signal: terminated") {
p.proxyLogger.Infof("<%s> Process stopped OK", p.ID)
p.proxyLogger.Debugf("<%s> Process stopped OK", p.ID)
} else if strings.Contains(exitError.String(), "signal: interrupt") {
p.proxyLogger.Infof("<%s> Process interrupted OK", p.ID)
p.proxyLogger.Debugf("<%s> Process interrupted OK", p.ID)
} else {
p.proxyLogger.Warnf("<%s> ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode())
}

View File

@@ -82,6 +82,11 @@ func New(config Config) *ProxyManager {
pm.processGroups[groupID] = processGroup
}
pm.setupGinEngine()
return pm
}
func (pm *ProxyManager) setupGinEngine() {
pm.ginEngine.Use(func(c *gin.Context) {
// Start timer
start := time.Now()
@@ -192,18 +197,17 @@ func New(config Config) *ProxyManager {
// Disable console color for testing
gin.DisableConsoleColor()
return pm
}
func (pm *ProxyManager) Run(addr ...string) error {
return pm.ginEngine.Run(addr...)
}
func (pm *ProxyManager) HandlerFunc(w http.ResponseWriter, r *http.Request) {
// ServeHTTP implements http.Handler interface
func (pm *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pm.ginEngine.ServeHTTP(w, r)
}
// StopProcesses acquires a lock and stops all running upstream processes.
// This is the public method safe for concurrent calls.
// Unlike Shutdown, this method only stops the processes but doesn't perform
// a complete shutdown, allowing for process replacement without full termination.
func (pm *ProxyManager) StopProcesses() {
pm.Lock()
defer pm.Unlock()
@@ -221,8 +225,7 @@ func (pm *ProxyManager) StopProcesses() {
wg.Wait()
}
// Shutdown is called to shutdown all upstream processes
// when llama-swap is shutting down.
// Shutdown stops all processes managed by this ProxyManager
func (pm *ProxyManager) Shutdown() {
pm.Lock()
defer pm.Unlock()

View File

@@ -34,7 +34,7 @@ func TestProxyManager_SwapProcessCorrectly(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
assert.Contains(t, w.Body.String(), modelName)
}
@@ -72,10 +72,9 @@ func TestProxyManager_SwapMultiProcess(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
assert.Contains(t, w.Body.String(), requestedModel)
})
}
@@ -115,7 +114,7 @@ func TestProxyManager_PersistentGroupsAreNotSwapped(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
assert.Contains(t, w.Body.String(), requestedModel)
}
@@ -158,14 +157,13 @@ func TestProxyManager_SwapMultiProcessParallelRequests(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions?wait=1000ms", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("Expected status OK, got %d for key %s", w.Code, key)
}
mu.Lock()
var response map[string]string
assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))
results[key] = response["responseMessage"]
@@ -202,7 +200,7 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
w := httptest.NewRecorder()
// Call the listModelsHandler
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
// Check the response status code
assert.Equal(t, http.StatusOK, w.Code)
@@ -292,7 +290,7 @@ func TestProxyManager_Shutdown(t *testing.T) {
w := httptest.NewRecorder()
// send a request to trigger the proxy to load ... this should hang waiting for start up
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadGateway, w.Code)
assert.Contains(t, w.Body.String(), "health check interrupted due to shutdown")
}(modelName)
@@ -318,12 +316,12 @@ func TestProxyManager_Unload(t *testing.T) {
reqBody := fmt.Sprintf(`{"model":"%s"}`, "model1")
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, proxy.processGroups[DEFAULT_GROUP_ID].processes["model1"].CurrentState(), StateReady)
req = httptest.NewRequest("GET", "/unload", nil)
w = httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
assert.Equal(t, w.Body.String(), "OK")
@@ -334,7 +332,6 @@ func TestProxyManager_Unload(t *testing.T) {
// Test issue #61 `Listing the current list of models and the loaded model.`
func TestProxyManager_RunningEndpoint(t *testing.T) {
// Shared configuration
config := AddDefaultGroupToConfig(Config{
HealthCheckTimeout: 15,
@@ -360,7 +357,7 @@ func TestProxyManager_RunningEndpoint(t *testing.T) {
t.Run("no models loaded", func(t *testing.T) {
req := httptest.NewRequest("GET", "/running", nil)
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
@@ -378,13 +375,13 @@ func TestProxyManager_RunningEndpoint(t *testing.T) {
reqBody := `{"model":"model1"}`
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
// Simulate browser call for the `/running` endpoint.
req = httptest.NewRequest("GET", "/running", nil)
w = httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
var response RunningResponse
assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))
@@ -436,7 +433,7 @@ func TestProxyManager_AudioTranscriptionHandler(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/audio/transcriptions", &b)
req.Header.Set("Content-Type", w.FormDataContentType())
rec := httptest.NewRecorder()
proxy.HandlerFunc(rec, req)
proxy.ServeHTTP(rec, req)
// Verify the response
assert.Equal(t, http.StatusOK, rec.Code)
@@ -473,7 +470,7 @@ func TestProxyManager_UseModelName(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
assert.Contains(t, w.Body.String(), upstreamModelName)
})
@@ -500,7 +497,7 @@ func TestProxyManager_UseModelName(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/audio/transcriptions", &b)
req.Header.Set("Content-Type", w.FormDataContentType())
rec := httptest.NewRecorder()
proxy.HandlerFunc(rec, req)
proxy.ServeHTTP(rec, req)
// Verify the response
assert.Equal(t, http.StatusOK, rec.Code)
@@ -568,7 +565,7 @@ func TestProxyManager_CORSOptionsHandler(t *testing.T) {
}
w := httptest.NewRecorder()
proxy.ginEngine.ServeHTTP(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, tt.expectedStatus, w.Code)
@@ -592,7 +589,7 @@ func TestProxyManager_Upstream(t *testing.T) {
defer proxy.StopProcesses()
req := httptest.NewRequest("GET", "/upstream/model1/test", nil)
rec := httptest.NewRecorder()
proxy.HandlerFunc(rec, req)
proxy.ServeHTTP(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "model1", rec.Body.String())
}
@@ -613,7 +610,7 @@ func TestProxyManager_ChatContentLength(t *testing.T) {
req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(reqBody))
w := httptest.NewRecorder()
proxy.HandlerFunc(w, req)
proxy.ServeHTTP(w, req)
assert.Equal(t, http.StatusOK, w.Code)
var response map[string]string
assert.NoError(t, json.Unmarshal(w.Body.Bytes(), &response))