use gin for http server
This commit is contained in:
147
proxy/manager.go
147
proxy/manager.go
@@ -8,6 +8,8 @@ import (
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type ProxyManager struct {
|
||||
@@ -16,65 +18,84 @@ type ProxyManager struct {
|
||||
config *Config
|
||||
currentProcess *Process
|
||||
logMonitor *LogMonitor
|
||||
ginEngine *gin.Engine
|
||||
}
|
||||
|
||||
func New(config *Config) *ProxyManager {
|
||||
return &ProxyManager{config: config, currentProcess: nil, logMonitor: NewLogMonitor()}
|
||||
pm := &ProxyManager{
|
||||
config: config,
|
||||
currentProcess: nil,
|
||||
logMonitor: NewLogMonitor(),
|
||||
ginEngine: gin.New(),
|
||||
}
|
||||
|
||||
// Set up routes using the Gin engine
|
||||
pm.ginEngine.POST("/v1/chat/completions", pm.proxyChatRequestHandler)
|
||||
pm.ginEngine.GET("/v1/models", pm.listModelsHandler)
|
||||
pm.ginEngine.GET("/logs", pm.sendLogsHandlers)
|
||||
pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler)
|
||||
pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE)
|
||||
pm.ginEngine.NoRoute(pm.proxyRequestHandler)
|
||||
|
||||
// Disable console color for testing
|
||||
gin.DisableConsoleColor()
|
||||
|
||||
return pm
|
||||
}
|
||||
|
||||
func (pm *ProxyManager) HandleFunc(w http.ResponseWriter, r *http.Request) {
|
||||
func (pm *ProxyManager) HandlerFunc(w http.ResponseWriter, r *http.Request) {
|
||||
pm.ginEngine.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// https://github.com/ggerganov/llama.cpp/blob/master/examples/server/README.md#api-endpoints
|
||||
if r.URL.Path == "/v1/chat/completions" {
|
||||
// extracts the `model` from json body
|
||||
pm.proxyChatRequest(w, r)
|
||||
} else if r.URL.Path == "/v1/models" {
|
||||
pm.listModels(w, r)
|
||||
} else if r.URL.Path == "/logs" {
|
||||
pm.streamLogs(w, r)
|
||||
} else {
|
||||
if pm.currentProcess != nil {
|
||||
pm.currentProcess.ProxyRequest(w, r)
|
||||
} else {
|
||||
http.Error(w, "no strategy to handle request", http.StatusBadRequest)
|
||||
}
|
||||
func (pm *ProxyManager) sendLogsHandlers(c *gin.Context) {
|
||||
c.Header("Content-Type", "text/plain")
|
||||
history := pm.logMonitor.GetHistory()
|
||||
_, err := c.Writer.Write(history)
|
||||
if err != nil {
|
||||
c.AbortWithError(http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *ProxyManager) streamLogs(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
w.Header().Set("Transfer-Encoding", "chunked")
|
||||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||||
func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
|
||||
c.Header("Content-Type", "text/plain")
|
||||
c.Header("Transfer-Encoding", "chunked")
|
||||
c.Header("X-Content-Type-Options", "nosniff")
|
||||
|
||||
ch := pm.logMonitor.Subscribe()
|
||||
defer pm.logMonitor.Unsubscribe(ch)
|
||||
|
||||
notify := r.Context().Done()
|
||||
flusher, ok := w.(http.Flusher)
|
||||
notify := c.Request.Context().Done()
|
||||
flusher, ok := c.Writer.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
|
||||
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("Streaming unsupported"))
|
||||
return
|
||||
}
|
||||
|
||||
skipHistory := r.URL.Query().Has("skip")
|
||||
_, skipHistory := c.GetQuery("no-history")
|
||||
// Send history first if not skipped
|
||||
|
||||
if !skipHistory {
|
||||
// Send history first
|
||||
history := pm.logMonitor.GetHistory()
|
||||
if len(history) != 0 {
|
||||
w.Write(history)
|
||||
_, err := c.Writer.Write(history)
|
||||
if err != nil {
|
||||
c.AbortWithError(http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
if !r.URL.Query().Has("stream") {
|
||||
return
|
||||
}
|
||||
|
||||
// Stream new logs
|
||||
for {
|
||||
select {
|
||||
case msg := <-ch:
|
||||
w.Write(msg)
|
||||
_, err := c.Writer.Write(msg)
|
||||
if err != nil {
|
||||
c.AbortWithError(http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
case <-notify:
|
||||
return
|
||||
@@ -82,7 +103,39 @@ func (pm *ProxyManager) streamLogs(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *ProxyManager) listModels(w http.ResponseWriter, _ *http.Request) {
|
||||
func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) {
|
||||
c.Header("Content-Type", "text/event-stream")
|
||||
c.Header("Cache-Control", "no-cache")
|
||||
c.Header("Connection", "keep-alive")
|
||||
c.Header("X-Content-Type-Options", "nosniff")
|
||||
|
||||
ch := pm.logMonitor.Subscribe()
|
||||
defer pm.logMonitor.Unsubscribe(ch)
|
||||
|
||||
notify := c.Request.Context().Done()
|
||||
|
||||
// Send history first if not skipped
|
||||
_, skipHistory := c.GetQuery("skip")
|
||||
if !skipHistory {
|
||||
history := pm.logMonitor.GetHistory()
|
||||
if len(history) != 0 {
|
||||
c.SSEvent("message", string(history))
|
||||
c.Writer.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// Stream new logs
|
||||
for {
|
||||
select {
|
||||
case msg := <-ch:
|
||||
c.SSEvent("message", string(msg))
|
||||
c.Writer.Flush()
|
||||
case <-notify:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
func (pm *ProxyManager) listModelsHandler(c *gin.Context) {
|
||||
data := []interface{}{}
|
||||
for id := range pm.config.Models {
|
||||
data = append(data, map[string]interface{}{
|
||||
@@ -94,11 +147,11 @@ func (pm *ProxyManager) listModels(w http.ResponseWriter, _ *http.Request) {
|
||||
}
|
||||
|
||||
// Set the Content-Type header to application/json
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
c.Header("Content-Type", "application/json")
|
||||
|
||||
// Encode the data as JSON and write it to the response writer
|
||||
if err := json.NewEncoder(w).Encode(map[string]interface{}{"data": data}); err != nil {
|
||||
http.Error(w, "Error encoding JSON", http.StatusInternalServerError)
|
||||
if err := json.NewEncoder(c.Writer).Encode(map[string]interface{}{"data": data}); err != nil {
|
||||
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("Error encoding JSON"))
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -126,28 +179,36 @@ func (pm *ProxyManager) swapModel(requestedModel string) error {
|
||||
return pm.currentProcess.Start(pm.config.HealthCheckTimeout)
|
||||
}
|
||||
|
||||
func (pm *ProxyManager) proxyChatRequest(w http.ResponseWriter, r *http.Request) {
|
||||
bodyBytes, err := io.ReadAll(r.Body)
|
||||
func (pm *ProxyManager) proxyChatRequestHandler(c *gin.Context) {
|
||||
bodyBytes, err := io.ReadAll(c.Request.Body)
|
||||
if err != nil {
|
||||
http.Error(w, "Invalid JSON", http.StatusBadRequest)
|
||||
c.AbortWithError(http.StatusBadRequest, fmt.Errorf("Invalid JSON"))
|
||||
return
|
||||
}
|
||||
var requestBody map[string]interface{}
|
||||
if err := json.Unmarshal(bodyBytes, &requestBody); err != nil {
|
||||
http.Error(w, "Invalid JSON", http.StatusBadRequest)
|
||||
c.AbortWithError(http.StatusBadRequest, fmt.Errorf("Invalid JSON"))
|
||||
return
|
||||
}
|
||||
model, ok := requestBody["model"].(string)
|
||||
if !ok {
|
||||
http.Error(w, "Missing or invalid 'model' key", http.StatusBadRequest)
|
||||
c.AbortWithError(http.StatusBadRequest, fmt.Errorf("Missing or invalid 'model' key"))
|
||||
return
|
||||
}
|
||||
|
||||
if err := pm.swapModel(model); err != nil {
|
||||
http.Error(w, fmt.Sprintf("unable to swap to model, %s", err.Error()), http.StatusNotFound)
|
||||
c.AbortWithError(http.StatusNotFound, fmt.Errorf("unable to swap to model, %s", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
pm.currentProcess.ProxyRequest(w, r)
|
||||
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||
pm.currentProcess.ProxyRequest(c.Writer, c.Request)
|
||||
}
|
||||
|
||||
func (pm *ProxyManager) proxyRequestHandler(c *gin.Context) {
|
||||
if pm.currentProcess != nil {
|
||||
pm.currentProcess.ProxyRequest(c.Writer, c.Request)
|
||||
} else {
|
||||
c.AbortWithError(http.StatusBadRequest, fmt.Errorf("no strategy to handle request"))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user