tag all process logs with its ID (#103)

Makes identifying Process of log messages easier
This commit is contained in:
Benson Wong
2025-04-25 12:58:25 -07:00
committed by GitHub
parent 5fad24c16f
commit 06eda7f591
2 changed files with 26 additions and 27 deletions

View File

@@ -93,17 +93,17 @@ func (p *Process) swapState(expectedState, newState ProcessState) (ProcessState,
defer p.stateMutex.Unlock() defer p.stateMutex.Unlock()
if p.state != expectedState { if p.state != expectedState {
p.proxyLogger.Warnf("swapState() Unexpected current state %s, expected %s", p.state, expectedState) p.proxyLogger.Warnf("<%s> swapState() Unexpected current state %s, expected %s", p.ID, p.state, expectedState)
return p.state, ErrExpectedStateMismatch return p.state, ErrExpectedStateMismatch
} }
if !isValidTransition(p.state, newState) { if !isValidTransition(p.state, newState) {
p.proxyLogger.Warnf("swapState() Invalid state transition from %s to %s", p.state, newState) p.proxyLogger.Warnf("<%s> swapState() Invalid state transition from %s to %s", p.ID, p.state, newState)
return p.state, ErrInvalidStateTransition return p.state, ErrInvalidStateTransition
} }
p.state = newState p.state = newState
p.proxyLogger.Debugf("swapState() State transitioned from %s to %s", expectedState, newState) p.proxyLogger.Debugf("<%s> swapState() State transitioned from %s to %s", p.ID, expectedState, newState)
return p.state, nil return p.state, nil
} }
@@ -187,7 +187,7 @@ func (p *Process) start() error {
// Capture the exit error for later signaling // Capture the exit error for later signaling
go func() { go func() {
exitErr := p.cmd.Wait() exitErr := p.cmd.Wait()
p.proxyLogger.Debugf("cmd.Wait() returned for [%s] error: %v", p.ID, exitErr) p.proxyLogger.Debugf("<%s> cmd.Wait() returned error: %v", p.ID, exitErr)
p.cmdWaitChan <- exitErr p.cmdWaitChan <- exitErr
}() }()
@@ -236,32 +236,32 @@ func (p *Process) start() error {
return errors.New("health check interrupted due to shutdown") return errors.New("health check interrupted due to shutdown")
case exitErr := <-p.cmdWaitChan: case exitErr := <-p.cmdWaitChan:
if exitErr != nil { if exitErr != nil {
p.proxyLogger.Warnf("upstream command exited prematurely with error: %v", exitErr) p.proxyLogger.Warnf("<%s> upstream command exited prematurely with error: %v", p.ID, exitErr)
if curState, err := p.swapState(StateStarting, StateFailed); err != nil { if curState, err := p.swapState(StateStarting, StateFailed); err != nil {
return fmt.Errorf("upstream command exited unexpectedly: %s AND state swap failed: %v, current state: %v", exitErr.Error(), err, curState) return fmt.Errorf("upstream command exited unexpectedly: %s AND state swap failed: %v, current state: %v", exitErr.Error(), err, curState)
} else { } else {
return fmt.Errorf("upstream command exited unexpectedly: %s", exitErr.Error()) return fmt.Errorf("upstream command exited unexpectedly: %s", exitErr.Error())
} }
} else { } else {
p.proxyLogger.Warnf("upstream command exited prematurely with no error") p.proxyLogger.Warnf("<%s> upstream command exited prematurely but successfully", p.ID)
if curState, err := p.swapState(StateStarting, StateFailed); err != nil { if curState, err := p.swapState(StateStarting, StateFailed); err != nil {
return fmt.Errorf("upstream command exited prematurely with no error AND state swap failed: %v, current state: %v", err, curState) return fmt.Errorf("upstream command exited prematurely but successfully AND state swap failed: %v, current state: %v", err, curState)
} else { } else {
return fmt.Errorf("upstream command exited prematurely with no error") return fmt.Errorf("upstream command exited prematurely but successfully")
} }
} }
default: default:
if err := p.checkHealthEndpoint(healthURL); err == nil { if err := p.checkHealthEndpoint(healthURL); err == nil {
p.proxyLogger.Infof("Health check passed on %s", healthURL) p.proxyLogger.Infof("<%s> Health check passed on %s", p.ID, healthURL)
cancelHealthCheck() cancelHealthCheck()
break loop break loop
} else { } else {
if strings.Contains(err.Error(), "connection refused") { if strings.Contains(err.Error(), "connection refused") {
endTime, _ := checkDeadline.Deadline() endTime, _ := checkDeadline.Deadline()
ttl := time.Until(endTime) ttl := time.Until(endTime)
p.proxyLogger.Infof("Connection refused on %s, giving up in %.0fs", healthURL, ttl.Seconds()) p.proxyLogger.Infof("<%s> Connection refused on %s, giving up in %.0fs", p.ID, healthURL, ttl.Seconds())
} else { } else {
p.proxyLogger.Infof("Health check error on %s, %v", healthURL, err) p.proxyLogger.Infof("<%s> Health check error on %s, %v", p.ID, healthURL, err)
} }
} }
} }
@@ -285,7 +285,7 @@ func (p *Process) start() error {
p.inFlightRequests.Wait() p.inFlightRequests.Wait()
if time.Since(p.lastRequestHandled) > maxDuration { if time.Since(p.lastRequestHandled) > maxDuration {
p.proxyLogger.Infof("Unloading model %s, TTL of %ds reached.", p.ID, p.config.UnloadAfter) p.proxyLogger.Infof("<%s> Unloading model, TTL of %ds reached", p.ID, p.config.UnloadAfter)
p.Stop() p.Stop()
return return
} }
@@ -303,11 +303,11 @@ func (p *Process) start() error {
func (p *Process) Stop() { func (p *Process) Stop() {
// wait for any inflight requests before proceeding // wait for any inflight requests before proceeding
p.inFlightRequests.Wait() p.inFlightRequests.Wait()
p.proxyLogger.Debugf("Stopping process [%s]", p.ID) p.proxyLogger.Debugf("<%s> Stopping process", p.ID)
// calling Stop() when state is invalid is a no-op // calling Stop() when state is invalid is a no-op
if curState, err := p.swapState(StateReady, StateStopping); err != nil { if curState, err := p.swapState(StateReady, StateStopping); err != nil {
p.proxyLogger.Infof("Stop() Ready -> StateStopping err: %v, current state: %v", err, curState) p.proxyLogger.Infof("<%s> Stop() Ready -> StateStopping err: %v, current state: %v", p.ID, err, curState)
return return
} }
@@ -315,7 +315,7 @@ func (p *Process) Stop() {
p.stopCommand(5 * time.Second) p.stopCommand(5 * time.Second)
if curState, err := p.swapState(StateStopping, StateStopped); err != nil { if curState, err := p.swapState(StateStopping, StateStopped); err != nil {
p.proxyLogger.Infof("Stop() StateStopping -> StateStopped err: %v, current state: %v", err, curState) p.proxyLogger.Infof("<%s> Stop() StateStopping -> StateStopped err: %v, current state: %v", p.ID, err, curState)
} }
} }
@@ -333,24 +333,24 @@ func (p *Process) Shutdown() {
func (p *Process) stopCommand(sigtermTTL time.Duration) { func (p *Process) stopCommand(sigtermTTL time.Duration) {
stopStartTime := time.Now() stopStartTime := time.Now()
defer func() { defer func() {
p.proxyLogger.Debugf("Process [%s] stopCommand took %v", p.ID, time.Since(stopStartTime)) p.proxyLogger.Debugf("<%s> stopCommand took %v", p.ID, time.Since(stopStartTime))
}() }()
sigtermTimeout, cancelTimeout := context.WithTimeout(context.Background(), sigtermTTL) sigtermTimeout, cancelTimeout := context.WithTimeout(context.Background(), sigtermTTL)
defer cancelTimeout() defer cancelTimeout()
if p.cmd == nil || p.cmd.Process == nil { if p.cmd == nil || p.cmd.Process == nil {
p.proxyLogger.Warnf("Process [%s] cmd or cmd.Process is nil", p.ID) p.proxyLogger.Warnf("<%s> cmd or cmd.Process is nil", p.ID)
return return
} }
if err := p.terminateProcess(); err != nil { if err := p.terminateProcess(); err != nil {
p.proxyLogger.Infof("Failed to gracefully terminate process [%s]: %v", p.ID, err) p.proxyLogger.Infof("<%s> Failed to gracefully terminate process: %v", p.ID, err)
} }
select { select {
case <-sigtermTimeout.Done(): case <-sigtermTimeout.Done():
p.proxyLogger.Infof("Process [%s] timed out waiting to stop, sending KILL signal", p.ID) p.proxyLogger.Infof("<%s> Process timed out waiting to stop, sending KILL signal", p.ID)
p.cmd.Process.Kill() p.cmd.Process.Kill()
case err := <-p.cmdWaitChan: case err := <-p.cmdWaitChan:
// Note: in start(), p.cmdWaitChan also has a select { ... }. That should be OK // Note: in start(), p.cmdWaitChan also has a select { ... }. That should be OK
@@ -359,24 +359,23 @@ func (p *Process) stopCommand(sigtermTTL time.Duration) {
// succeeded but that's not a case llama-swap is handling for now. // succeeded but that's not a case llama-swap is handling for now.
if err != nil { if err != nil {
if errno, ok := err.(syscall.Errno); ok { if errno, ok := err.(syscall.Errno); ok {
p.proxyLogger.Errorf("Process [%s] errno >> %v", p.ID, errno) p.proxyLogger.Errorf("<%s> errno >> %v", p.ID, errno)
} else if exitError, ok := err.(*exec.ExitError); ok { } else if exitError, ok := err.(*exec.ExitError); ok {
if strings.Contains(exitError.String(), "signal: terminated") { if strings.Contains(exitError.String(), "signal: terminated") {
p.proxyLogger.Infof("Process [%s] stopped OK", p.ID) p.proxyLogger.Infof("<%s> Process stopped OK", p.ID)
} else if strings.Contains(exitError.String(), "signal: interrupt") { } else if strings.Contains(exitError.String(), "signal: interrupt") {
p.proxyLogger.Infof("Process [%s] interrupted OK", p.ID) p.proxyLogger.Infof("<%s> Process interrupted OK", p.ID)
} else { } else {
p.proxyLogger.Warnf("Process [%s] ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode()) p.proxyLogger.Warnf("<%s> ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode())
} }
} else { } else {
p.proxyLogger.Errorf("Process [%s] exited >> %v", p.ID, err) p.proxyLogger.Errorf("<%s> Process exited >> %v", p.ID, err)
} }
} }
} }
} }
func (p *Process) checkHealthEndpoint(healthURL string) error { func (p *Process) checkHealthEndpoint(healthURL string) error {
client := &http.Client{ client := &http.Client{
Timeout: 500 * time.Millisecond, Timeout: 500 * time.Millisecond,
} }
@@ -471,6 +470,6 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
} }
totalTime := time.Since(requestBeginTime) totalTime := time.Since(requestBeginTime)
p.proxyLogger.Debugf("Process [%s] request %s - start: %v, total: %v", p.proxyLogger.Debugf("<%s> request %s - start: %v, total: %v",
p.ID, r.RequestURI, startDuration, totalTime) p.ID, r.RequestURI, startDuration, totalTime)
} }

View File

@@ -337,6 +337,6 @@ func TestProcess_ExitInterruptsHealthCheck(t *testing.T) {
process := NewProcess("sleepy", checkHealthTimeout, config, debugLogger, debugLogger) process := NewProcess("sleepy", checkHealthTimeout, config, debugLogger, debugLogger)
process.healthCheckLoopInterval = time.Second // make it faster process.healthCheckLoopInterval = time.Second // make it faster
err := process.start() err := process.start()
assert.Equal(t, "upstream command exited prematurely with no error", err.Error()) assert.Equal(t, "upstream command exited prematurely but successfully", err.Error())
assert.Equal(t, process.CurrentState(), StateFailed) assert.Equal(t, process.CurrentState(), StateFailed)
} }