package proxy

import (
	"fmt"
	"net/http"
	"slices"
	"sync"

	"github.com/mostlygeek/llama-swap/proxy/config"
)

// ProcessGroup holds one Process per member of a configured group and routes
// requests to them. The embedded mutex guards lastUsedProcess and serializes
// model swaps when swap is enabled.
type ProcessGroup struct {
	sync.Mutex

	config config.Config
	id     string

	// swap, exclusive and persistent are copied from the group's configuration
	// in NewProcessGroup. Only swap is read by the methods in this file.
	swap       bool
	exclusive  bool
	persistent bool

	proxyLogger    *LogMonitor
	upstreamLogger *LogMonitor

	// map of current processes, keyed by the model ID returned by
	// config.FindConfig. It is populated once in NewProcessGroup and is not
	// modified by any method in this file.
	processes map[string]*Process

	// lastUsedProcess is the model ID most recently swapped in by
	// ProxyRequest, or "" when none has been used or it has been stopped via
	// StopProcess.
	lastUsedProcess string
}

// NewProcessGroup builds the ProcessGroup for the group named id, creating one
// Process for every member listed in that group's configuration.
//
// It panics when config.Groups has no entry for id.
func NewProcessGroup(id string, config config.Config, proxyLogger *LogMonitor, upstreamLogger *LogMonitor) *ProcessGroup {
	groupConfig, ok := config.Groups[id]
	if !ok {
		panic("Unable to find configuration for group id: " + id)
	}

	pg := &ProcessGroup{
		id:             id,
		config:         config,
		swap:           groupConfig.Swap,
		exclusive:      groupConfig.Exclusive,
		persistent:     groupConfig.Persistent,
		proxyLogger:    proxyLogger,
		upstreamLogger: upstreamLogger,
		processes:      make(map[string]*Process),
	}

	// Create a Process for each member in the group
	for _, member := range groupConfig.Members {
		// The process is keyed by the ID FindConfig returns, not by the member
		// name as written in the group.
		// NOTE(review): the third result of FindConfig is ignored; presumably
		// it reports whether the member was found and members are validated
		// when the configuration is loaded. Confirm.
		modelConfig, modelID, _ := pg.config.FindConfig(member)
		process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger)
		pg.processes[modelID] = process
	}

	return pg
}

// ProxyRequest proxies a request to the specified model.
//
// It returns an error when modelID is not a member of the group or when no
// Process exists for it; otherwise it returns nil once the request has been
// handed to the model's Process and that call has returned.
//
// When the group has swap enabled and modelID differs from the last used
// model, the previously used process is stopped first and the group lock is
// held for the whole duration of the proxied request.
func (pg *ProcessGroup) ProxyRequest(modelID string, writer http.ResponseWriter, request *http.Request) error {
	if !pg.HasMember(modelID) {
		return fmt.Errorf("model %s not part of group %s", modelID, pg.id)
	}

	// HasMember checks the configured member names while processes is keyed by
	// the ID resolved in NewProcessGroup, so verify the process really exists
	// before stopping anything or dereferencing it.
	process, exists := pg.processes[modelID]
	if !exists {
		return fmt.Errorf("process not found for %s", modelID)
	}

	if pg.swap && pg.proxyWithSwap(modelID, process, writer, request) {
		return nil
	}

	process.ProxyRequest(writer, request)
	return nil
}

// proxyWithSwap handles the swap-enabled path of ProxyRequest. If modelID is
// already the last used model it does nothing and returns false so the caller
// can proxy without holding the lock. Otherwise it stops the previously used
// process, proxies the request while holding the lock, and returns true.
func (pg *ProcessGroup) proxyWithSwap(modelID string, process *Process, writer http.ResponseWriter, request *http.Request) bool {
	pg.Lock()
	// Deferred so the lock is released even if the proxied call panics;
	// otherwise every later swap or stop on this group would block forever.
	defer pg.Unlock()

	if pg.lastUsedProcess == modelID {
		return false
	}

	// is there something already running?
	if pg.lastUsedProcess != "" {
		pg.processes[pg.lastUsedProcess].Stop()
	}

	// Record the swap before proxying. The lock is held throughout, so this is
	// indistinguishable from recording it afterwards unless the proxied call
	// panics, in which case the state still names the model that was swapped in.
	pg.lastUsedProcess = modelID

	// wait for the request to the new model to be fully handled
	// and prevent race conditions see issue #277
	process.ProxyRequest(writer, request)
	return true
}

// HasMember reports whether modelName is listed in the Members of this
// group's configuration.
func (pg *ProcessGroup) HasMember(modelName string) bool {
	return slices.Contains(pg.config.Groups[pg.id].Members, modelName)
}

// StopProcess stops the process registered for modelID using the given
// strategy: StopImmediately calls Process.StopImmediately, any other value
// calls Process.Stop. It returns an error when no process is registered for
// modelID.
func (pg *ProcessGroup) StopProcess(modelID string, strategy StopStrategy) error {
	pg.Lock()

	process, exists := pg.processes[modelID]
	if !exists {
		pg.Unlock()
		return fmt.Errorf("process not found for %s", modelID)
	}

	if pg.lastUsedProcess == modelID {
		pg.lastUsedProcess = ""
	}
	// Release the lock before stopping; the stop call itself runs unlocked.
	pg.Unlock()

	switch strategy {
	case StopImmediately:
		process.StopImmediately()
	default:
		process.Stop()
	}
	return nil
}

// StopProcesses stops every process in the group in parallel using the given
// strategy and waits for all of them to return. The group lock is held for
// the whole operation.
func (pg *ProcessGroup) StopProcesses(strategy StopStrategy) {
	pg.Lock()
	defer pg.Unlock()

	if len(pg.processes) == 0 {
		return
	}

	// stop Processes in parallel
	var wg sync.WaitGroup
	for _, process := range pg.processes {
		wg.Add(1)
		go func(process *Process) {
			defer wg.Done()
			switch strategy {
			case StopImmediately:
				process.StopImmediately()
			default:
				process.Stop()
			}
		}(process)
	}
	wg.Wait()
}

// Shutdown calls Shutdown on every process in the group in parallel and waits
// for all of them to return. It does not take the group lock.
func (pg *ProcessGroup) Shutdown() {
	var wg sync.WaitGroup
	for _, process := range pg.processes {
		wg.Add(1)
		go func(process *Process) {
			defer wg.Done()
			process.Shutdown()
		}(process)
	}
	wg.Wait()
}