diff --git a/Makefile b/Makefile index 265a46b..b3f6f7e 100644 --- a/Makefile +++ b/Makefile @@ -20,10 +20,10 @@ clean: rm -rf $(BUILD_DIR) test: - go test -short -v ./proxy + go test -short -v -count=1 ./proxy test-all: - go test -v ./proxy + go test -v -count=1 ./proxy # Build OSX binary mac: diff --git a/misc/simple-responder/simple-responder.go b/misc/simple-responder/simple-responder.go index 0512972..60b0ff4 100644 --- a/misc/simple-responder/simple-responder.go +++ b/misc/simple-responder/simple-responder.go @@ -26,6 +26,8 @@ func main() { silent := flag.Bool("silent", false, "disable all logging") + ignoreSigTerm := flag.Bool("ignore-sig-term", false, "ignore SIGTERM signal") + flag.Parse() // Parse the command-line flags // Create a new Gin router @@ -190,6 +192,10 @@ func main() { log.SetOutput(io.Discard) } + if !*silent { + fmt.Printf("My PID: %d\n", os.Getpid()) + } + go func() { log.Printf("simple-responder listening on %s\n", address) // service connections @@ -200,11 +206,36 @@ func main() { // Wait for interrupt signal to gracefully shutdown the server with // a timeout of 5 seconds. - quit := make(chan os.Signal, 1) + sigChan := make(chan os.Signal, 1) // kill (no param) default send syscall.SIGTERM // kill -2 is syscall.SIGINT // kill -9 is syscall.SIGKILL but can't be catch, so don't need add it - signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) - <-quit + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + countSigInt := 0 + +runloop: + for { + signal := <-sigChan + switch signal { + case syscall.SIGINT: + countSigInt++ + if countSigInt > 1 { + break runloop + } else { + log.Println("Recieved SIGINT, send another SIGINT to shutdown") + } + case syscall.SIGTERM: + if *ignoreSigTerm { + log.Println("Ignoring SIGTERM") + } else { + log.Println("Recieved SIGTERM, shutting down") + break runloop + } + default: + break runloop + } + } + log.Println("simple-responder shutting down") } diff --git a/proxy/helpers_test.go b/proxy/helpers_test.go index 0082192..6fc9e07 100644 --- a/proxy/helpers_test.go +++ b/proxy/helpers_test.go @@ -48,14 +48,18 @@ func getSimpleResponderPath() string { return filepath.Join("..", "build", fmt.Sprintf("simple-responder_%s_%s", goos, goarch)) } -func getTestSimpleResponderConfig(expectedMessage string) ModelConfig { +func getTestPort() int { portMutex.Lock() defer portMutex.Unlock() port := nextTestPort nextTestPort++ - return getTestSimpleResponderConfigPort(expectedMessage, port) + return port +} + +func getTestSimpleResponderConfig(expectedMessage string) ModelConfig { + return getTestSimpleResponderConfigPort(expectedMessage, getTestPort()) } func getTestSimpleResponderConfigPort(expectedMessage string, port int) ModelConfig { diff --git a/proxy/process.go b/proxy/process.go index 7db2d9e..4d9c7ae 100644 --- a/proxy/process.go +++ b/proxy/process.go @@ -67,6 +67,9 @@ type Process struct { // for managing concurrency limits concurrencyLimitSemaphore chan struct{} + + // stop timeout waiting for graceful shutdown + gracefulStopTimeout time.Duration } func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process { @@ -92,6 +95,9 @@ func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLo // concurrency limit concurrencyLimitSemaphore: make(chan struct{}, concurrentLimit), + + // stop timeout + gracefulStopTimeout: 5 * time.Second, } } @@ -348,7 +354,7 @@ func (p *Process) StopImmediately() { } // stop the process with a graceful exit timeout - p.stopCommand(5 * time.Second) + p.stopCommand(p.gracefulStopTimeout) if curState, err := p.swapState(StateStopping, StateStopped); err != nil { p.proxyLogger.Infof("<%s> Stop() StateStopping -> StateStopped err: %v, current state: %v", p.ID, err, curState) @@ -361,7 +367,7 @@ func (p *Process) StopImmediately() { // the StateShutdown state, it can not be started again. func (p *Process) Shutdown() { p.shutdownCancel() - p.stopCommand(5 * time.Second) + p.stopCommand(p.gracefulStopTimeout) p.state = StateShutdown } diff --git a/proxy/process_test.go b/proxy/process_test.go index a715215..c73089d 100644 --- a/proxy/process_test.go +++ b/proxy/process_test.go @@ -393,3 +393,53 @@ func TestProcess_StopImmediately(t *testing.T) { process.StopImmediately() assert.Equal(t, process.CurrentState(), StateStopped) } + +// Test that SIGKILL is sent when gracefulStopTimeout is reached and properly terminates +// the upstream command +func TestProcess_ForceStopWithKill(t *testing.T) { + + expectedMessage := "test_sigkill" + binaryPath := getSimpleResponderPath() + port := getTestPort() + + config := ModelConfig{ + // note --ignore-sig-term which ignores the SIGTERM signal so a SIGKILL must be sent + // to force the process to exit + Cmd: fmt.Sprintf("%s --port %d --respond %s --silent --ignore-sig-term", binaryPath, port, expectedMessage), + Proxy: fmt.Sprintf("http://127.0.0.1:%d", port), + CheckEndpoint: "/health", + } + + process := NewProcess("stop_immediate", 2, config, debugLogger, debugLogger) + defer process.Stop() + + // reduce to make testing go faster + process.gracefulStopTimeout = time.Second + + err := process.start() + assert.Nil(t, err) + assert.Equal(t, process.CurrentState(), StateReady) + + waitChan := make(chan struct{}) + go func() { + // slow, but will get killed by StopImmediate + req := httptest.NewRequest("GET", "/slow-respond?echo=12345&delay=2s", nil) + w := httptest.NewRecorder() + process.ProxyRequest(w, req) + + // StatusOK because that was already sent before the kill + assert.Equal(t, http.StatusOK, w.Code) + + // unexpected EOF because the kill happened, the "1" is sent before the kill + // then the unexpected EOF is sent after the kill + assert.Equal(t, "1unexpected EOF\n", w.Body.String()) + close(waitChan) + }() + + <-time.After(time.Millisecond) + process.StopImmediately() + assert.Equal(t, process.CurrentState(), StateStopped) + + // the request should have been interrupted by SIGKILL + <-waitChan +} diff --git a/proxy/proxymanager_test.go b/proxy/proxymanager_test.go index b0379a7..0c12b7b 100644 --- a/proxy/proxymanager_test.go +++ b/proxy/proxymanager_test.go @@ -339,7 +339,7 @@ func TestProxyManager_RunningEndpoint(t *testing.T) { "model1": getTestSimpleResponderConfig("model1"), "model2": getTestSimpleResponderConfig("model2"), }, - LogLevel: "debug", + LogLevel: "warn", }) // Define a helper struct to parse the JSON response.