diff --git a/proxy/proxymanager.go b/proxy/proxymanager.go index 1c566f3..f219a8f 100644 --- a/proxy/proxymanager.go +++ b/proxy/proxymanager.go @@ -32,9 +32,12 @@ func New(config *Config) *ProxyManager { // Set up routes using the Gin engine pm.ginEngine.POST("/v1/chat/completions", pm.proxyChatRequestHandler) pm.ginEngine.GET("/v1/models", pm.listModelsHandler) + + // in proxymanager_loghandlers.go pm.ginEngine.GET("/logs", pm.sendLogsHandlers) pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler) pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE) + pm.ginEngine.NoRoute(pm.proxyRequestHandler) // Disable console color for testing @@ -47,94 +50,6 @@ func (pm *ProxyManager) HandlerFunc(w http.ResponseWriter, r *http.Request) { pm.ginEngine.ServeHTTP(w, r) } -func (pm *ProxyManager) sendLogsHandlers(c *gin.Context) { - c.Header("Content-Type", "text/plain") - history := pm.logMonitor.GetHistory() - _, err := c.Writer.Write(history) - if err != nil { - c.AbortWithError(http.StatusInternalServerError, err) - return - } -} - -func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { - c.Header("Content-Type", "text/plain") - c.Header("Transfer-Encoding", "chunked") - c.Header("X-Content-Type-Options", "nosniff") - - ch := pm.logMonitor.Subscribe() - defer pm.logMonitor.Unsubscribe(ch) - - notify := c.Request.Context().Done() - flusher, ok := c.Writer.(http.Flusher) - if !ok { - c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("Streaming unsupported")) - return - } - - _, skipHistory := c.GetQuery("no-history") - // Send history first if not skipped - - if !skipHistory { - history := pm.logMonitor.GetHistory() - if len(history) != 0 { - _, err := c.Writer.Write(history) - if err != nil { - c.AbortWithError(http.StatusInternalServerError, err) - return - } - flusher.Flush() - } - } - - // Stream new logs - for { - select { - case msg := <-ch: - _, err := c.Writer.Write(msg) - if err != nil { - c.AbortWithError(http.StatusInternalServerError, err) - return - } - flusher.Flush() - case <-notify: - return - } - } -} - -func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) { - c.Header("Content-Type", "text/event-stream") - c.Header("Cache-Control", "no-cache") - c.Header("Connection", "keep-alive") - c.Header("X-Content-Type-Options", "nosniff") - - ch := pm.logMonitor.Subscribe() - defer pm.logMonitor.Unsubscribe(ch) - - notify := c.Request.Context().Done() - - // Send history first if not skipped - _, skipHistory := c.GetQuery("skip") - if !skipHistory { - history := pm.logMonitor.GetHistory() - if len(history) != 0 { - c.SSEvent("message", string(history)) - c.Writer.Flush() - } - } - - // Stream new logs - for { - select { - case msg := <-ch: - c.SSEvent("message", string(msg)) - c.Writer.Flush() - case <-notify: - return - } - } -} func (pm *ProxyManager) listModelsHandler(c *gin.Context) { data := []interface{}{} for id := range pm.config.Models { @@ -151,7 +66,7 @@ func (pm *ProxyManager) listModelsHandler(c *gin.Context) { // Encode the data as JSON and write it to the response writer if err := json.NewEncoder(c.Writer).Encode(map[string]interface{}{"data": data}); err != nil { - c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("Error encoding JSON")) + c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("error encoding JSON")) return } } @@ -182,17 +97,17 @@ func (pm *ProxyManager) swapModel(requestedModel string) error { func (pm *ProxyManager) proxyChatRequestHandler(c *gin.Context) { bodyBytes, err := io.ReadAll(c.Request.Body) if err != nil { - c.AbortWithError(http.StatusBadRequest, fmt.Errorf("Invalid JSON")) + c.AbortWithError(http.StatusBadRequest, fmt.Errorf("invalid JSON")) return } var requestBody map[string]interface{} if err := json.Unmarshal(bodyBytes, &requestBody); err != nil { - c.AbortWithError(http.StatusBadRequest, fmt.Errorf("Invalid JSON")) + c.AbortWithError(http.StatusBadRequest, fmt.Errorf("invalid JSON")) return } model, ok := requestBody["model"].(string) if !ok { - c.AbortWithError(http.StatusBadRequest, fmt.Errorf("Missing or invalid 'model' key")) + c.AbortWithError(http.StatusBadRequest, fmt.Errorf("missing or invalid 'model' key")) return } diff --git a/proxy/proxymanager_loghandlers.go b/proxy/proxymanager_loghandlers.go new file mode 100644 index 0000000..c82d56f --- /dev/null +++ b/proxy/proxymanager_loghandlers.go @@ -0,0 +1,97 @@ +package proxy + +import ( + "fmt" + "net/http" + + "github.com/gin-gonic/gin" +) + +func (pm *ProxyManager) sendLogsHandlers(c *gin.Context) { + c.Header("Content-Type", "text/plain") + history := pm.logMonitor.GetHistory() + _, err := c.Writer.Write(history) + if err != nil { + c.AbortWithError(http.StatusInternalServerError, err) + return + } +} + +func (pm *ProxyManager) streamLogsHandler(c *gin.Context) { + c.Header("Content-Type", "text/plain") + c.Header("Transfer-Encoding", "chunked") + c.Header("X-Content-Type-Options", "nosniff") + + ch := pm.logMonitor.Subscribe() + defer pm.logMonitor.Unsubscribe(ch) + + notify := c.Request.Context().Done() + flusher, ok := c.Writer.(http.Flusher) + if !ok { + c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("Streaming unsupported")) + return + } + + _, skipHistory := c.GetQuery("no-history") + // Send history first if not skipped + + if !skipHistory { + history := pm.logMonitor.GetHistory() + if len(history) != 0 { + _, err := c.Writer.Write(history) + if err != nil { + c.AbortWithError(http.StatusInternalServerError, err) + return + } + flusher.Flush() + } + } + + // Stream new logs + for { + select { + case msg := <-ch: + _, err := c.Writer.Write(msg) + if err != nil { + c.AbortWithError(http.StatusInternalServerError, err) + return + } + flusher.Flush() + case <-notify: + return + } + } +} + +func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) { + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("X-Content-Type-Options", "nosniff") + + ch := pm.logMonitor.Subscribe() + defer pm.logMonitor.Unsubscribe(ch) + + notify := c.Request.Context().Done() + + // Send history first if not skipped + _, skipHistory := c.GetQuery("skip") + if !skipHistory { + history := pm.logMonitor.GetHistory() + if len(history) != 0 { + c.SSEvent("message", string(history)) + c.Writer.Flush() + } + } + + // Stream new logs + for { + select { + case msg := <-ch: + c.SSEvent("message", string(msg)) + c.Writer.Flush() + case <-notify: + return + } + } +}