diff --git a/middleware/proxy/lookup.go b/middleware/proxy/lookup.go index e97741fb5..370307cc1 100644 --- a/middleware/proxy/lookup.go +++ b/middleware/proxy/lookup.go @@ -21,6 +21,8 @@ func NewLookup(hosts []string) Proxy { func NewLookupWithOption(hosts []string, opts Options) Proxy { p := Proxy{Next: nil} + // TODO(miek): this needs to be unified with upstream.go's NewStaticUpstreams, caddy uses NewHost + // we should copy/make something similar. upstream := &staticUpstream{ from: ".", Hosts: make([]*UpstreamHost, len(hosts)), diff --git a/middleware/proxy/proxy.go b/middleware/proxy/proxy.go index ce8b99d83..5205bd06f 100644 --- a/middleware/proxy/proxy.go +++ b/middleware/proxy/proxy.go @@ -46,6 +46,8 @@ type Upstream interface { IsAllowedDomain(string) bool // Exchanger returns the exchanger to be used for this upstream. Exchanger() Exchanger + // Stops the upstream from proxying requests to shutdown goroutines cleanly. + Stop() error } // UpstreamHostDownFunc can be used to customize how Down behaves. diff --git a/middleware/proxy/proxy_test.go b/middleware/proxy/proxy_test.go new file mode 100644 index 000000000..0a44d1b82 --- /dev/null +++ b/middleware/proxy/proxy_test.go @@ -0,0 +1,85 @@ +package proxy + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/mholt/caddy/caddyfile" +) + +func TestStop(t *testing.T) { + config := "proxy . %s {\n health_check /healthcheck:%s %dms \n}" + tests := []struct { + name string + intervalInMilliseconds int + numHealthcheckIntervals int + }{ + { + "No Healthchecks After Stop - 5ms, 1 intervals", + 5, + 1, + }, + { + "No Healthchecks After Stop - 5ms, 2 intervals", + 5, + 2, + }, + { + "No Healthchecks After Stop - 5ms, 3 intervals", + 5, + 3, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + // Set up proxy. + var counter int64 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.Body.Close() + atomic.AddInt64(&counter, 1) + })) + + defer backend.Close() + + port := backend.URL[17:] // Remove all crap up to the port + back := backend.URL[7:] // Remove http:// + c := caddyfile.NewDispenser("Testfile", strings.NewReader(fmt.Sprintf(config, back, port, test.intervalInMilliseconds))) + upstreams, err := NewStaticUpstreams(&c) + if err != nil { + t.Error("Expected no error. Got:", err.Error()) + } + + // Give some time for healthchecks to hit the server. + time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond) + + for _, upstream := range upstreams { + if err := upstream.Stop(); err != nil { + t.Error("Expected no error stopping upstream. Got: ", err.Error()) + } + } + + counterValueAfterShutdown := atomic.LoadInt64(&counter) + + // Give some time to see if healthchecks are still hitting the server. + time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond) + + if counterValueAfterShutdown == 0 { + t.Error("Expected healthchecks to hit test server. Got no healthchecks.") + } + + counterValueAfterWaiting := atomic.LoadInt64(&counter) + if counterValueAfterWaiting != counterValueAfterShutdown { + t.Errorf("Expected no more healthchecks after shutdown. Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown) + } + + }) + + } +} diff --git a/middleware/proxy/setup.go b/middleware/proxy/setup.go index 36401188f..de979a5df 100644 --- a/middleware/proxy/setup.go +++ b/middleware/proxy/setup.go @@ -37,6 +37,8 @@ func setup(c *caddy.Controller) error { c.OnShutdown(func() error { return u.Exchanger().OnShutdown(P) }) + // Register shutdown handlers. + c.OnShutdown(u.Stop) } return nil diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 278b08d0f..8bc3e5306 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -10,6 +10,7 @@ import ( "net/url" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -25,7 +26,10 @@ var ( ) type staticUpstream struct { - from string + from string + stop chan struct{} // Signals running goroutines to stop. + wg sync.WaitGroup // Used to wait for running goroutines to stop. + Hosts HostPool Policy Policy Spray Policy @@ -49,6 +53,7 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { for c.Next() { upstream := &staticUpstream{ from: ".", + stop: make(chan struct{}), Hosts: nil, Policy: &Random{}, Spray: nil, @@ -108,13 +113,25 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { } if upstream.HealthCheck.Path != "" { - go upstream.HealthCheckWorker(nil) + upstream.wg.Add(1) + go func() { + defer upstream.wg.Done() + upstream.HealthCheckWorker(upstream.stop) + }() } upstreams = append(upstreams, upstream) } return upstreams, nil } +// Stop sends a signal to all goroutines started by this staticUpstream to exit +// and waits for them to finish before returning. +func (u *staticUpstream) Stop() error { + close(u.stop) + u.wg.Wait() + return nil +} + // RegisterPolicy adds a custom policy to the proxy. func RegisterPolicy(name string, policy func() Policy) { supportedPolicies[name] = policy @@ -281,9 +298,8 @@ func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) { case <-ticker.C: u.healthCheck() case <-stop: - // TODO: the library should provide a stop channel and global - // waitgroup to allow goroutines started by plugins a chance - // to clean themselves up. + ticker.Stop() + return } } }