* proxy/config: create config package and migrate configuration The configuration is become more complex as llama-swap adds more advanced features. This commit moves config to its own package so it can be developed independently of the proxy package. Additionally, enforcing a public API for a configuration will allow downstream usage to be more decoupled.
150 lines
3.2 KiB
Go
150 lines
3.2 KiB
Go
package proxy
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"slices"
|
|
"sync"
|
|
|
|
"github.com/mostlygeek/llama-swap/proxy/config"
|
|
)
|
|
|
|
type ProcessGroup struct {
|
|
sync.Mutex
|
|
|
|
config config.Config
|
|
id string
|
|
swap bool
|
|
exclusive bool
|
|
persistent bool
|
|
|
|
proxyLogger *LogMonitor
|
|
upstreamLogger *LogMonitor
|
|
|
|
// map of current processes
|
|
processes map[string]*Process
|
|
lastUsedProcess string
|
|
}
|
|
|
|
func NewProcessGroup(id string, config config.Config, proxyLogger *LogMonitor, upstreamLogger *LogMonitor) *ProcessGroup {
|
|
groupConfig, ok := config.Groups[id]
|
|
if !ok {
|
|
panic("Unable to find configuration for group id: " + id)
|
|
}
|
|
|
|
pg := &ProcessGroup{
|
|
id: id,
|
|
config: config,
|
|
swap: groupConfig.Swap,
|
|
exclusive: groupConfig.Exclusive,
|
|
persistent: groupConfig.Persistent,
|
|
proxyLogger: proxyLogger,
|
|
upstreamLogger: upstreamLogger,
|
|
processes: make(map[string]*Process),
|
|
}
|
|
|
|
// Create a Process for each member in the group
|
|
for _, modelID := range groupConfig.Members {
|
|
modelConfig, modelID, _ := pg.config.FindConfig(modelID)
|
|
process := NewProcess(modelID, pg.config.HealthCheckTimeout, modelConfig, pg.upstreamLogger, pg.proxyLogger)
|
|
pg.processes[modelID] = process
|
|
}
|
|
|
|
return pg
|
|
}
|
|
|
|
// ProxyRequest proxies a request to the specified model
|
|
func (pg *ProcessGroup) ProxyRequest(modelID string, writer http.ResponseWriter, request *http.Request) error {
|
|
if !pg.HasMember(modelID) {
|
|
return fmt.Errorf("model %s not part of group %s", modelID, pg.id)
|
|
}
|
|
|
|
if pg.swap {
|
|
pg.Lock()
|
|
if pg.lastUsedProcess != modelID {
|
|
|
|
// is there something already running?
|
|
if pg.lastUsedProcess != "" {
|
|
pg.processes[pg.lastUsedProcess].Stop()
|
|
}
|
|
|
|
// wait for the request to the new model to be fully handled
|
|
// and prevent race conditions see issue #277
|
|
pg.processes[modelID].ProxyRequest(writer, request)
|
|
pg.lastUsedProcess = modelID
|
|
|
|
// short circuit and exit
|
|
pg.Unlock()
|
|
return nil
|
|
}
|
|
pg.Unlock()
|
|
}
|
|
|
|
pg.processes[modelID].ProxyRequest(writer, request)
|
|
return nil
|
|
}
|
|
|
|
func (pg *ProcessGroup) HasMember(modelName string) bool {
|
|
return slices.Contains(pg.config.Groups[pg.id].Members, modelName)
|
|
}
|
|
|
|
func (pg *ProcessGroup) StopProcess(modelID string, strategy StopStrategy) error {
|
|
pg.Lock()
|
|
|
|
process, exists := pg.processes[modelID]
|
|
if !exists {
|
|
pg.Unlock()
|
|
return fmt.Errorf("process not found for %s", modelID)
|
|
}
|
|
|
|
if pg.lastUsedProcess == modelID {
|
|
pg.lastUsedProcess = ""
|
|
}
|
|
pg.Unlock()
|
|
|
|
switch strategy {
|
|
case StopImmediately:
|
|
process.StopImmediately()
|
|
default:
|
|
process.Stop()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (pg *ProcessGroup) StopProcesses(strategy StopStrategy) {
|
|
pg.Lock()
|
|
defer pg.Unlock()
|
|
|
|
if len(pg.processes) == 0 {
|
|
return
|
|
}
|
|
|
|
// stop Processes in parallel
|
|
var wg sync.WaitGroup
|
|
for _, process := range pg.processes {
|
|
wg.Add(1)
|
|
go func(process *Process) {
|
|
defer wg.Done()
|
|
switch strategy {
|
|
case StopImmediately:
|
|
process.StopImmediately()
|
|
default:
|
|
process.Stop()
|
|
}
|
|
}(process)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (pg *ProcessGroup) Shutdown() {
|
|
var wg sync.WaitGroup
|
|
for _, process := range pg.processes {
|
|
wg.Add(1)
|
|
go func(process *Process) {
|
|
defer wg.Done()
|
|
process.Shutdown()
|
|
}(process)
|
|
}
|
|
wg.Wait()
|
|
}
|