diff --git a/README.md b/README.md index f537c69..fedc6f4 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,6 @@ ![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/mostlygeek/llama-swap/go-ci.yml) ![GitHub Repo stars](https://img.shields.io/github/stars/mostlygeek/llama-swap) - - # llama-swap llama-swap is a light weight, transparent proxy server that provides automatic model swapping to llama.cpp's server. @@ -69,8 +67,8 @@ models: # Default (and minimum) is 15 seconds healthCheckTimeout: 60 -# Write HTTP logs (useful for troubleshooting), defaults to false -logRequests: true +# Valid log levels: debug, info (default), warn, error +logLevel: info # define valid model values and the upstream server start models: @@ -221,9 +219,15 @@ Of course, CLI access is also supported: # sends up to the last 10KB of logs curl http://host/logs' -# streams logs +# streams combined logs curl -Ns 'http://host/logs/stream' +# just llama-swap's logs +curl -Ns 'http://host/logs/stream/proxy' + +# just upstream's logs +curl -Ns 'http://host/logs/stream/upstream' + # stream and filter logs with linux pipes curl -Ns http://host/logs/stream | grep 'eval time' diff --git a/config.example.yaml b/config.example.yaml index 85ec5be..8aa650a 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -1,9 +1,9 @@ # Seconds to wait for llama.cpp to be available to serve requests # Default (and minimum): 15 seconds -healthCheckTimeout: 15 +healthCheckTimeout: 90 -# Log HTTP requests helpful for troubleshoot, defaults to False -logRequests: true +# valid log levels: debug, info (default), warn, error +logLevel: info models: "llama": diff --git a/proxy/config.go b/proxy/config.go index 56fedf6..bef11d9 100644 --- a/proxy/config.go +++ b/proxy/config.go @@ -27,6 +27,7 @@ func (m *ModelConfig) SanitizedCommand() ([]string, error) { type Config struct { HealthCheckTimeout int `yaml:"healthCheckTimeout"` LogRequests bool `yaml:"logRequests"` + LogLevel string `yaml:"logLevel"` Models map[string]ModelConfig `yaml:"models"` Profiles map[string][]string `yaml:"profiles"` diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go index 17f1a9f..cbf96df 100644 --- a/proxy/logMonitor.go +++ b/proxy/logMonitor.go @@ -2,11 +2,21 @@ package proxy import ( "container/ring" + "fmt" "io" "os" "sync" ) +type LogLevel int + +const ( + LevelDebug LogLevel = iota + LevelInfo + LevelWarn + LevelError +) + type LogMonitor struct { clients map[chan []byte]bool mu sync.RWMutex @@ -15,6 +25,10 @@ type LogMonitor struct { // typically this can be os.Stdout stdout io.Writer + + // logging levels + level LogLevel + prefix string } func NewLogMonitor() *LogMonitor { @@ -26,6 +40,8 @@ func NewLogMonitorWriter(stdout io.Writer) *LogMonitor { clients: make(map[chan []byte]bool), buffer: ring.New(10 * 1024), // keep 10KB of buffered logs stdout: stdout, + level: LevelInfo, + prefix: "", } } @@ -94,3 +110,77 @@ func (w *LogMonitor) broadcast(msg []byte) { } } } + +func (w *LogMonitor) SetPrefix(prefix string) { + w.mu.Lock() + defer w.mu.Unlock() + w.prefix = prefix +} + +func (w *LogMonitor) SetLogLevel(level LogLevel) { + w.mu.Lock() + defer w.mu.Unlock() + w.level = level +} + +func (w *LogMonitor) formatMessage(level string, msg string) []byte { + prefix := "" + if w.prefix != "" { + prefix = fmt.Sprintf("[%s] ", w.prefix) + } + return []byte(fmt.Sprintf("%s[%s] %s\n", prefix, level, msg)) +} + +func (w *LogMonitor) log(level LogLevel, msg string) { + if level < w.level { + return + } + w.Write(w.formatMessage(level.String(), msg)) +} + +func (w *LogMonitor) Debug(msg string) { + w.log(LevelDebug, msg) +} + +func (w *LogMonitor) Info(msg string) { + w.log(LevelInfo, msg) +} + +func (w *LogMonitor) Warn(msg string) { + w.log(LevelWarn, msg) +} + +func (w *LogMonitor) Error(msg string) { + w.log(LevelError, msg) +} + +func (w *LogMonitor) Debugf(format string, args ...interface{}) { + w.log(LevelDebug, fmt.Sprintf(format, args...)) +} + +func (w *LogMonitor) Infof(format string, args ...interface{}) { + w.log(LevelInfo, fmt.Sprintf(format, args...)) +} + +func (w *LogMonitor) Warnf(format string, args ...interface{}) { + w.log(LevelWarn, fmt.Sprintf(format, args...)) +} + +func (w *LogMonitor) Errorf(format string, args ...interface{}) { + w.log(LevelError, fmt.Sprintf(format, args...)) +} + +func (l LogLevel) String() string { + switch l { + case LevelDebug: + return "DEBUG" + case LevelInfo: + return "INFO" + case LevelWarn: + return "WARN" + case LevelError: + return "ERROR" + default: + return "UNKNOWN" + } +} diff --git a/proxy/process.go b/proxy/process.go index a720906..f21e7ce 100644 --- a/proxy/process.go +++ b/proxy/process.go @@ -30,10 +30,12 @@ const ( ) type Process struct { - ID string - config ModelConfig - cmd *exec.Cmd - logMonitor *LogMonitor + ID string + config ModelConfig + cmd *exec.Cmd + + processLogger *LogMonitor + proxyLogger *LogMonitor healthCheckTimeout int healthCheckLoopInterval time.Duration @@ -53,13 +55,14 @@ type Process struct { shutdownCancel context.CancelFunc } -func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, logMonitor *LogMonitor) *Process { +func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process { ctx, cancel := context.WithCancel(context.Background()) return &Process{ ID: ID, config: config, cmd: nil, - logMonitor: logMonitor, + processLogger: processLogger, + proxyLogger: proxyLogger, healthCheckTimeout: healthCheckTimeout, healthCheckLoopInterval: 5 * time.Second, /* default, can not be set by user - used for testing */ state: StateStopped, @@ -68,6 +71,11 @@ func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, logMonito } } +// LogMonitor returns the log monitor associated with the process. +func (p *Process) LogMonitor() *LogMonitor { + return p.processLogger +} + // custom error types for swapping state var ( ErrExpectedStateMismatch = errors.New("expected state mismatch") @@ -85,9 +93,11 @@ func (p *Process) swapState(expectedState, newState ProcessState) (ProcessState, } if !isValidTransition(p.state, newState) { + p.proxyLogger.Warnf("Invalid state transition from %s to %s", p.state, newState) return p.state, ErrInvalidStateTransition } + p.proxyLogger.Debugf("State transition from %s to %s", expectedState, newState) p.state = newState return p.state, nil } @@ -152,8 +162,8 @@ func (p *Process) start() error { defer p.waitStarting.Done() p.cmd = exec.Command(args[0], args[1:]...) - p.cmd.Stdout = p.logMonitor - p.cmd.Stderr = p.logMonitor + p.cmd.Stdout = p.processLogger + p.cmd.Stderr = p.processLogger p.cmd.Env = p.config.Env err = p.cmd.Start() @@ -214,15 +224,16 @@ func (p *Process) start() error { return errors.New("health check interrupted due to shutdown") default: if err := p.checkHealthEndpoint(healthURL); err == nil { + p.proxyLogger.Infof("Health check passed on %s", healthURL) cancelHealthCheck() break loop } else { if strings.Contains(err.Error(), "connection refused") { endTime, _ := checkDeadline.Deadline() ttl := time.Until(endTime) - fmt.Fprintf(p.logMonitor, "!!! Connection refused on %s, ttl %.0fs\n", healthURL, ttl.Seconds()) + p.proxyLogger.Infof("Connection refused on %s, retrying in %.0fs", healthURL, ttl.Seconds()) } else { - fmt.Fprintf(p.logMonitor, "!!! Health check error: %v\n", err) + p.proxyLogger.Infof("Health check error on %s, %v", healthURL, err) } } } @@ -246,7 +257,8 @@ func (p *Process) start() error { p.inFlightRequests.Wait() if time.Since(p.lastRequestHandled) > maxDuration { - fmt.Fprintf(p.logMonitor, "!!! Unloading model %s, TTL of %ds reached.\n", p.ID, p.config.UnloadAfter) + + p.proxyLogger.Infof("Unloading model %s, TTL of %ds reached.", p.ID, p.config.UnloadAfter) p.Stop() return } @@ -267,7 +279,7 @@ func (p *Process) Stop() { // calling Stop() when state is invalid is a no-op if curState, err := p.swapState(StateReady, StateStopping); err != nil { - fmt.Fprintf(p.logMonitor, "!!! Info - Stop() Ready -> StateStopping err: %v, current state: %v\n", err, curState) + p.proxyLogger.Infof("Stop() Ready -> StateStopping err: %v, current state: %v", err, curState) return } @@ -275,7 +287,7 @@ func (p *Process) Stop() { p.stopCommand(5 * time.Second) if curState, err := p.swapState(StateStopping, StateStopped); err != nil { - fmt.Fprintf(p.logMonitor, "!!! Info - Stop() StateStopping -> StateStopped err: %v, current state: %v\n", err, curState) + p.proxyLogger.Infof("Stop() StateStopping -> StateStopped err: %v, current state: %v", err, curState) } } @@ -300,33 +312,32 @@ func (p *Process) stopCommand(sigtermTTL time.Duration) { }() if p.cmd == nil || p.cmd.Process == nil { - fmt.Fprintf(p.logMonitor, "!!! process [%s] cmd or cmd.Process is nil", p.ID) + p.proxyLogger.Warnf("Process [%s] cmd or cmd.Process is nil", p.ID) return } if err := p.terminateProcess(); err != nil { - fmt.Fprintf(p.logMonitor, "!!! failed to gracefully terminate process [%s]: %v\n", p.ID, err) + p.proxyLogger.Infof("Failed to gracefully terminate process [%s]: %v", p.ID, err) } select { case <-sigtermTimeout.Done(): - fmt.Fprintf(p.logMonitor, "!!! process [%s] timed out waiting to stop, sending KILL signal\n", p.ID) + p.proxyLogger.Infof("Process [%s] timed out waiting to stop, sending KILL signal", p.ID) p.cmd.Process.Kill() case err := <-sigtermNormal: if err != nil { if errno, ok := err.(syscall.Errno); ok { - fmt.Fprintf(p.logMonitor, "!!! process [%s] errno >> %v\n", p.ID, errno) + p.proxyLogger.Errorf("Process [%s] errno >> %v", p.ID, errno) } else if exitError, ok := err.(*exec.ExitError); ok { if strings.Contains(exitError.String(), "signal: terminated") { - fmt.Fprintf(p.logMonitor, "!!! process [%s] stopped OK\n", p.ID) + p.proxyLogger.Infof("Process [%s] stopped OK", p.ID) } else if strings.Contains(exitError.String(), "signal: interrupt") { - fmt.Fprintf(p.logMonitor, "!!! process [%s] interrupted OK\n", p.ID) + p.proxyLogger.Infof("Process [%s] interrupted OK", p.ID) } else { - fmt.Fprintf(p.logMonitor, "!!! process [%s] ExitError >> %v, exit code: %d\n", p.ID, exitError, exitError.ExitCode()) + p.proxyLogger.Warnf("Process [%s] ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode()) } - } else { - fmt.Fprintf(p.logMonitor, "!!! process [%s] exited >> %v\n", p.ID, err) + p.proxyLogger.Errorf("Process [%s] exited >> %v", p.ID, err) } } } diff --git a/proxy/process_test.go b/proxy/process_test.go index a42a2aa..f192240 100644 --- a/proxy/process_test.go +++ b/proxy/process_test.go @@ -5,7 +5,6 @@ import ( "io" "net/http" "net/http/httptest" - "os" "sync" "testing" "time" @@ -13,13 +12,17 @@ import ( "github.com/stretchr/testify/assert" ) +var ( + discardLogger = NewLogMonitorWriter(io.Discard) +) + func TestProcess_AutomaticallyStartsUpstream(t *testing.T) { - logMonitor := NewLogMonitorWriter(io.Discard) + expectedMessage := "testing91931" config := getTestSimpleResponderConfig(expectedMessage) // Create a process - process := NewProcess("test-process", 5, config, logMonitor) + process := NewProcess("test-process", 5, config, discardLogger, discardLogger) defer process.Stop() req := httptest.NewRequest("GET", "/test", nil) @@ -52,11 +55,10 @@ func TestProcess_AutomaticallyStartsUpstream(t *testing.T) { // are all handled successfully, even though they all may ask for the process to .start() func TestProcess_WaitOnMultipleStarts(t *testing.T) { - logMonitor := NewLogMonitorWriter(io.Discard) expectedMessage := "testing91931" config := getTestSimpleResponderConfig(expectedMessage) - process := NewProcess("test-process", 5, config, logMonitor) + process := NewProcess("test-process", 5, config, discardLogger, discardLogger) defer process.Stop() var wg sync.WaitGroup @@ -84,7 +86,7 @@ func TestProcess_BrokenModelConfig(t *testing.T) { CheckEndpoint: "/health", } - process := NewProcess("broken", 1, config, NewLogMonitor()) + process := NewProcess("broken", 1, config, discardLogger, discardLogger) req := httptest.NewRequest("GET", "/", nil) w := httptest.NewRecorder() @@ -109,7 +111,7 @@ func TestProcess_UnloadAfterTTL(t *testing.T) { config.UnloadAfter = 3 // seconds assert.Equal(t, 3, config.UnloadAfter) - process := NewProcess("ttl_test", 2, config, NewLogMonitorWriter(io.Discard)) + process := NewProcess("ttl_test", 2, config, discardLogger, discardLogger) defer process.Stop() // this should take 4 seconds @@ -151,7 +153,7 @@ func TestProcess_LowTTLValue(t *testing.T) { config.UnloadAfter = 1 // second assert.Equal(t, 1, config.UnloadAfter) - process := NewProcess("ttl", 2, config, NewLogMonitorWriter(os.Stdout)) + process := NewProcess("ttl", 2, config, discardLogger, discardLogger) defer process.Stop() for i := 0; i < 100; i++ { @@ -178,7 +180,7 @@ func TestProcess_HTTPRequestsHaveTimeToFinish(t *testing.T) { expectedMessage := "12345" config := getTestSimpleResponderConfig(expectedMessage) - process := NewProcess("t", 10, config, NewLogMonitorWriter(os.Stdout)) + process := NewProcess("t", 10, config, discardLogger, discardLogger) defer process.Stop() results := map[string]string{ @@ -255,9 +257,8 @@ func TestProcess_SwapState(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p := &Process{ - state: test.currentState, - } + p := NewProcess("test", 10, getTestSimpleResponderConfig("test"), discardLogger, discardLogger) + p.state = test.currentState resultState, err := p.swapState(test.expectedState, test.newState) if err != nil && test.expectedError == nil { @@ -282,7 +283,6 @@ func TestProcess_ShutdownInterruptsHealthCheck(t *testing.T) { t.Skip("skipping long shutdown test") } - logMonitor := NewLogMonitorWriter(io.Discard) expectedMessage := "testing91931" // make a config where the healthcheck will always fail because port is wrong @@ -290,7 +290,7 @@ func TestProcess_ShutdownInterruptsHealthCheck(t *testing.T) { config.Proxy = "http://localhost:9998/test" healthCheckTTLSeconds := 30 - process := NewProcess("test-process", healthCheckTTLSeconds, config, logMonitor) + process := NewProcess("test-process", healthCheckTTLSeconds, config, discardLogger, discardLogger) // make it a lot faster process.healthCheckLoopInterval = time.Second diff --git a/proxy/proxymanager.go b/proxy/proxymanager.go index ad34a38..f436d70 100644 --- a/proxy/proxymanager.go +++ b/proxy/proxymanager.go @@ -7,6 +7,7 @@ import ( "io" "mime/multipart" "net/http" + "os" "sort" "strconv" "strings" @@ -27,50 +28,76 @@ type ProxyManager struct { config *Config currentProcesses map[string]*Process - logMonitor *LogMonitor ginEngine *gin.Engine + + // logging + proxyLogger *LogMonitor + upstreamLogger *LogMonitor + muxLogger *LogMonitor } func New(config *Config) *ProxyManager { + // set up loggers + stdoutLogger := NewLogMonitorWriter(os.Stdout) + upstreamLogger := NewLogMonitorWriter(stdoutLogger) + proxyLogger := NewLogMonitorWriter(stdoutLogger) + + if config.LogRequests { + proxyLogger.Warn("LogRequests configuration is deprecated. Use logLevel instead.") + } + + switch strings.ToLower(strings.TrimSpace(config.LogLevel)) { + case "debug": + proxyLogger.SetLogLevel(LevelDebug) + case "info": + proxyLogger.SetLogLevel(LevelInfo) + case "warn": + proxyLogger.SetLogLevel(LevelWarn) + case "error": + proxyLogger.SetLogLevel(LevelError) + default: + proxyLogger.SetLogLevel(LevelInfo) + } + pm := &ProxyManager{ config: config, currentProcesses: make(map[string]*Process), - logMonitor: NewLogMonitor(), ginEngine: gin.New(), + + proxyLogger: proxyLogger, + muxLogger: stdoutLogger, + upstreamLogger: upstreamLogger, } - if config.LogRequests { - pm.ginEngine.Use(func(c *gin.Context) { - // Start timer - start := time.Now() + pm.ginEngine.Use(func(c *gin.Context) { + // Start timer + start := time.Now() - // capture these because /upstream/:model rewrites them in c.Next() - clientIP := c.ClientIP() - method := c.Request.Method - path := c.Request.URL.Path + // capture these because /upstream/:model rewrites them in c.Next() + clientIP := c.ClientIP() + method := c.Request.Method + path := c.Request.URL.Path - // Process request - c.Next() + // Process request + c.Next() - // Stop timer - duration := time.Since(start) + // Stop timer + duration := time.Since(start) - statusCode := c.Writer.Status() - bodySize := c.Writer.Size() + statusCode := c.Writer.Status() + bodySize := c.Writer.Size() - fmt.Fprintf(pm.logMonitor, "[llama-swap] %s [%s] \"%s %s %s\" %d %d \"%s\" %v\n", - clientIP, - time.Now().Format("2006-01-02 15:04:05"), - method, - path, - c.Request.Proto, - statusCode, - bodySize, - c.Request.UserAgent(), - duration, - ) - }) - } + pm.proxyLogger.Infof("Request %s \"%s %s %s\" %d %d \"%s\" %v", + clientIP, + method, + path, + c.Request.Proto, + statusCode, + bodySize, + c.Request.UserAgent(), + duration, + ) + }) // see: issue: #81, #77 and #42 for CORS issues // respond with permissive OPTIONS for any endpoint @@ -115,6 +142,8 @@ func New(config *Config) *ProxyManager { pm.ginEngine.GET("/logs", pm.sendLogsHandlers) pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler) pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE) + pm.ginEngine.GET("/logs/stream/:logMonitorID", pm.streamLogsHandler) + pm.ginEngine.GET("/logs/streamSSE/:logMonitorID", pm.streamLogsHandlerSSE) pm.ginEngine.GET("/upstream", pm.upstreamIndex) pm.ginEngine.Any("/upstream/:model_id/*upstreamPath", pm.proxyToUpstream) @@ -274,19 +303,20 @@ func (pm *ProxyManager) swapModel(requestedModel string) (*Process, error) { requestedProcessKey := ProcessKeyName(profileName, realModelName) if process, found := pm.currentProcesses[requestedProcessKey]; found { + pm.proxyLogger.Debugf("No-swap, using existing process for model [%s]", requestedModel) return process, nil } // stop all running models + pm.proxyLogger.Infof("Swapping model to [%s]", requestedModel) pm.stopProcesses() - if profileName == "" { modelConfig, modelID, found := pm.config.FindConfig(realModelName) if !found { return nil, fmt.Errorf("could not find configuration for %s", realModelName) } - process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.logMonitor) + process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.upstreamLogger, pm.proxyLogger) processKey := ProcessKeyName(profileName, modelID) pm.currentProcesses[processKey] = process } else { @@ -297,7 +327,7 @@ func (pm *ProxyManager) swapModel(requestedModel string) (*Process, error) { return nil, fmt.Errorf("could not find configuration for %s in group %s", realModelName, profileName) } - process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.logMonitor) + process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.upstreamLogger, pm.proxyLogger) processKey := ProcessKeyName(profileName, modelID) pm.currentProcesses[processKey] = process } @@ -385,7 +415,6 @@ func (pm *ProxyManager) proxyOAIHandler(c *gin.Context) { return } } - } c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) diff --git a/proxy/proxymanager_loghandlers.go b/proxy/proxymanager_loghandlers.go index 56efbf6..bc9e4bb 100644 --- a/proxy/proxymanager_loghandlers.go +++ b/proxy/proxymanager_loghandlers.go @@ -9,7 +9,6 @@ import ( ) func (pm *ProxyManager) sendLogsHandlers(c *gin.Context) { - accept := c.GetHeader("Accept") if strings.Contains(accept, "text/html") { // Set the Content-Type header to text/html @@ -28,7 +27,7 @@ func (pm *ProxyManager) sendLogsHandlers(c *gin.Context) { } } else { c.Header("Content-Type", "text/plain") - history := pm.logMonitor.GetHistory() + history := pm.muxLogger.GetHistory() _, err := c.Writer.Write(history) if err != nil { c.AbortWithError(http.StatusInternalServerError, err) @@ -42,8 +41,14 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { c.Header("Transfer-Encoding", "chunked") c.Header("X-Content-Type-Options", "nosniff") - ch := pm.logMonitor.Subscribe() - defer pm.logMonitor.Unsubscribe(ch) + logMonitorId := c.Param("logMonitorID") + logger, err := pm.getLogger(logMonitorId) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + ch := logger.Subscribe() + defer logger.Unsubscribe(ch) notify := c.Request.Context().Done() flusher, ok := c.Writer.(http.Flusher) @@ -56,7 +61,7 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { // Send history first if not skipped if !skipHistory { - history := pm.logMonitor.GetHistory() + history := logger.GetHistory() if len(history) != 0 { c.Writer.Write(history) flusher.Flush() @@ -85,15 +90,21 @@ func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) { c.Header("Connection", "keep-alive") c.Header("X-Content-Type-Options", "nosniff") - ch := pm.logMonitor.Subscribe() - defer pm.logMonitor.Unsubscribe(ch) + logMonitorId := c.Param("logMonitorID") + logger, err := pm.getLogger(logMonitorId) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + ch := logger.Subscribe() + defer logger.Unsubscribe(ch) notify := c.Request.Context().Done() // Send history first if not skipped _, skipHistory := c.GetQuery("no-history") if !skipHistory { - history := pm.logMonitor.GetHistory() + history := logger.GetHistory() if len(history) != 0 { c.SSEvent("message", string(history)) c.Writer.Flush() @@ -111,3 +122,21 @@ func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) { } } } + +// getLogger searches for the appropriate logger based on the logMonitorId +func (pm *ProxyManager) getLogger(logMonitorId string) (*LogMonitor, error) { + var logger *LogMonitor + + if logMonitorId == "" { + // maintain the default + logger = pm.muxLogger + } else if logMonitorId == "proxy" { + logger = pm.proxyLogger + } else if logMonitorId == "upstream" { + logger = pm.upstreamLogger + } else { + return nil, fmt.Errorf("invalid logger. Use 'proxy' or 'upstream'") + } + + return logger, nil +}