From 6299c1b87461264f50999161da0272158008c0ac Mon Sep 17 00:00:00 2001 From: Benson Wong Date: Tue, 15 Jul 2025 18:04:30 -0700 Subject: [PATCH] Fix High CPU (#189) * vendor in kelindar/event lib and refactor to remove time.Ticker --- event/README.md | 3 + event/default.go | 30 ++++ event/default_test.go | 54 +++++++ event/event.go | 324 ++++++++++++++++++++++++++++++++++++++ event/event_test.go | 324 ++++++++++++++++++++++++++++++++++++++ go.mod | 3 +- go.sum | 4 - llama-swap.go | 2 +- proxy/logMonitor.go | 4 +- proxy/process.go | 2 +- proxy/proxymanager_api.go | 2 +- 11 files changed, 741 insertions(+), 11 deletions(-) create mode 100644 event/README.md create mode 100644 event/default.go create mode 100644 event/default_test.go create mode 100644 event/event.go create mode 100644 event/event_test.go diff --git a/event/README.md b/event/README.md new file mode 100644 index 0000000..d19467b --- /dev/null +++ b/event/README.md @@ -0,0 +1,3 @@ +The code in `event` was originally a part of https://github.com/kelindar/event (v1.5.2) + +The original code uses a `time.Ticker` to process the event queue which caused a large increase in CPU usage ([#189](https://github.com/mostlygeek/llama-swap/issues/189)). This code was ported to remove the ticker and instead be more event driven. diff --git a/event/default.go b/event/default.go new file mode 100644 index 0000000..e38be03 --- /dev/null +++ b/event/default.go @@ -0,0 +1,30 @@ +// Copyright (c) Roman Atachiants and contributore. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for detaile. + +package event + +import ( + "context" +) + +// Default initializes a default in-process dispatcher +var Default = NewDispatcherConfig(25000) + +// On subscribes to an event, the type of the event will be automatically +// inferred from the provided type. Must be constant for this to work. This +// functions same way as Subscribe() but uses the default dispatcher instead. +func On[T Event](handler func(T)) context.CancelFunc { + return Subscribe(Default, handler) +} + +// OnType subscribes to an event with the specified event type. This functions +// same way as SubscribeTo() but uses the default dispatcher instead. +func OnType[T Event](eventType uint32, handler func(T)) context.CancelFunc { + return SubscribeTo(Default, eventType, handler) +} + +// Emit writes an event into the dispatcher. This functions same way as +// Publish() but uses the default dispatcher instead. +func Emit[T Event](ev T) { + Publish(Default, ev) +} diff --git a/event/default_test.go b/event/default_test.go new file mode 100644 index 0000000..ded5a2e --- /dev/null +++ b/event/default_test.go @@ -0,0 +1,54 @@ +// Copyright (c) Roman Atachiants and contributore. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for detaile. + +package event + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" +) + +/* +cpu: 13th Gen Intel(R) Core(TM) i7-13700K +BenchmarkSubcribeConcurrent-24 1826686 606.3 ns/op 1648 B/op 5 allocs/op +*/ +func BenchmarkSubscribeConcurrent(b *testing.B) { + d := NewDispatcher() + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + unsub := Subscribe(d, func(ev MyEvent1) {}) + unsub() + } + }) +} + +func TestDefaultPublish(t *testing.T) { + var wg sync.WaitGroup + + // Subscribe + var count int64 + defer On(func(ev MyEvent1) { + atomic.AddInt64(&count, 1) + wg.Done() + })() + + defer OnType(TypeEvent1, func(ev MyEvent1) { + atomic.AddInt64(&count, 1) + wg.Done() + })() + + // Publish + wg.Add(4) + Emit(MyEvent1{}) + Emit(MyEvent1{}) + + // Wait and check + wg.Wait() + assert.Equal(t, int64(4), count) +} diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000..6846314 --- /dev/null +++ b/event/event.go @@ -0,0 +1,324 @@ +// Copyright (c) Roman Atachiants and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. + +package event + +import ( + "context" + "fmt" + "reflect" + "sort" + "strings" + "sync" + "sync/atomic" +) + +// Event represents an event contract +type Event interface { + Type() uint32 +} + +// registry holds an immutable sorted array of event mappings +type registry struct { + keys []uint32 // Event types (sorted) + grps []any // Corresponding subscribers +} + +// ------------------------------------- Dispatcher ------------------------------------- + +// Dispatcher represents an event dispatcher. +type Dispatcher struct { + subs atomic.Pointer[registry] // Atomic pointer to immutable array + done chan struct{} // Cancellation + maxQueue int // Maximum queue size per consumer + mu sync.Mutex // Only for writes (subscribe/unsubscribe) +} + +// NewDispatcher creates a new dispatcher of events. +func NewDispatcher() *Dispatcher { + return NewDispatcherConfig(50000) +} + +// NewDispatcherConfig creates a new dispatcher with configurable max queue size +func NewDispatcherConfig(maxQueue int) *Dispatcher { + d := &Dispatcher{ + done: make(chan struct{}), + maxQueue: maxQueue, + } + + d.subs.Store(®istry{ + keys: make([]uint32, 0, 16), + grps: make([]any, 0, 16), + }) + return d +} + +// Close closes the dispatcher +func (d *Dispatcher) Close() error { + close(d.done) + return nil +} + +// isClosed returns whether the dispatcher is closed or not +func (d *Dispatcher) isClosed() bool { + select { + case <-d.done: + return true + default: + return false + } +} + +// findGroup performs a lock-free binary search for the event type +func (d *Dispatcher) findGroup(eventType uint32) any { + reg := d.subs.Load() + keys := reg.keys + + // Inlined binary search for better cache locality + left, right := 0, len(keys) + for left < right { + mid := left + (right-left)/2 + if keys[mid] < eventType { + left = mid + 1 + } else { + right = mid + } + } + + if left < len(keys) && keys[left] == eventType { + return reg.grps[left] + } + return nil +} + +// Subscribe subscribes to an event, the type of the event will be automatically +// inferred from the provided type. Must be constant for this to work. +func Subscribe[T Event](broker *Dispatcher, handler func(T)) context.CancelFunc { + var event T + return SubscribeTo(broker, event.Type(), handler) +} + +// SubscribeTo subscribes to an event with the specified event type. +func SubscribeTo[T Event](broker *Dispatcher, eventType uint32, handler func(T)) context.CancelFunc { + if broker.isClosed() { + panic(errClosed) + } + + broker.mu.Lock() + defer broker.mu.Unlock() + + // Check if group already exists + if existing := broker.findGroup(eventType); existing != nil { + grp := groupOf[T](eventType, existing) + sub := grp.Add(handler) + return func() { + grp.Del(sub) + } + } + + // Create new group + grp := &group[T]{cond: sync.NewCond(new(sync.Mutex)), maxQueue: broker.maxQueue} + sub := grp.Add(handler) + + // Copy-on-write: insert new entry in sorted position + old := broker.subs.Load() + idx := sort.Search(len(old.keys), func(i int) bool { + return old.keys[i] >= eventType + }) + + // Create new arrays with space for one more element + newKeys := make([]uint32, len(old.keys)+1) + newGrps := make([]any, len(old.grps)+1) + + // Copy elements before insertion point + copy(newKeys[:idx], old.keys[:idx]) + copy(newGrps[:idx], old.grps[:idx]) + + // Insert new element + newKeys[idx] = eventType + newGrps[idx] = grp + + // Copy elements after insertion point + copy(newKeys[idx+1:], old.keys[idx:]) + copy(newGrps[idx+1:], old.grps[idx:]) + + // Atomically store the new registry (mutex ensures no concurrent writers) + newReg := ®istry{keys: newKeys, grps: newGrps} + broker.subs.Store(newReg) + + return func() { + grp.Del(sub) + } +} + +// Publish writes an event into the dispatcher +func Publish[T Event](broker *Dispatcher, ev T) { + eventType := ev.Type() + if sub := broker.findGroup(eventType); sub != nil { + group := groupOf[T](eventType, sub) + group.Broadcast(ev) + } +} + +// Count counts the number of subscribers, this is for testing only. +func (d *Dispatcher) count(eventType uint32) int { + if group := d.findGroup(eventType); group != nil { + return group.(interface{ Count() int }).Count() + } + return 0 +} + +// groupOf casts the subscriber group to the specified generic type +func groupOf[T Event](eventType uint32, subs any) *group[T] { + if group, ok := subs.(*group[T]); ok { + return group + } + + panic(errConflict[T](eventType, subs)) +} + +// ------------------------------------- Subscriber ------------------------------------- + +// consumer represents a consumer with a message queue +type consumer[T Event] struct { + queue []T // Current work queue + stop bool // Stop signal +} + +// Listen listens to the event queue and processes events +func (s *consumer[T]) Listen(c *sync.Cond, fn func(T)) { + pending := make([]T, 0, 128) + + for { + c.L.Lock() + for len(s.queue) == 0 { + switch { + case s.stop: + c.L.Unlock() + return + default: + c.Wait() + } + } + + // Swap buffers and reset the current queue + temp := s.queue + s.queue = pending[:0] + pending = temp + c.L.Unlock() + + // Outside of the critical section, process the work + for _, event := range pending { + fn(event) + } + + // Notify potential publishers waiting due to backpressure + c.Broadcast() + } +} + +// ------------------------------------- Subscriber Group ------------------------------------- + +// group represents a consumer group +type group[T Event] struct { + cond *sync.Cond + subs []*consumer[T] + maxQueue int // Maximum queue size per consumer + maxLen int // Current maximum queue length across all consumers +} + +// Broadcast sends an event to all consumers +func (s *group[T]) Broadcast(ev T) { + s.cond.L.Lock() + defer s.cond.L.Unlock() + + // Calculate current maximum queue length + s.maxLen = 0 + for _, sub := range s.subs { + if len(sub.queue) > s.maxLen { + s.maxLen = len(sub.queue) + } + } + + // Backpressure: wait if queues are full + for s.maxLen >= s.maxQueue { + s.cond.Wait() + + // Recalculate after wakeup + s.maxLen = 0 + for _, sub := range s.subs { + if len(sub.queue) > s.maxLen { + s.maxLen = len(sub.queue) + } + } + } + + // Add event to all queues and track new maximum + newMax := 0 + for _, sub := range s.subs { + sub.queue = append(sub.queue, ev) + if len(sub.queue) > newMax { + newMax = len(sub.queue) + } + } + s.maxLen = newMax + s.cond.Broadcast() // Wake consumers +} + +// Add adds a subscriber to the list +func (s *group[T]) Add(handler func(T)) *consumer[T] { + sub := &consumer[T]{ + queue: make([]T, 0, 64), + } + + // Add the consumer to the list of active consumers + s.cond.L.Lock() + s.subs = append(s.subs, sub) + s.cond.L.Unlock() + + // Start listening + go sub.Listen(s.cond, handler) + return sub +} + +// Del removes a subscriber from the list +func (s *group[T]) Del(sub *consumer[T]) { + s.cond.L.Lock() + defer s.cond.L.Unlock() + + // Search and remove the subscriber + sub.stop = true + for i, v := range s.subs { + if v == sub { + copy(s.subs[i:], s.subs[i+1:]) + s.subs = s.subs[:len(s.subs)-1] + break + } + } +} + +// ------------------------------------- Debugging ------------------------------------- + +var errClosed = fmt.Errorf("event dispatcher is closed") + +// Count returns the number of subscribers in this group +func (s *group[T]) Count() int { + return len(s.subs) +} + +// String returns string representation of the type +func (s *group[T]) String() string { + typ := reflect.TypeOf(s).String() + idx := strings.LastIndex(typ, "/") + typ = typ[idx+1 : len(typ)-1] + return typ +} + +// errConflict returns a conflict message +func errConflict[T any](eventType uint32, existing any) string { + var want T + return fmt.Sprintf( + "conflicting event type, want=<%T>, registered=<%s>, event=0x%v", + want, existing, eventType, + ) +} diff --git a/event/event_test.go b/event/event_test.go new file mode 100644 index 0000000..1868ef0 --- /dev/null +++ b/event/event_test.go @@ -0,0 +1,324 @@ +// Copyright (c) Roman Atachiants and contributore. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for detaile. + +package event + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPublish(t *testing.T) { + d := NewDispatcher() + var wg sync.WaitGroup + + // Subscribe, must be received in order + var count int64 + defer Subscribe(d, func(ev MyEvent1) { + assert.Equal(t, int(atomic.AddInt64(&count, 1)), ev.Number) + wg.Done() + })() + + // Publish + wg.Add(3) + Publish(d, MyEvent1{Number: 1}) + Publish(d, MyEvent1{Number: 2}) + Publish(d, MyEvent1{Number: 3}) + + // Wait and check + wg.Wait() + assert.Equal(t, int64(3), count) +} + +func TestUnsubscribe(t *testing.T) { + d := NewDispatcher() + assert.Equal(t, 0, d.count(TypeEvent1)) + unsubscribe := Subscribe(d, func(ev MyEvent1) { + // Nothing + }) + + assert.Equal(t, 1, d.count(TypeEvent1)) + unsubscribe() + assert.Equal(t, 0, d.count(TypeEvent1)) +} + +func TestConcurrent(t *testing.T) { + const max = 1000000 + var count int64 + var wg sync.WaitGroup + wg.Add(1) + + d := NewDispatcher() + defer Subscribe(d, func(ev MyEvent1) { + if current := atomic.AddInt64(&count, 1); current == max { + wg.Done() + } + })() + + // Asynchronously publish + go func() { + for i := 0; i < max; i++ { + Publish(d, MyEvent1{}) + } + }() + + defer Subscribe(d, func(ev MyEvent1) { + // Subscriber that does nothing + })() + + wg.Wait() + assert.Equal(t, max, int(count)) +} + +func TestSubscribeDifferentType(t *testing.T) { + d := NewDispatcher() + assert.Panics(t, func() { + SubscribeTo(d, TypeEvent1, func(ev MyEvent1) {}) + SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) + }) +} + +func TestPublishDifferentType(t *testing.T) { + d := NewDispatcher() + assert.Panics(t, func() { + SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) + Publish(d, MyEvent1{}) + }) +} + +func TestCloseDispatcher(t *testing.T) { + d := NewDispatcher() + defer SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {})() + + assert.NoError(t, d.Close()) + assert.Panics(t, func() { + SubscribeTo(d, TypeEvent1, func(ev MyEvent2) {}) + }) +} + +func TestMatrix(t *testing.T) { + const amount = 1000 + for _, subs := range []int{1, 10, 100} { + for _, topics := range []int{1, 10} { + expected := subs * topics * amount + t.Run(fmt.Sprintf("%dx%d", topics, subs), func(t *testing.T) { + var count atomic.Int64 + var wg sync.WaitGroup + wg.Add(expected) + + d := NewDispatcher() + for i := 0; i < subs; i++ { + for id := 0; id < topics; id++ { + defer SubscribeTo(d, uint32(id), func(ev MyEvent3) { + count.Add(1) + wg.Done() + })() + } + } + + for n := 0; n < amount; n++ { + for id := 0; id < topics; id++ { + go Publish(d, MyEvent3{ID: id}) + } + } + + wg.Wait() + assert.Equal(t, expected, int(count.Load())) + }) + } + } +} + +func TestConcurrentSubscriptionRace(t *testing.T) { + // This test specifically targets the race condition that occurs when multiple + // goroutines try to subscribe to different event types simultaneously. + // Without the CAS loop, subscriptions could be lost due to registry corruption. + + const numGoroutines = 100 + const numEventTypes = 50 + + d := NewDispatcher() + defer d.Close() + + var wg sync.WaitGroup + var receivedCount int64 + var subscribedTypes sync.Map // Thread-safe map + + wg.Add(numGoroutines) + + // Start multiple goroutines that subscribe to different event types concurrently + for i := 0; i < numGoroutines; i++ { + go func(goroutineID int) { + defer wg.Done() + + // Each goroutine subscribes to a unique event type + eventType := uint32(goroutineID%numEventTypes + 1000) // Offset to avoid collision with other tests + + // Subscribe to the event type + SubscribeTo(d, eventType, func(ev MyEvent3) { + atomic.AddInt64(&receivedCount, 1) + }) + + // Record that this type was subscribed + subscribedTypes.Store(eventType, true) + }(i) + } + + // Wait for all subscriptions to complete + wg.Wait() + + // Count the number of unique event types subscribed + expectedTypes := 0 + subscribedTypes.Range(func(key, value interface{}) bool { + expectedTypes++ + return true + }) + + // Small delay to ensure all subscriptions are fully processed + time.Sleep(10 * time.Millisecond) + + // Publish events to each subscribed type + subscribedTypes.Range(func(key, value interface{}) bool { + eventType := key.(uint32) + Publish(d, MyEvent3{ID: int(eventType)}) + return true + }) + + // Wait for all events to be processed + time.Sleep(50 * time.Millisecond) + + // Verify that we received at least the expected number of events + // (there might be more if multiple goroutines subscribed to the same event type) + received := atomic.LoadInt64(&receivedCount) + assert.GreaterOrEqual(t, int(received), expectedTypes, + "Should have received at least %d events, got %d", expectedTypes, received) + + // Verify that we have the expected number of unique event types + assert.Equal(t, numEventTypes, expectedTypes, + "Should have exactly %d unique event types", numEventTypes) +} + +func TestConcurrentHandlerRegistration(t *testing.T) { + const numGoroutines = 100 + + // Test concurrent subscriptions to the same event type + t.Run("SameEventType", func(t *testing.T) { + d := NewDispatcher() + var handlerCount int64 + var wg sync.WaitGroup + + // Start multiple goroutines subscribing to the same event type (0x1) + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + SubscribeTo(d, uint32(0x1), func(ev MyEvent1) { + atomic.AddInt64(&handlerCount, 1) + }) + }() + } + + wg.Wait() + + // Verify all handlers were registered by publishing an event + atomic.StoreInt64(&handlerCount, 0) + Publish(d, MyEvent1{}) + + // Small delay to ensure all handlers have executed + time.Sleep(10 * time.Millisecond) + + assert.Equal(t, int64(numGoroutines), atomic.LoadInt64(&handlerCount), + "Not all handlers were registered due to race condition") + }) + + // Test concurrent subscriptions to different event types + t.Run("DifferentEventTypes", func(t *testing.T) { + d := NewDispatcher() + var wg sync.WaitGroup + receivedEvents := make(map[uint32]*int64) + + // Create multiple event types and subscribe concurrently + for i := 0; i < numGoroutines; i++ { + eventType := uint32(100 + i) + counter := new(int64) + receivedEvents[eventType] = counter + + wg.Add(1) + go func(et uint32, cnt *int64) { + defer wg.Done() + SubscribeTo(d, et, func(ev MyEvent3) { + atomic.AddInt64(cnt, 1) + }) + }(eventType, counter) + } + + wg.Wait() + + // Publish events to all types + for eventType := uint32(100); eventType < uint32(100+numGoroutines); eventType++ { + Publish(d, MyEvent3{ID: int(eventType)}) + } + + // Small delay to ensure all handlers have executed + time.Sleep(10 * time.Millisecond) + + // Verify all event types received their events + for eventType, counter := range receivedEvents { + assert.Equal(t, int64(1), atomic.LoadInt64(counter), + "Event type %d did not receive its event", eventType) + } + }) +} + +func TestBackpressure(t *testing.T) { + d := NewDispatcher() + d.maxQueue = 10 + + var processedCount int64 + unsub := SubscribeTo(d, uint32(0x200), func(ev MyEvent3) { + atomic.AddInt64(&processedCount, 1) + }) + defer unsub() + + const eventsToPublish = 1000 + for i := 0; i < eventsToPublish; i++ { + Publish(d, MyEvent3{ID: 0x200}) + } + + time.Sleep(100 * time.Millisecond) + + // Verify all events were eventually processed + finalProcessed := atomic.LoadInt64(&processedCount) + assert.Equal(t, int64(eventsToPublish), finalProcessed) + t.Logf("Events processed: %d/%d", finalProcessed, eventsToPublish) +} + +// ------------------------------------- Test Events ------------------------------------- + +const ( + TypeEvent1 = 0x1 + TypeEvent2 = 0x2 +) + +type MyEvent1 struct { + Number int +} + +func (t MyEvent1) Type() uint32 { return TypeEvent1 } + +type MyEvent2 struct { + Text string +} + +func (t MyEvent2) Type() uint32 { return TypeEvent2 } + +type MyEvent3 struct { + ID int +} + +func (t MyEvent3) Type() uint32 { return uint32(t.ID) } diff --git a/go.mod b/go.mod index f4e708c..6321114 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,16 @@ module github.com/mostlygeek/llama-swap go 1.23.0 require ( + github.com/billziss-gh/golib v0.2.0 github.com/fsnotify/fsnotify v1.9.0 github.com/gin-gonic/gin v1.10.0 github.com/stretchr/testify v1.9.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 gopkg.in/yaml.v3 v3.0.1 - github.com/kelindar/event v1.5.2 ) require ( - github.com/billziss-gh/golib v0.2.0 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cloudwego/base64x v0.1.4 // indirect diff --git a/go.sum b/go.sum index 92518f5..d1561ea 100644 --- a/go.sum +++ b/go.sum @@ -32,12 +32,8 @@ github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= -github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= -github.com/kelindar/event v1.5.2 h1:qtgssZqMh/QQMCIxlbx4wU3DoMHOrJXKdiZhphJ4YbY= -github.com/kelindar/event v1.5.2/go.mod h1:UxWPQjWK8u0o9Z3ponm2mgREimM95hm26/M9z8F488Q= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= diff --git a/llama-swap.go b/llama-swap.go index ae5d0fa..7861dd5 100644 --- a/llama-swap.go +++ b/llama-swap.go @@ -14,7 +14,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/gin-gonic/gin" - "github.com/kelindar/event" + "github.com/mostlygeek/llama-swap/event" "github.com/mostlygeek/llama-swap/proxy" ) diff --git a/proxy/logMonitor.go b/proxy/logMonitor.go index f67208e..034b2ba 100644 --- a/proxy/logMonitor.go +++ b/proxy/logMonitor.go @@ -8,7 +8,7 @@ import ( "os" "sync" - "github.com/kelindar/event" + "github.com/mostlygeek/llama-swap/event" ) type LogLevel int @@ -40,7 +40,7 @@ func NewLogMonitor() *LogMonitor { func NewLogMonitorWriter(stdout io.Writer) *LogMonitor { return &LogMonitor{ - eventbus: event.NewDispatcher(), + eventbus: event.NewDispatcherConfig(1000), buffer: ring.New(10 * 1024), // keep 10KB of buffered logs stdout: stdout, level: LevelInfo, diff --git a/proxy/process.go b/proxy/process.go index e42524a..9707a07 100644 --- a/proxy/process.go +++ b/proxy/process.go @@ -14,7 +14,7 @@ import ( "syscall" "time" - "github.com/kelindar/event" + "github.com/mostlygeek/llama-swap/event" ) type ProcessState string diff --git a/proxy/proxymanager_api.go b/proxy/proxymanager_api.go index f2b30bd..991e8e2 100644 --- a/proxy/proxymanager_api.go +++ b/proxy/proxymanager_api.go @@ -7,7 +7,7 @@ import ( "sort" "github.com/gin-gonic/gin" - "github.com/kelindar/event" + "github.com/mostlygeek/llama-swap/event" ) type Model struct {