cmd/wol-proxy: tweak logs to show what is causing wake ups (#356)
fix the extra wake ups being caused by wol-proxy * cmd/wol-proxy: tweak logs to show what is causing wake ups * cmd/wol-proxy: add skip wakeup * cmd/wol-proxy: replace ticker with SSE connection * cmd/wol-proxy: increase scanner buffer size * cmd/wol-proxy: improve failure tracking
This commit is contained in:
2
Makefile
2
Makefile
@@ -90,7 +90,7 @@ GOOS ?= $(shell go env GOOS 2>/dev/null || echo linux)
|
||||
GOARCH ?= $(shell go env GOARCH 2>/dev/null || echo amd64)
|
||||
wol-proxy: $(BUILD_DIR)
|
||||
@echo "Building wol-proxy"
|
||||
go build -o $(BUILD_DIR)/wol-proxy-$(GOOS)-$(GOARCH) cmd/wol-proxy/wol-proxy.go
|
||||
go build -o $(BUILD_DIR)/wol-proxy-$(GOOS)-$(GOARCH)-$(shell date +%Y-%m-%d) cmd/wol-proxy/wol-proxy.go
|
||||
|
||||
# Phony targets
|
||||
.PHONY: all clean ui mac linux windows simple-responder simple-responder-windows test test-all test-dev wol-proxy
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
@@ -13,6 +14,7 @@ import (
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
@@ -92,14 +94,9 @@ func main() {
|
||||
}()
|
||||
|
||||
// graceful shutdown
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer stop()
|
||||
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
<-ctx.Done()
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := server.Shutdown(shutdownCtx); err != nil {
|
||||
slog.Error("server shutdown error", "error", err)
|
||||
}
|
||||
server.Close()
|
||||
}
|
||||
|
||||
type upstreamStatus string
|
||||
@@ -124,37 +121,86 @@ func newProxy(url *url.URL) *proxyServer {
|
||||
failCount: 0,
|
||||
}
|
||||
|
||||
// start a goroutien to check upstream status
|
||||
// start a goroutine to monitor upstream status via SSE
|
||||
go func() {
|
||||
checkUrl := url.Scheme + "://" + url.Host + "/wol-health"
|
||||
client := &http.Client{Timeout: time.Second}
|
||||
ticker := time.NewTicker(2 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
eventsUrl := url.Scheme + "://" + url.Host + "/api/events"
|
||||
client := &http.Client{
|
||||
Timeout: 0, // No timeout for SSE connection
|
||||
}
|
||||
|
||||
slog.Debug("checking upstream status at", "url", checkUrl)
|
||||
resp, err := client.Get(checkUrl)
|
||||
waitDuration := 10 * time.Second
|
||||
|
||||
// drain the body
|
||||
if err == nil && resp != nil {
|
||||
for {
|
||||
slog.Debug("connecting to SSE endpoint", "url", eventsUrl)
|
||||
|
||||
req, err := http.NewRequest("GET", eventsUrl, nil)
|
||||
if err != nil {
|
||||
slog.Warn("failed to create SSE request", "error", err)
|
||||
proxy.setStatus(notready)
|
||||
proxy.incFail(1)
|
||||
time.Sleep(waitDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
req.Header.Set("Accept", "text/event-stream")
|
||||
req.Header.Set("Cache-Control", "no-cache")
|
||||
req.Header.Set("Connection", "keep-alive")
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
slog.Error("failed to connect to SSE endpoint", "error", err)
|
||||
proxy.setStatus(notready)
|
||||
proxy.incFail(1)
|
||||
time.Sleep(10 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
slog.Warn("SSE endpoint returned non-OK status", "status", resp.StatusCode)
|
||||
_, _ = io.Copy(io.Discard, resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
|
||||
if err == nil && resp != nil && resp.StatusCode == http.StatusOK {
|
||||
slog.Debug("upstream status: ready")
|
||||
proxy.setStatus(ready)
|
||||
proxy.statusMutex.Lock()
|
||||
proxy.failCount = 0
|
||||
proxy.statusMutex.Unlock()
|
||||
} else {
|
||||
slog.Debug("upstream status: notready", "error", err)
|
||||
proxy.setStatus(notready)
|
||||
proxy.statusMutex.Lock()
|
||||
proxy.failCount++
|
||||
proxy.statusMutex.Unlock()
|
||||
proxy.incFail(1)
|
||||
time.Sleep(10 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
// Successfully connected to SSE endpoint
|
||||
slog.Info("connected to SSE endpoint, upstream ready")
|
||||
proxy.setStatus(ready)
|
||||
proxy.resetFailures()
|
||||
|
||||
// Read from the SSE stream to detect disconnection
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
|
||||
// use a fairly large buffer to avoid scanner errors when reading large SSE events
|
||||
buf := make([]byte, 0, 1024*1024*2)
|
||||
scanner.Buffer(buf, 1024*1024*2)
|
||||
events := 0
|
||||
if slog.Default().Enabled(context.Background(), slog.LevelDebug) {
|
||||
fmt.Print("Events: ")
|
||||
}
|
||||
for scanner.Scan() {
|
||||
if slog.Default().Enabled(context.Background(), slog.LevelDebug) {
|
||||
// Just read the events to keep connection alive
|
||||
// We don't need to process the event data
|
||||
events++
|
||||
fmt.Printf("%d, ", events)
|
||||
}
|
||||
}
|
||||
fmt.Println()
|
||||
if err := scanner.Err(); err != nil {
|
||||
slog.Error("error reading from SSE stream", "error", err)
|
||||
}
|
||||
|
||||
// Connection closed or error occurred
|
||||
_ = resp.Body.Close()
|
||||
slog.Info("SSE connection closed, upstream not ready")
|
||||
proxy.setStatus(notready)
|
||||
proxy.incFail(1)
|
||||
|
||||
// Wait before reconnecting
|
||||
time.Sleep(waitDuration)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -163,10 +209,8 @@ func newProxy(url *url.URL) *proxyServer {
|
||||
|
||||
func (p *proxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == "GET" && r.URL.Path == "/status" {
|
||||
p.statusMutex.RLock()
|
||||
status := string(p.status)
|
||||
failCount := p.failCount
|
||||
p.statusMutex.RUnlock()
|
||||
status := string(p.getStatus())
|
||||
failCount := p.getFailures()
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
w.WriteHeader(200)
|
||||
fmt.Fprintf(w, "status: %s\n", status)
|
||||
@@ -175,7 +219,14 @@ func (p *proxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
if p.getStatus() == notready {
|
||||
slog.Info("upstream not ready, sending magic packet", "mac", *flagMac)
|
||||
path := r.URL.Path
|
||||
if strings.HasPrefix(path, "/api/events") {
|
||||
slog.Debug("Skipping wake up", "req", path)
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("upstream not ready, sending magic packet", "req", path, "from", r.RemoteAddr)
|
||||
if err := sendMagicPacket(*flagMac); err != nil {
|
||||
slog.Warn("failed to send magic WoL packet", "error", err)
|
||||
}
|
||||
@@ -213,6 +264,24 @@ func (p *proxyServer) setStatus(status upstreamStatus) {
|
||||
p.status = status
|
||||
}
|
||||
|
||||
func (p *proxyServer) incFail(num int) {
|
||||
p.statusMutex.Lock()
|
||||
defer p.statusMutex.Unlock()
|
||||
p.failCount += num
|
||||
}
|
||||
|
||||
func (p *proxyServer) getFailures() int {
|
||||
p.statusMutex.RLock()
|
||||
defer p.statusMutex.RUnlock()
|
||||
return p.failCount
|
||||
}
|
||||
|
||||
func (p *proxyServer) resetFailures() {
|
||||
p.statusMutex.Lock()
|
||||
defer p.statusMutex.Unlock()
|
||||
p.failCount = 0
|
||||
}
|
||||
|
||||
func sendMagicPacket(macAddr string) error {
|
||||
hwAddr, err := net.ParseMAC(macAddr)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user