From bb05a665eb4e23e0d04ae816624f0eda230e655a Mon Sep 17 00:00:00 2001 From: ghostflame Date: Fri, 30 Jun 2017 10:13:45 +0100 Subject: [PATCH] middleware/proxy: async health checks (#749) * Switches out Unhealthy bool for OkUntil timestamp * Make sure servers are healthy forever if there are no health checks * Moves health check off into a go routine to avoid blocking conditions * Improved logging info * Fixes initial date * Fixes health checking; alters tests to adapt to async health checking * Moves future variable into static upstream and populates it in more places * Restores silencing of stdout during testing * Restores silencing of stdout during testing * keeps check url string once built * Removes debug message * uses zero value to signal no checking; reduces in-mutex code to a fetch --- middleware/proxy/README.md | 6 +- middleware/proxy/google.go | 18 ++-- middleware/proxy/grpc_test.go | 1 + middleware/proxy/lookup.go | 19 +++-- middleware/proxy/policy_test.go | 3 +- middleware/proxy/proxy.go | 16 +++- middleware/proxy/proxy_test.go | 4 +- middleware/proxy/upstream.go | 137 +++++++++++++++++++++--------- middleware/proxy/upstream_test.go | 14 ++- 9 files changed, 158 insertions(+), 60 deletions(-) diff --git a/middleware/proxy/README.md b/middleware/proxy/README.md index 331dccd4b..532a9cace 100644 --- a/middleware/proxy/README.md +++ b/middleware/proxy/README.md @@ -41,9 +41,9 @@ proxy FROM TO... { * `max_fails` is the number of failures within fail_timeout that are needed before considering a backend to be down. If 0, the backend will never be marked as down. Default is 1. * `health_check` will check path (on port) on each backend. If a backend returns a status code of - 200-399, then that backend is healthy. If it doesn't, the backend is marked as unhealthy for - duration and no requests are routed to it. If this option is not provided then health checks are - disabled. The default duration is 30 seconds ("30s"). + 200-399, then that backend is marked healthy for double the healthcheck duration. If it doesn't, + it is marked as unhealthy and no requests are routed to it. If this option is not provided then + health checks are disabled. The default duration is 30 seconds ("30s"). * **IGNORED_NAMES** in `except` is a space-separated list of domains to exclude from proxying. Requests that match none of these names will be passed through. * `spray` when all backends are unhealthy, randomly pick one to send the traffic to. 
(This is a failsafe.)
diff --git a/middleware/proxy/google.go index dc83755ad..f021bb2b3 100644 --- a/middleware/proxy/google.go +++ b/middleware/proxy/google.go @@ -206,6 +206,7 @@ func newUpstream(hosts []string, old *staticUpstream) Upstream { Spray: nil, FailTimeout: 10 * time.Second, MaxFails: 3, + Future: 60 * time.Second, ex: old.ex, WithoutPathPrefix: old.WithoutPathPrefix, IgnoredSubDomains: old.IgnoredSubDomains, @@ -218,23 +219,30 @@ func newUpstream(hosts []string, old *staticUpstream) Upstream { Conns: 0, Fails: 0, FailTimeout: upstream.FailTimeout, - Unhealthy: false, CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { return func(uh *UpstreamHost) bool { - if uh.Unhealthy { - return true + + down := false + + uh.checkMu.Lock() + until := uh.OkUntil + uh.checkMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true } fails := atomic.LoadInt32(&uh.Fails) if fails >= upstream.MaxFails && upstream.MaxFails != 0 { - return true + down = true } - return false + return down } }(upstream), WithoutPathPrefix: upstream.WithoutPathPrefix, } + upstream.Hosts[i] = uh } return upstream diff --git a/middleware/proxy/grpc_test.go b/middleware/proxy/grpc_test.go index 1a0d406ec..e303e1594 100644 --- a/middleware/proxy/grpc_test.go +++ b/middleware/proxy/grpc_test.go @@ -27,6 +27,7 @@ func TestStartupShutdown(t *testing.T) { Policy: &Random{}, Spray: nil, FailTimeout: 10 * time.Second, + Future: 60 * time.Second, MaxFails: 1, } g := newGrpcClient(nil, upstream) diff --git a/middleware/proxy/lookup.go b/middleware/proxy/lookup.go index e871774cf..fa6d1ddf2 100644 --- a/middleware/proxy/lookup.go +++ b/middleware/proxy/lookup.go @@ -30,6 +30,7 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy { Spray: nil, FailTimeout: 10 * time.Second, MaxFails: 3, // TODO(miek): disable error checking for simple lookups?
+ Future: 60 * time.Second, ex: newDNSExWithOption(opts), } @@ -40,21 +41,29 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy { Fails: 0, FailTimeout: upstream.FailTimeout, - Unhealthy: false, CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { return func(uh *UpstreamHost) bool { - if uh.Unhealthy { - return true + + down := false + + uh.checkMu.Lock() + until := uh.OkUntil + uh.checkMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true } + fails := atomic.LoadInt32(&uh.Fails) if fails >= upstream.MaxFails && upstream.MaxFails != 0 { - return true + down = true } - return false + return down } }(upstream), WithoutPathPrefix: upstream.WithoutPathPrefix, } + upstream.Hosts[i] = uh } p.Upstreams = &[]Upstream{upstream} diff --git a/middleware/proxy/policy_test.go b/middleware/proxy/policy_test.go index 8f4f1f792..24fd3efdc 100644 --- a/middleware/proxy/policy_test.go +++ b/middleware/proxy/policy_test.go @@ -5,6 +5,7 @@ import ( "net/http/httptest" "os" "testing" + "time" ) var workableServer *httptest.Server @@ -54,7 +55,7 @@ func TestRoundRobinPolicy(t *testing.T) { t.Error("Expected second round robin host to be third host in the pool.") } // mark host as down - pool[0].Unhealthy = true + pool[0].OkUntil = time.Unix(0, 0) h = rrPolicy.Select(pool) if h != pool[1] { t.Error("Expected third round robin host to be first host in the pool.") diff --git a/middleware/proxy/proxy.go b/middleware/proxy/proxy.go index 5205bd06f..b3156e818 100644 --- a/middleware/proxy/proxy.go +++ b/middleware/proxy/proxy.go @@ -59,9 +59,11 @@ type UpstreamHost struct { Name string // IP address (and port) of this upstream host Fails int32 FailTimeout time.Duration - Unhealthy bool + OkUntil time.Time CheckDown UpstreamHostDownFunc + CheckUrl string WithoutPathPrefix string + Checking bool checkMu sync.Mutex } @@ -72,7 +74,17 @@ func (uh *UpstreamHost) Down() bool { if uh.CheckDown == nil { // Default settings fails := atomic.LoadInt32(&uh.Fails) - return uh.Unhealthy || fails > 0 + after := false + + uh.checkMu.Lock() + until := uh.OkUntil + uh.checkMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + after = true + } + + return after || fails > 0 } return uh.CheckDown(uh) } diff --git a/middleware/proxy/proxy_test.go b/middleware/proxy/proxy_test.go index 0a44d1b82..b0cb9c3cb 100644 --- a/middleware/proxy/proxy_test.go +++ b/middleware/proxy/proxy_test.go @@ -74,8 +74,10 @@ func TestStop(t *testing.T) { t.Error("Expected healthchecks to hit test server. Got no healthchecks.") } + // health checks are in a go routine now, so one may well occur after we shutdown, + // but we only ever expect one more counterValueAfterWaiting := atomic.LoadInt64(&counter) - if counterValueAfterWaiting != counterValueAfterShutdown { + if counterValueAfterWaiting > (counterValueAfterShutdown + 1) { t.Errorf("Expected no more healthchecks after shutdown. 
Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown) } diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 76dd70348..b20165eeb 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -36,6 +36,7 @@ type staticUpstream struct { FailTimeout time.Duration MaxFails int32 + Future time.Duration HealthCheck struct { Path string Port string @@ -59,6 +60,7 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { Spray: nil, FailTimeout: 10 * time.Second, MaxFails: 1, + Future: 60 * time.Second, ex: newDNSEx(), } @@ -89,21 +91,25 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { Conns: 0, Fails: 0, FailTimeout: upstream.FailTimeout, - Unhealthy: false, CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { return func(uh *UpstreamHost) bool { + + down := false + uh.checkMu.Lock() - defer uh.checkMu.Unlock() - if uh.Unhealthy { - return true + until := uh.OkUntil + uh.checkMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true } fails := atomic.LoadInt32(&uh.Fails) if fails >= upstream.MaxFails && upstream.MaxFails != 0 { - return true + down = true } - return false + return down } }(upstream), WithoutPathPrefix: upstream.WithoutPathPrefix, @@ -186,6 +192,12 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { return err } u.HealthCheck.Interval = dur + u.Future = 2 * dur + + // set a minimum of 3 seconds + if u.Future < (3 * time.Second) { + u.Future = 3 * time.Second + } } case "without": if !c.NextArg() { @@ -247,46 +259,93 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { return nil } +// This was moved into a thread so that each host could throw a health +// check at the same time. The reason for this is that if we are checking +// 3 hosts, and the first one is gone, and we spend minutes timing out to +// fail it, we would not have been doing any other health checks in that +// time. So we now have a per-host lock and a threaded health check. +// +// We use the Checking bool to avoid concurrent checks against the same +// host; if one is taking a long time, the next one will find a check in +// progress and simply return before trying. +// +// We are carefully avoiding having the mutex locked while we check, +// otherwise checks will back up, potentially a lot of them if a host is +// absent for a long time. This arrangement makes checks quickly see if +// they are the only one running and abort otherwise. +func healthCheckUrl(nextTs time.Time, host *UpstreamHost) { + + // lock for our bool check. We don't just defer the unlock because + // we don't want the lock held while http.Get runs + host.checkMu.Lock() + + // are we mid check? Don't run another one + if host.Checking { + host.checkMu.Unlock() + return + } + + host.Checking = true + host.checkMu.Unlock() + + //log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local()) + + // fetch that url. 
This has been moved into a go func because + // when the remote host is not merely not serving, but actually + // absent, then tcp syn timeouts can be very long, and so one + // fetch could last several check intervals + if r, err := http.Get(host.CheckUrl); err == nil { + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + + if r.StatusCode < 200 || r.StatusCode >= 400 { + log.Printf("[WARNING] Host %s health check returned HTTP code %d\n", + host.Name, r.StatusCode) + nextTs = time.Unix(0, 0) + } + } else { + log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err) + nextTs = time.Unix(0, 0) + } + + host.checkMu.Lock() + host.Checking = false + host.OkUntil = nextTs + host.checkMu.Unlock() +} + func (u *staticUpstream) healthCheck() { for _, host := range u.Hosts { - var hostName, checkPort string - // The DNS server might be an HTTP server. If so, extract its name. - if url, err := url.Parse(host.Name); err == nil { - hostName = url.Host - } else { - hostName = host.Name - } + if host.CheckUrl == "" { + var hostName, checkPort string - // Extract the port number from the parsed server name. - checkHostName, checkPort, err := net.SplitHostPort(hostName) - if err != nil { - checkHostName = hostName - } - - if u.HealthCheck.Port != "" { - checkPort = u.HealthCheck.Port - } - - hostURL := "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path - - host.checkMu.Lock() - defer host.checkMu.Unlock() - - if r, err := http.Get(hostURL); err == nil { - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - if r.StatusCode < 200 || r.StatusCode >= 400 { - log.Printf("[WARNING] Health check URL %s returned HTTP code %d\n", - hostURL, r.StatusCode) - host.Unhealthy = true + // The DNS server might be an HTTP server. If so, extract its name. + ret, err := url.Parse(host.Name) + if err == nil && len(ret.Host) > 0 { + hostName = ret.Host } else { - host.Unhealthy = false + hostName = host.Name } - } else { - log.Printf("[WARNING] Health check probe failed: %v\n", err) - host.Unhealthy = true + + // Extract the port number from the parsed server name. 
+ checkHostName, checkPort, err := net.SplitHostPort(hostName) + if err != nil { + checkHostName = hostName + } + + if u.HealthCheck.Port != "" { + checkPort = u.HealthCheck.Port + } + + host.CheckUrl = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path } + + // calculate this before the get + nextTs := time.Now().Add(u.Future) + + // locks/bools should prevent requests backing up + go healthCheckUrl(nextTs, host) } } diff --git a/middleware/proxy/upstream_test.go b/middleware/proxy/upstream_test.go index 587d96994..06c229f39 100644 --- a/middleware/proxy/upstream_test.go +++ b/middleware/proxy/upstream_test.go @@ -23,9 +23,14 @@ func TestHealthCheck(t *testing.T) { Policy: &Random{}, Spray: nil, FailTimeout: 10 * time.Second, + Future: 60 * time.Second, MaxFails: 1, } + upstream.healthCheck() + // sleep a bit, it's async now + time.Sleep(time.Duration(2 * time.Second)) + if upstream.Hosts[0].Down() { t.Error("Expected first host in testpool to not fail healthcheck.") } @@ -40,15 +45,16 @@ func TestSelect(t *testing.T) { Hosts: testPool()[:3], Policy: &Random{}, FailTimeout: 10 * time.Second, + Future: 60 * time.Second, MaxFails: 1, } - upstream.Hosts[0].Unhealthy = true - upstream.Hosts[1].Unhealthy = true - upstream.Hosts[2].Unhealthy = true + upstream.Hosts[0].OkUntil = time.Unix(0, 0) + upstream.Hosts[1].OkUntil = time.Unix(0, 0) + upstream.Hosts[2].OkUntil = time.Unix(0, 0) if h := upstream.Select(); h != nil { t.Error("Expected select to return nil as all host are down") } - upstream.Hosts[2].Unhealthy = false + upstream.Hosts[2].OkUntil = time.Time{} if h := upstream.Select(); h == nil { t.Error("Expected select to not return nil") }
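A minimal standalone sketch of the availability logic this patch introduces may make it easier to review: a host is treated as down only when its OkUntil timestamp is non-zero and lies in the past, a check already in flight is skipped via a Checking flag, and the probe itself runs outside the mutex so a slow or absent host cannot back checks up. The type, field, and function names below are illustrative stand-ins (probe and future replace the HTTP GET against host.CheckUrl and the upstream's Future duration), not the actual CoreDNS types.

// Sketch only: illustrates the OkUntil/Checking pattern described above,
// not the real middleware/proxy types.
package main

import (
	"fmt"
	"sync"
	"time"
)

type host struct {
	mu       sync.Mutex
	okUntil  time.Time
	checking bool
}

// down mirrors the OkUntil part of the patch's CheckDown logic (the Fails
// counter is omitted here). The zero value means "no health checking
// configured", so such a host is never marked down by the checker.
func (h *host) down() bool {
	h.mu.Lock()
	until := h.okUntil
	h.mu.Unlock()
	return !until.IsZero() && time.Now().After(until)
}

// check runs one asynchronous health check. The checking flag is read and
// written under the mutex, but the (potentially slow) probe runs outside
// the lock, so a second check simply returns if one is already in flight.
func (h *host) check(probe func() bool, future time.Duration) {
	h.mu.Lock()
	if h.checking {
		h.mu.Unlock()
		return // a check is already in progress for this host
	}
	h.checking = true
	h.mu.Unlock()

	next := time.Now().Add(future)
	if !probe() {
		next = time.Unix(0, 0) // mark down immediately on failure
	}

	h.mu.Lock()
	h.checking = false
	h.okUntil = next
	h.mu.Unlock()
}

func main() {
	h := &host{}
	go h.check(func() bool { return true }, 60*time.Second)
	time.Sleep(100 * time.Millisecond)
	fmt.Println("down after passing check:", h.down()) // false
}

Under these assumptions, a passing check pushes OkUntil roughly two check intervals into the future, so a host stays usable between checks, while a failed probe sets OkUntil to the epoch and the host reads as down on the very next request.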