Refactor log implementation
- use []byte instead of unnecessary string conversions - make LogManager.Broadcast private - make LogManager.GetHistory public - add tests
This commit is contained in:
3
Makefile
3
Makefile
@@ -9,6 +9,9 @@ all: mac linux simple-responder
|
|||||||
clean:
|
clean:
|
||||||
rm -rf $(BUILD_DIR)
|
rm -rf $(BUILD_DIR)
|
||||||
|
|
||||||
|
test:
|
||||||
|
go test -v ./proxy
|
||||||
|
|
||||||
# Build OSX binary
|
# Build OSX binary
|
||||||
mac:
|
mac:
|
||||||
@echo "Building Mac binary..."
|
@echo "Building Mac binary..."
|
||||||
|
|||||||
@@ -2,66 +2,73 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/ring"
|
"container/ring"
|
||||||
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LogMonitor struct {
|
type LogMonitor struct {
|
||||||
clients map[chan string]bool
|
clients map[chan []byte]bool
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
buffer *ring.Ring
|
buffer *ring.Ring
|
||||||
bufferMu sync.RWMutex
|
bufferMu sync.RWMutex
|
||||||
|
|
||||||
|
// typically this can be os.Stdout
|
||||||
|
stdout io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLogMonitor() *LogMonitor {
|
func NewLogMonitor() *LogMonitor {
|
||||||
|
return NewLogMonitorWriter(os.Stdout)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
|
||||||
return &LogMonitor{
|
return &LogMonitor{
|
||||||
clients: make(map[chan string]bool),
|
clients: make(map[chan []byte]bool),
|
||||||
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
|
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
|
||||||
|
stdout: stdout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *LogMonitor) Write(p []byte) (n int, err error) {
|
func (w *LogMonitor) Write(p []byte) (n int, err error) {
|
||||||
n, err = os.Stdout.Write(p)
|
n, err = w.stdout.Write(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
content := string(p)
|
|
||||||
|
|
||||||
w.bufferMu.Lock()
|
w.bufferMu.Lock()
|
||||||
w.buffer.Value = content
|
w.buffer.Value = p
|
||||||
w.buffer = w.buffer.Next()
|
w.buffer = w.buffer.Next()
|
||||||
w.bufferMu.Unlock()
|
w.bufferMu.Unlock()
|
||||||
|
|
||||||
w.Broadcast(content)
|
w.broadcast(p)
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *LogMonitor) getHistory() string {
|
func (w *LogMonitor) GetHistory() []byte {
|
||||||
w.bufferMu.RLock()
|
w.bufferMu.RLock()
|
||||||
defer w.bufferMu.RUnlock()
|
defer w.bufferMu.RUnlock()
|
||||||
|
|
||||||
var history string
|
var history []byte
|
||||||
w.buffer.Do(func(p interface{}) {
|
w.buffer.Do(func(p interface{}) {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
if content, ok := p.(string); ok {
|
if content, ok := p.([]byte); ok {
|
||||||
history += content
|
history = append(history, content...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
return history
|
return history
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *LogMonitor) Subscribe() chan string {
|
func (w *LogMonitor) Subscribe() chan []byte {
|
||||||
w.mu.Lock()
|
w.mu.Lock()
|
||||||
defer w.mu.Unlock()
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
ch := make(chan string, 100)
|
ch := make(chan []byte, 100)
|
||||||
w.clients[ch] = true
|
w.clients[ch] = true
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *LogMonitor) Unsubscribe(ch chan string) {
|
func (w *LogMonitor) Unsubscribe(ch chan []byte) {
|
||||||
w.mu.Lock()
|
w.mu.Lock()
|
||||||
defer w.mu.Unlock()
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
@@ -69,7 +76,7 @@ func (w *LogMonitor) Unsubscribe(ch chan string) {
|
|||||||
close(ch)
|
close(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *LogMonitor) Broadcast(msg string) {
|
func (w *LogMonitor) broadcast(msg []byte) {
|
||||||
w.mu.RLock()
|
w.mu.RLock()
|
||||||
defer w.mu.RUnlock()
|
defer w.mu.RUnlock()
|
||||||
|
|
||||||
|
|||||||
63
proxy/logMonitor_test.go
Normal file
63
proxy/logMonitor_test.go
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLogMonitor(t *testing.T) {
|
||||||
|
logMonitor := NewLogMonitorWriter(io.Discard)
|
||||||
|
|
||||||
|
// Test subscription
|
||||||
|
client1 := logMonitor.Subscribe()
|
||||||
|
client2 := logMonitor.Subscribe()
|
||||||
|
|
||||||
|
defer logMonitor.Unsubscribe(client1)
|
||||||
|
defer logMonitor.Unsubscribe(client2)
|
||||||
|
|
||||||
|
client1Messages := make([]byte, 0)
|
||||||
|
client2Messages := make([]byte, 0)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case data := <-client1:
|
||||||
|
client1Messages = append(client1Messages, data...)
|
||||||
|
case data := <-client2:
|
||||||
|
client2Messages = append(client2Messages, data...)
|
||||||
|
default:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
logMonitor.Write([]byte("1"))
|
||||||
|
logMonitor.Write([]byte("2"))
|
||||||
|
logMonitor.Write([]byte("3"))
|
||||||
|
|
||||||
|
// Wait for the goroutine to finish
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Check the buffer
|
||||||
|
expectedHistory := "123"
|
||||||
|
history := string(logMonitor.GetHistory())
|
||||||
|
|
||||||
|
if history != expectedHistory {
|
||||||
|
t.Errorf("Expected history: %s, got: %s", expectedHistory, history)
|
||||||
|
}
|
||||||
|
|
||||||
|
c1Data := string(client1Messages)
|
||||||
|
if c1Data != expectedHistory {
|
||||||
|
t.Errorf("Client1 expected %s, got: %s", expectedHistory, c1Data)
|
||||||
|
}
|
||||||
|
|
||||||
|
c2Data := string(client2Messages)
|
||||||
|
if c2Data != expectedHistory {
|
||||||
|
t.Errorf("Client2 expected %s, got: %s", expectedHistory, c2Data)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -61,9 +61,9 @@ func (pm *ProxyManager) streamLogs(w http.ResponseWriter, r *http.Request) {
|
|||||||
skipHistory := r.URL.Query().Has("skip")
|
skipHistory := r.URL.Query().Has("skip")
|
||||||
if !skipHistory {
|
if !skipHistory {
|
||||||
// Send history first
|
// Send history first
|
||||||
history := pm.logMonitor.getHistory()
|
history := pm.logMonitor.GetHistory()
|
||||||
if history != "" {
|
if len(history) != 0 {
|
||||||
fmt.Fprint(w, history)
|
w.Write(history)
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -76,7 +76,7 @@ func (pm *ProxyManager) streamLogs(w http.ResponseWriter, r *http.Request) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-ch:
|
case msg := <-ch:
|
||||||
fmt.Fprint(w, msg)
|
w.Write(msg)
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
case <-notify:
|
case <-notify:
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user