From a929b0b1ecbb0f87b0bd0cd0a40ce38893dce9c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Benkovsk=C3=BD?= Date: Wed, 13 Apr 2022 19:09:03 +0200 Subject: [PATCH] plugin/health : rework overloaded goroutine to support graceful shutdown (#5244) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ondřej Benkovský --- plugin/health/health.go | 13 ++++++---- plugin/health/health_test.go | 4 ++-- plugin/health/overloaded.go | 15 ++++++++---- plugin/health/overloaded_test.go | 41 ++++++++++++++++++++++++++++++++ plugin/health/setup.go | 2 +- 5 files changed, 63 insertions(+), 12 deletions(-) create mode 100644 plugin/health/overloaded_test.go diff --git a/plugin/health/health.go b/plugin/health/health.go index b5b4b95a2..a71c00c8c 100644 --- a/plugin/health/health.go +++ b/plugin/health/health.go @@ -2,6 +2,7 @@ package health import ( + "context" "io" "net" "net/http" @@ -22,14 +23,13 @@ type health struct { nlSetup bool mux *http.ServeMux - stop chan bool + stop context.CancelFunc } func (h *health) OnStartup() error { if h.Addr == "" { h.Addr = ":8080" } - h.stop = make(chan bool) ln, err := reuseport.Listen("tcp", h.Addr) if err != nil { return err @@ -45,8 +45,11 @@ func (h *health) OnStartup() error { io.WriteString(w, http.StatusText(http.StatusOK)) }) + ctx := context.Background() + ctx, h.stop = context.WithCancel(ctx) + go func() { http.Serve(h.ln, h.mux) }() - go func() { h.overloaded() }() + go func() { h.overloaded(ctx) }() return nil } @@ -61,9 +64,9 @@ func (h *health) OnFinalShutdown() error { time.Sleep(h.lameduck) } - h.ln.Close() + h.stop() + h.ln.Close() h.nlSetup = false - close(h.stop) return nil } diff --git a/plugin/health/health_test.go b/plugin/health/health_test.go index 81440ec86..ba6b14c7a 100644 --- a/plugin/health/health_test.go +++ b/plugin/health/health_test.go @@ -9,7 +9,7 @@ import ( ) func TestHealth(t *testing.T) { - h := &health{Addr: ":0", stop: make(chan bool)} + h := &health{Addr: ":0"} if err := h.OnStartup(); err != nil { t.Fatalf("Unable to startup the health server: %v", err) @@ -37,7 +37,7 @@ func TestHealth(t *testing.T) { } func TestHealthLameduck(t *testing.T) { - h := &health{Addr: ":0", stop: make(chan bool), lameduck: 250 * time.Millisecond} + h := &health{Addr: ":0", lameduck: 250 * time.Millisecond} if err := h.OnStartup(); err != nil { t.Fatalf("Unable to startup the health server: %v", err) diff --git a/plugin/health/overloaded.go b/plugin/health/overloaded.go index 88ce038c3..482b8a286 100644 --- a/plugin/health/overloaded.go +++ b/plugin/health/overloaded.go @@ -1,6 +1,7 @@ package health import ( + "context" "net/http" "time" @@ -11,12 +12,14 @@ import ( ) // overloaded queries the health end point and updates a metrics showing how long it took. -func (h *health) overloaded() { - timeout := time.Duration(3 * time.Second) +func (h *health) overloaded(ctx context.Context) { + timeout := 3 * time.Second client := http.Client{ Timeout: timeout, } + url := "http://" + h.Addr + "/health" + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) tick := time.NewTicker(1 * time.Second) defer tick.Stop() @@ -24,7 +27,11 @@ func (h *health) overloaded() { select { case <-tick.C: start := time.Now() - resp, err := client.Get(url) + resp, err := client.Do(req) + if err != nil && ctx.Err() == context.Canceled { + // request was cancelled by parent goroutine + return + } if err != nil { HealthDuration.Observe(time.Since(start).Seconds()) HealthFailures.Inc() @@ -38,7 +45,7 @@ func (h *health) overloaded() { log.Warningf("Local health request to %q took more than 1s: %s", url, elapsed) } - case <-h.stop: + case <-ctx.Done(): return } } diff --git a/plugin/health/overloaded_test.go b/plugin/health/overloaded_test.go new file mode 100644 index 000000000..c927f13b2 --- /dev/null +++ b/plugin/health/overloaded_test.go @@ -0,0 +1,41 @@ +package health + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func Test_health_overloaded_cancellation(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(1 * time.Second) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + h := &health{ + Addr: ts.URL, + stop: cancel, + } + + stopped := make(chan struct{}) + go func() { + h.overloaded(ctx) + stopped <- struct{}{} + }() + + // wait for overloaded function to start atleast once + time.Sleep(1 * time.Second) + + cancel() + + select { + case <-stopped: + case <-time.After(5 * time.Second): + t.Fatal("overloaded function should have been cancelled") + } +} diff --git a/plugin/health/setup.go b/plugin/health/setup.go index 9cabeb7e6..624d88dd5 100644 --- a/plugin/health/setup.go +++ b/plugin/health/setup.go @@ -17,7 +17,7 @@ func setup(c *caddy.Controller) error { return plugin.Error("health", err) } - h := &health{Addr: addr, stop: make(chan bool), lameduck: lame} + h := &health{Addr: addr, lameduck: lame} c.OnStartup(h.OnStartup) c.OnRestart(h.OnFinalShutdown)