Improve testing around using SIGKILL (#127)

* Add test for SIGKILL of process
* Silence TestProxyManager_RunningEndpoint debug output
* Ref #125
This commit is contained in:
Benson Wong
2025-05-13 21:21:52 -07:00
committed by GitHub
parent 519c3a4d22
commit 7f37bcc6eb
6 changed files with 101 additions and 10 deletions

View File

@@ -20,10 +20,10 @@ clean:
rm -rf $(BUILD_DIR) rm -rf $(BUILD_DIR)
test: test:
go test -short -v ./proxy go test -short -v -count=1 ./proxy
test-all: test-all:
go test -v ./proxy go test -v -count=1 ./proxy
# Build OSX binary # Build OSX binary
mac: mac:

View File

@@ -26,6 +26,8 @@ func main() {
silent := flag.Bool("silent", false, "disable all logging") silent := flag.Bool("silent", false, "disable all logging")
ignoreSigTerm := flag.Bool("ignore-sig-term", false, "ignore SIGTERM signal")
flag.Parse() // Parse the command-line flags flag.Parse() // Parse the command-line flags
// Create a new Gin router // Create a new Gin router
@@ -190,6 +192,10 @@ func main() {
log.SetOutput(io.Discard) log.SetOutput(io.Discard)
} }
if !*silent {
fmt.Printf("My PID: %d\n", os.Getpid())
}
go func() { go func() {
log.Printf("simple-responder listening on %s\n", address) log.Printf("simple-responder listening on %s\n", address)
// service connections // service connections
@@ -200,11 +206,36 @@ func main() {
// Wait for interrupt signal to gracefully shutdown the server with // Wait for interrupt signal to gracefully shutdown the server with
// a timeout of 5 seconds. // a timeout of 5 seconds.
quit := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)
// kill (no param) default send syscall.SIGTERM // kill (no param) default send syscall.SIGTERM
// kill -2 is syscall.SIGINT // kill -2 is syscall.SIGINT
// kill -9 is syscall.SIGKILL but can't be catch, so don't need add it // kill -9 is syscall.SIGKILL but can't be catch, so don't need add it
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-quit
// Number of SIGINTs received so far: the first one is only acknowledged,
// the second one ends the loop.
countSigInt := 0

// Block on sigChan until a signal arrives that should end the process.
runloop:
for {
	// Named sig rather than signal so the signal package (used above by
	// signal.Notify) is not shadowed inside the loop.
	sig := <-sigChan
	switch sig {
	case syscall.SIGINT:
		countSigInt++
		if countSigInt > 1 {
			break runloop
		}
		log.Println("Recieved SIGINT, send another SIGINT to shutdown")
	case syscall.SIGTERM:
		if !*ignoreSigTerm {
			log.Println("Recieved SIGTERM, shutting down")
			break runloop
		}
		// --ignore-sig-term was given: log it and keep waiting.
		log.Println("Ignoring SIGTERM")
	default:
		// Only SIGINT and SIGTERM are registered with signal.Notify; any
		// other value ends the loop as well.
		break runloop
	}
}
log.Println("simple-responder shutting down") log.Println("simple-responder shutting down")
} }

View File

@@ -48,14 +48,18 @@ func getSimpleResponderPath() string {
return filepath.Join("..", "build", fmt.Sprintf("simple-responder_%s_%s", goos, goarch)) return filepath.Join("..", "build", fmt.Sprintf("simple-responder_%s_%s", goos, goarch))
} }
func getTestSimpleResponderConfig(expectedMessage string) ModelConfig { func getTestPort() int {
portMutex.Lock() portMutex.Lock()
defer portMutex.Unlock() defer portMutex.Unlock()
port := nextTestPort port := nextTestPort
nextTestPort++ nextTestPort++
return getTestSimpleResponderConfigPort(expectedMessage, port) return port
}
func getTestSimpleResponderConfig(expectedMessage string) ModelConfig {
return getTestSimpleResponderConfigPort(expectedMessage, getTestPort())
} }
func getTestSimpleResponderConfigPort(expectedMessage string, port int) ModelConfig { func getTestSimpleResponderConfigPort(expectedMessage string, port int) ModelConfig {

View File

@@ -67,6 +67,9 @@ type Process struct {
// for managing concurrency limits // for managing concurrency limits
concurrencyLimitSemaphore chan struct{} concurrencyLimitSemaphore chan struct{}
// stop timeout waiting for graceful shutdown
gracefulStopTimeout time.Duration
} }
func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process { func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process {
@@ -92,6 +95,9 @@ func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLo
// concurrency limit // concurrency limit
concurrencyLimitSemaphore: make(chan struct{}, concurrentLimit), concurrencyLimitSemaphore: make(chan struct{}, concurrentLimit),
// stop timeout
gracefulStopTimeout: 5 * time.Second,
} }
} }
@@ -348,7 +354,7 @@ func (p *Process) StopImmediately() {
} }
// stop the process with a graceful exit timeout // stop the process with a graceful exit timeout
p.stopCommand(5 * time.Second) p.stopCommand(p.gracefulStopTimeout)
if curState, err := p.swapState(StateStopping, StateStopped); err != nil { if curState, err := p.swapState(StateStopping, StateStopped); err != nil {
p.proxyLogger.Infof("<%s> Stop() StateStopping -> StateStopped err: %v, current state: %v", p.ID, err, curState) p.proxyLogger.Infof("<%s> Stop() StateStopping -> StateStopped err: %v, current state: %v", p.ID, err, curState)
@@ -361,7 +367,7 @@ func (p *Process) StopImmediately() {
// the StateShutdown state, it can not be started again. // the StateShutdown state, it can not be started again.
func (p *Process) Shutdown() { func (p *Process) Shutdown() {
p.shutdownCancel() p.shutdownCancel()
p.stopCommand(5 * time.Second) p.stopCommand(p.gracefulStopTimeout)
p.state = StateShutdown p.state = StateShutdown
} }

View File

@@ -393,3 +393,53 @@ func TestProcess_StopImmediately(t *testing.T) {
process.StopImmediately() process.StopImmediately()
assert.Equal(t, process.CurrentState(), StateStopped) assert.Equal(t, process.CurrentState(), StateStopped)
} }
// TestProcess_ForceStopWithKill tests that SIGKILL is sent when
// gracefulStopTimeout is reached and that it properly terminates the upstream
// command.
func TestProcess_ForceStopWithKill(t *testing.T) {
	expectedMessage := "test_sigkill"
	binaryPath := getSimpleResponderPath()
	port := getTestPort()

	config := ModelConfig{
		// note --ignore-sig-term which ignores the SIGTERM signal so a SIGKILL must be sent
		// to force the process to exit
		Cmd:           fmt.Sprintf("%s --port %d --respond %s --silent --ignore-sig-term", binaryPath, port, expectedMessage),
		Proxy:         fmt.Sprintf("http://127.0.0.1:%d", port),
		CheckEndpoint: "/health",
	}

	process := NewProcess("stop_immediate", 2, config, debugLogger, debugLogger)
	defer process.Stop()

	// reduce to make testing go faster
	process.gracefulStopTimeout = time.Second

	err := process.start()
	assert.Nil(t, err)
	// expected value first, matching the http.StatusOK assertion below
	assert.Equal(t, StateReady, process.CurrentState())

	waitChan := make(chan struct{})
	go func() {
		// signal completion on every exit path of this goroutine
		defer close(waitChan)

		// slow, but will get killed by StopImmediately
		req := httptest.NewRequest("GET", "/slow-respond?echo=12345&delay=2s", nil)
		w := httptest.NewRecorder()
		process.ProxyRequest(w, req)

		// StatusOK because that was already sent before the kill
		assert.Equal(t, http.StatusOK, w.Code)

		// unexpected EOF because the kill happened, the "1" is sent before the kill
		// then the unexpected EOF is sent after the kill
		assert.Equal(t, "1unexpected EOF\n", w.Body.String())
	}()

	// brief pause before stopping so the goroutine above has a chance to start
	time.Sleep(time.Millisecond)
	process.StopImmediately()
	assert.Equal(t, StateStopped, process.CurrentState())

	// the request should have been interrupted by SIGKILL
	<-waitChan
}

View File

@@ -339,7 +339,7 @@ func TestProxyManager_RunningEndpoint(t *testing.T) {
"model1": getTestSimpleResponderConfig("model1"), "model1": getTestSimpleResponderConfig("model1"),
"model2": getTestSimpleResponderConfig("model2"), "model2": getTestSimpleResponderConfig("model2"),
}, },
LogLevel: "debug", LogLevel: "warn",
}) })
// Define a helper struct to parse the JSON response. // Define a helper struct to parse the JSON response.