middleware/proxy: Kill goroutines on stop (#646)

* middleware/proxy: Kill goroutines on stop

Ports caddy's 59bf71c293

Excludes the proxy_test.go test part though.

Fixes #644

* Add tests
This commit is contained in:
Miek Gieben 2017-04-26 10:58:14 +01:00 committed by GitHub
parent 003b1bf678
commit 3b5b6a233f
5 changed files with 112 additions and 5 deletions

View file

@ -21,6 +21,8 @@ func NewLookup(hosts []string) Proxy {
func NewLookupWithOption(hosts []string, opts Options) Proxy { func NewLookupWithOption(hosts []string, opts Options) Proxy {
p := Proxy{Next: nil} p := Proxy{Next: nil}
// TODO(miek): this needs to be unified with upstream.go's NewStaticUpstreams, caddy uses NewHost
// we should copy/make something similar.
upstream := &staticUpstream{ upstream := &staticUpstream{
from: ".", from: ".",
Hosts: make([]*UpstreamHost, len(hosts)), Hosts: make([]*UpstreamHost, len(hosts)),

View file

@ -46,6 +46,8 @@ type Upstream interface {
IsAllowedDomain(string) bool IsAllowedDomain(string) bool
// Exchanger returns the exchanger to be used for this upstream. // Exchanger returns the exchanger to be used for this upstream.
Exchanger() Exchanger Exchanger() Exchanger
// Stops the upstream from proxying requests to shutdown goroutines cleanly.
Stop() error
} }
// UpstreamHostDownFunc can be used to customize how Down behaves. // UpstreamHostDownFunc can be used to customize how Down behaves.

View file

@ -0,0 +1,85 @@
package proxy
import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/mholt/caddy/caddyfile"
)
func TestStop(t *testing.T) {
config := "proxy . %s {\n health_check /healthcheck:%s %dms \n}"
tests := []struct {
name string
intervalInMilliseconds int
numHealthcheckIntervals int
}{
{
"No Healthchecks After Stop - 5ms, 1 intervals",
5,
1,
},
{
"No Healthchecks After Stop - 5ms, 2 intervals",
5,
2,
},
{
"No Healthchecks After Stop - 5ms, 3 intervals",
5,
3,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Set up proxy.
var counter int64
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r.Body.Close()
atomic.AddInt64(&counter, 1)
}))
defer backend.Close()
port := backend.URL[17:] // Remove all crap up to the port
back := backend.URL[7:] // Remove http://
c := caddyfile.NewDispenser("Testfile", strings.NewReader(fmt.Sprintf(config, back, port, test.intervalInMilliseconds)))
upstreams, err := NewStaticUpstreams(&c)
if err != nil {
t.Error("Expected no error. Got:", err.Error())
}
// Give some time for healthchecks to hit the server.
time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond)
for _, upstream := range upstreams {
if err := upstream.Stop(); err != nil {
t.Error("Expected no error stopping upstream. Got: ", err.Error())
}
}
counterValueAfterShutdown := atomic.LoadInt64(&counter)
// Give some time to see if healthchecks are still hitting the server.
time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond)
if counterValueAfterShutdown == 0 {
t.Error("Expected healthchecks to hit test server. Got no healthchecks.")
}
counterValueAfterWaiting := atomic.LoadInt64(&counter)
if counterValueAfterWaiting != counterValueAfterShutdown {
t.Errorf("Expected no more healthchecks after shutdown. Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown)
}
})
}
}

View file

@ -37,6 +37,8 @@ func setup(c *caddy.Controller) error {
c.OnShutdown(func() error { c.OnShutdown(func() error {
return u.Exchanger().OnShutdown(P) return u.Exchanger().OnShutdown(P)
}) })
// Register shutdown handlers.
c.OnShutdown(u.Stop)
} }
return nil return nil

View file

@ -10,6 +10,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -25,7 +26,10 @@ var (
) )
type staticUpstream struct { type staticUpstream struct {
from string from string
stop chan struct{} // Signals running goroutines to stop.
wg sync.WaitGroup // Used to wait for running goroutines to stop.
Hosts HostPool Hosts HostPool
Policy Policy Policy Policy
Spray Policy Spray Policy
@ -49,6 +53,7 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
for c.Next() { for c.Next() {
upstream := &staticUpstream{ upstream := &staticUpstream{
from: ".", from: ".",
stop: make(chan struct{}),
Hosts: nil, Hosts: nil,
Policy: &Random{}, Policy: &Random{},
Spray: nil, Spray: nil,
@ -108,13 +113,25 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
} }
if upstream.HealthCheck.Path != "" { if upstream.HealthCheck.Path != "" {
go upstream.HealthCheckWorker(nil) upstream.wg.Add(1)
go func() {
defer upstream.wg.Done()
upstream.HealthCheckWorker(upstream.stop)
}()
} }
upstreams = append(upstreams, upstream) upstreams = append(upstreams, upstream)
} }
return upstreams, nil return upstreams, nil
} }
// Stop sends a signal to all goroutines started by this staticUpstream to exit
// and waits for them to finish before returning.
func (u *staticUpstream) Stop() error {
close(u.stop)
u.wg.Wait()
return nil
}
// RegisterPolicy adds a custom policy to the proxy. // RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) { func RegisterPolicy(name string, policy func() Policy) {
supportedPolicies[name] = policy supportedPolicies[name] = policy
@ -281,9 +298,8 @@ func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) {
case <-ticker.C: case <-ticker.C:
u.healthCheck() u.healthCheck()
case <-stop: case <-stop:
// TODO: the library should provide a stop channel and global ticker.Stop()
// waitgroup to allow goroutines started by plugins a chance return
// to clean themselves up.
} }
} }
} }