From a1b175ef78783df180dffe50d756a85c66cfb1b7 Mon Sep 17 00:00:00 2001
From: Yong Tang
Date: Wed, 9 Aug 2017 09:21:33 -0700
Subject: [PATCH] Move Healthcheck to middleware/pkg/healthcheck (#854)

* Move healthcheck out

Signed-off-by: Yong Tang

* Move healthcheck to middleware/pkg/healthcheck

Signed-off-by: Yong Tang
---
 middleware/pkg/healthcheck/healthcheck.go     | 236 ++++++++++++++++++
 .../{proxy => pkg/healthcheck}/policy.go      |  12 +-
 .../{proxy => pkg/healthcheck}/policy_test.go |  55 +++-
 middleware/proxy/google.go                    |  26 +-
 middleware/proxy/grpc_test.go                 |  20 +-
 middleware/proxy/lookup.go                    |  29 +--
 middleware/proxy/proxy.go                     |  45 +---
 middleware/proxy/upstream.go                  | 226 ++---------------
 middleware/proxy/upstream_test.go             |  58 -----
 9 files changed, 359 insertions(+), 348 deletions(-)
 create mode 100644 middleware/pkg/healthcheck/healthcheck.go
 rename middleware/{proxy => pkg/healthcheck}/policy.go (93%)
 rename middleware/{proxy => pkg/healthcheck}/policy_test.go (59%)

diff --git a/middleware/pkg/healthcheck/healthcheck.go b/middleware/pkg/healthcheck/healthcheck.go
new file mode 100644
index 000000000..e0152a47b
--- /dev/null
+++ b/middleware/pkg/healthcheck/healthcheck.go
@@ -0,0 +1,236 @@
+package healthcheck
+
+import (
+	"io"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// UpstreamHostDownFunc can be used to customize how Down behaves.
+type UpstreamHostDownFunc func(*UpstreamHost) bool
+
+// UpstreamHost represents a single proxy upstream host.
+type UpstreamHost struct {
+	Conns             int64  // must be first field to be 64-bit aligned on 32-bit systems
+	Name              string // IP address (and port) of this upstream host
+	Fails             int32
+	FailTimeout       time.Duration
+	OkUntil           time.Time
+	CheckDown         UpstreamHostDownFunc
+	CheckURL          string
+	WithoutPathPrefix string
+	Checking          bool
+	CheckMu           sync.Mutex
+}
+
+// Down checks whether the upstream host is down or not.
+// Down will try to use uh.CheckDown first, and will fall
+// back to some default criteria if necessary.
+func (uh *UpstreamHost) Down() bool {
+	if uh.CheckDown == nil {
+		// Default settings
+		fails := atomic.LoadInt32(&uh.Fails)
+		after := false
+
+		uh.CheckMu.Lock()
+		until := uh.OkUntil
+		uh.CheckMu.Unlock()
+
+		if !until.IsZero() && time.Now().After(until) {
+			after = true
+		}
+
+		return after || fails > 0
+	}
+	return uh.CheckDown(uh)
+}
+
+// HostPool is a collection of UpstreamHosts.
+type HostPool []*UpstreamHost
+
+// HealthCheck periodically probes a pool of upstream hosts and
+// records which of them are up.
+type HealthCheck struct {
+	wg          sync.WaitGroup // Used to wait for running goroutines to stop.
+	stop        chan struct{}  // Signals running goroutines to stop.
+	Hosts       HostPool
+	Policy      Policy
+	Spray       Policy
+	FailTimeout time.Duration
+	MaxFails    int32
+	Future      time.Duration
+	Path        string
+	Port        string
+	Interval    time.Duration
+}
+
+// Start starts the healthcheck; the worker goroutine is only spawned
+// when a check Path has been configured.
+func (u *HealthCheck) Start() {
+	u.stop = make(chan struct{})
+	if u.Path != "" {
+		u.wg.Add(1)
+		go func() {
+			defer u.wg.Done()
+			u.HealthCheckWorker(u.stop)
+		}()
+	}
+}
+
+// Stop sends a signal to all goroutines started by this HealthCheck to exit
+// and waits for them to finish before returning.
+func (u *HealthCheck) Stop() error {
+	close(u.stop)
+	u.wg.Wait()
+	return nil
+}
+
+// This was moved into a thread so that each host could throw a health
+// check at the same time. The reason for this is that if we are checking
+// 3 hosts, and the first one is gone, and we spend minutes timing out to
+// fail it, we would not have been doing any other health checks in that
+// time. So we now have a per-host lock and a threaded health check.
+// +// We use the Checking bool to avoid concurrent checks against the same +// host; if one is taking a long time, the next one will find a check in +// progress and simply return before trying. +// +// We are carefully avoiding having the mutex locked while we check, +// otherwise checks will back up, potentially a lot of them if a host is +// absent for a long time. This arrangement makes checks quickly see if +// they are the only one running and abort otherwise. +func healthCheckURL(nextTs time.Time, host *UpstreamHost) { + + // lock for our bool check. We don't just defer the unlock because + // we don't want the lock held while http.Get runs + host.CheckMu.Lock() + + // are we mid check? Don't run another one + if host.Checking { + host.CheckMu.Unlock() + return + } + + host.Checking = true + host.CheckMu.Unlock() + + //log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local()) + + // fetch that url. This has been moved into a go func because + // when the remote host is not merely not serving, but actually + // absent, then tcp syn timeouts can be very long, and so one + // fetch could last several check intervals + if r, err := http.Get(host.CheckURL); err == nil { + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + + if r.StatusCode < 200 || r.StatusCode >= 400 { + log.Printf("[WARNING] Host %s health check returned HTTP code %d\n", + host.Name, r.StatusCode) + nextTs = time.Unix(0, 0) + } + } else { + log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err) + nextTs = time.Unix(0, 0) + } + + host.CheckMu.Lock() + host.Checking = false + host.OkUntil = nextTs + host.CheckMu.Unlock() +} + +func (u *HealthCheck) healthCheck() { + for _, host := range u.Hosts { + + if host.CheckURL == "" { + var hostName, checkPort string + + // The DNS server might be an HTTP server. If so, extract its name. + ret, err := url.Parse(host.Name) + if err == nil && len(ret.Host) > 0 { + hostName = ret.Host + } else { + hostName = host.Name + } + + // Extract the port number from the parsed server name. 
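+			// (SplitHostPort errors when hostName carries no port; in
+			// that case we fall back to using hostName unchanged.)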
+			checkHostName, checkPort, err := net.SplitHostPort(hostName)
+			if err != nil {
+				checkHostName = hostName
+			}
+
+			if u.Port != "" {
+				checkPort = u.Port
+			}
+
+			host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path
+		}
+
+		// calculate this before the get
+		nextTs := time.Now().Add(u.Future)
+
+		// locks/bools should prevent requests backing up
+		go healthCheckURL(nextTs, host)
+	}
+}
+
+// HealthCheckWorker runs healthCheck every Interval until stop is closed.
+func (u *HealthCheck) HealthCheckWorker(stop chan struct{}) {
+	ticker := time.NewTicker(u.Interval)
+	u.healthCheck()
+	for {
+		select {
+		case <-ticker.C:
+			u.healthCheck()
+		case <-stop:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+// Select picks an upstream host from the pool using the configured
+// Policy, falling back to Random and then to Spray when nothing
+// else returns a host.
+func (u *HealthCheck) Select() *UpstreamHost {
+	pool := u.Hosts
+	if len(pool) == 1 {
+		if pool[0].Down() && u.Spray == nil {
+			return nil
+		}
+		return pool[0]
+	}
+	allDown := true
+	for _, host := range pool {
+		if !host.Down() {
+			allDown = false
+			break
+		}
+	}
+	if allDown {
+		if u.Spray == nil {
+			return nil
+		}
+		return u.Spray.Select(pool)
+	}
+
+	if u.Policy == nil {
+		h := (&Random{}).Select(pool)
+		if h != nil {
+			return h
+		}
+		if h == nil && u.Spray == nil {
+			return nil
+		}
+		return u.Spray.Select(pool)
+	}
+
+	h := u.Policy.Select(pool)
+	if h != nil {
+		return h
+	}
+
+	if u.Spray == nil {
+		return nil
+	}
+	return u.Spray.Select(pool)
+}
diff --git a/middleware/proxy/policy.go b/middleware/pkg/healthcheck/policy.go
similarity index 93%
rename from middleware/proxy/policy.go
rename to middleware/pkg/healthcheck/policy.go
index e0c9d7e2b..0cef8d79a 100644
--- a/middleware/proxy/policy.go
+++ b/middleware/pkg/healthcheck/policy.go
@@ -1,4 +1,4 @@
-package proxy
+package healthcheck
 
 import (
 	"log"
@@ -6,8 +6,14 @@ import (
 	"sync/atomic"
 )
 
-// HostPool is a collection of UpstreamHosts.
-type HostPool []*UpstreamHost
+var (
+	// SupportedPolicies maps policy names to their constructors.
+	SupportedPolicies = make(map[string]func() Policy)
+)
+
+// RegisterPolicy adds a custom policy to the proxy.
+func RegisterPolicy(name string, policy func() Policy) {
+	SupportedPolicies[name] = policy
+}
 
 // Policy decides how a host will be selected from a pool. When all hosts are unhealthy, it is assumed the
 // healthchecking failed. In this case each policy will *randomly* return a host from the pool to prevent
diff --git a/middleware/proxy/policy_test.go b/middleware/pkg/healthcheck/policy_test.go
similarity index 59%
rename from middleware/proxy/policy_test.go
rename to middleware/pkg/healthcheck/policy_test.go
index 24fd3efdc..16cae7266 100644
--- a/middleware/proxy/policy_test.go
+++ b/middleware/pkg/healthcheck/policy_test.go
@@ -1,6 +1,8 @@
-package proxy
+package healthcheck
 
 import (
+	"io/ioutil"
+	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -41,6 +43,57 @@ func testPool() HostPool {
 	return HostPool(pool)
 }
 
+func TestRegisterPolicy(t *testing.T) {
+	name := "custom"
+	customPolicy := &customPolicy{}
+	RegisterPolicy(name, func() Policy { return customPolicy })
+	if _, ok := SupportedPolicies[name]; !ok {
+		t.Error("Expected SupportedPolicies to have a custom policy.")
+	}
+
+}
+
+func TestHealthCheck(t *testing.T) {
+	log.SetOutput(ioutil.Discard)
+
+	u := &HealthCheck{
+		Hosts:       testPool(),
+		FailTimeout: 10 * time.Second,
+		Future:      60 * time.Second,
+		MaxFails:    1,
+	}
+
+	u.healthCheck()
+	// sleep a bit, the check is async now
+	time.Sleep(2 * time.Second)
+
+	if u.Hosts[0].Down() {
+		t.Error("Expected first host in testpool to not fail healthcheck.")
+	}
+	if !u.Hosts[1].Down() {
+		t.Error("Expected second host in testpool to fail healthcheck.")
+	}
+}
+
+func TestSelect(t *testing.T) {
+	u := &HealthCheck{
+		Hosts:       testPool()[:3],
+		FailTimeout: 10 * time.Second,
+		Future:      60 * time.Second,
+		MaxFails:    1,
+	}
+	u.Hosts[0].OkUntil = time.Unix(0, 0)
+	u.Hosts[1].OkUntil = time.Unix(0, 0)
+	u.Hosts[2].OkUntil = time.Unix(0, 0)
+	if h := u.Select(); h != nil {
+		t.Error("Expected select to return nil as all hosts are down")
+	}
+	u.Hosts[2].OkUntil = time.Time{}
+	if h := u.Select(); h == nil {
+		t.Error("Expected select to not return nil")
+	}
+}
+
 func TestRoundRobinPolicy(t *testing.T) {
 	pool := testPool()
 	rrPolicy := &RoundRobin{}
diff --git a/middleware/proxy/google.go b/middleware/proxy/google.go
index f021bb2b3..b71d0fb1b 100644
--- a/middleware/proxy/google.go
+++ b/middleware/proxy/google.go
@@ -14,6 +14,7 @@ import (
 	"time"
 
 	"github.com/coredns/coredns/middleware/pkg/debug"
+	"github.com/coredns/coredns/middleware/pkg/healthcheck"
 	"github.com/coredns/coredns/request"
 
 	"github.com/miekg/dns"
@@ -200,34 +201,33 @@ func extractAnswer(m *dns.Msg) ([]string, error) {
 
 // newUpstream returns an upstream initialized with hosts.
func newUpstream(hosts []string, old *staticUpstream) Upstream { upstream := &staticUpstream{ - from: old.from, - Hosts: nil, - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - MaxFails: 3, - Future: 60 * time.Second, + from: old.from, + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 3, + Future: 60 * time.Second, + }, ex: old.ex, WithoutPathPrefix: old.WithoutPathPrefix, IgnoredSubDomains: old.IgnoredSubDomains, } - upstream.Hosts = make([]*UpstreamHost, len(hosts)) + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts)) for i, h := range hosts { - uh := &UpstreamHost{ + uh := &healthcheck.UpstreamHost{ Name: h, Conns: 0, Fails: 0, FailTimeout: upstream.FailTimeout, - CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { - return func(uh *UpstreamHost) bool { + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { down := false - uh.checkMu.Lock() + uh.CheckMu.Lock() until := uh.OkUntil - uh.checkMu.Unlock() + uh.CheckMu.Unlock() if !until.IsZero() && time.Now().After(until) { down = true diff --git a/middleware/proxy/grpc_test.go b/middleware/proxy/grpc_test.go index e303e1594..dcde7cc0e 100644 --- a/middleware/proxy/grpc_test.go +++ b/middleware/proxy/grpc_test.go @@ -4,11 +4,13 @@ import ( "testing" "time" + "github.com/coredns/coredns/middleware/pkg/healthcheck" + "google.golang.org/grpc/grpclog" ) -func pool() []*UpstreamHost { - return []*UpstreamHost{ +func pool() []*healthcheck.UpstreamHost { + return []*healthcheck.UpstreamHost{ { Name: "localhost:10053", }, @@ -22,13 +24,13 @@ func TestStartupShutdown(t *testing.T) { grpclog.SetLogger(discard{}) upstream := &staticUpstream{ - from: ".", - Hosts: pool(), - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - Future: 60 * time.Second, - MaxFails: 1, + from: ".", + HealthCheck: healthcheck.HealthCheck{ + Hosts: pool(), + FailTimeout: 10 * time.Second, + Future: 60 * time.Second, + MaxFails: 1, + }, } g := newGrpcClient(nil, upstream) upstream.ex = g diff --git a/middleware/proxy/lookup.go b/middleware/proxy/lookup.go index a6c714f39..eda0d0a0c 100644 --- a/middleware/proxy/lookup.go +++ b/middleware/proxy/lookup.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/coredns/coredns/middleware/pkg/healthcheck" "github.com/coredns/coredns/request" "github.com/miekg/dns" @@ -24,31 +25,31 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy { // TODO(miek): this needs to be unified with upstream.go's NewStaticUpstreams, caddy uses NewHost // we should copy/make something similar. upstream := &staticUpstream{ - from: ".", - Hosts: make([]*UpstreamHost, len(hosts)), - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - MaxFails: 3, // TODO(miek): disable error checking for simple lookups? - Future: 60 * time.Second, - ex: newDNSExWithOption(opts), + from: ".", + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 3, // TODO(miek): disable error checking for simple lookups? 
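+			// Future is how long a passing health check keeps a host
+			// marked up: healthCheck sets OkUntil to now plus Future.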
+			Future:      60 * time.Second,
+		},
+		ex: newDNSExWithOption(opts),
 	}
+	upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
 	for i, host := range hosts {
-		uh := &UpstreamHost{
+		uh := &healthcheck.UpstreamHost{
 			Name:        host,
 			Conns:       0,
 			Fails:       0,
 			FailTimeout: upstream.FailTimeout,
 
-			CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
-				return func(uh *UpstreamHost) bool {
+			CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
+				return func(uh *healthcheck.UpstreamHost) bool {
 					down := false
 
-					uh.checkMu.Lock()
+					uh.CheckMu.Lock()
 					until := uh.OkUntil
-					uh.checkMu.Unlock()
+					uh.CheckMu.Unlock()
 
 					if !until.IsZero() && time.Now().After(until) {
 						down = true
@@ -120,7 +121,7 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
 					timeout = 10 * time.Second
 				}
 				atomic.AddInt32(&host.Fails, 1)
-				go func(host *UpstreamHost, timeout time.Duration) {
+				go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
 					time.Sleep(timeout)
 					atomic.AddInt32(&host.Fails, -1)
 				}(host, timeout)
diff --git a/middleware/proxy/proxy.go b/middleware/proxy/proxy.go
index 8780330ed..7e662c42e 100644
--- a/middleware/proxy/proxy.go
+++ b/middleware/proxy/proxy.go
@@ -3,11 +3,11 @@ package proxy
 
 import (
 	"errors"
-	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/coredns/coredns/middleware"
+	"github.com/coredns/coredns/middleware/pkg/healthcheck"
 	"github.com/coredns/coredns/request"
 
 	"github.com/miekg/dns"
@@ -41,7 +41,7 @@ type Upstream interface {
 	// The domain name this upstream host should be routed on.
 	From() string
 	// Selects an upstream host to be routed to.
-	Select() *UpstreamHost
+	Select() *healthcheck.UpstreamHost
 	// Checks if a subdomain is not ignored.
 	IsAllowedDomain(string) bool
 	// Exchanger returns the exchanger to be used for this upstream.
@@ -50,45 +50,6 @@
 	Stop() error
 }
 
-// UpstreamHostDownFunc can be used to customize how Down behaves.
-type UpstreamHostDownFunc func(*UpstreamHost) bool
-
-// UpstreamHost represents a single proxy upstream
-type UpstreamHost struct {
-	Conns             int64  // must be first field to be 64-bit aligned on 32-bit systems
-	Name              string // IP address (and port) of this upstream host
-	Fails             int32
-	FailTimeout       time.Duration
-	OkUntil           time.Time
-	CheckDown         UpstreamHostDownFunc
-	CheckURL          string
-	WithoutPathPrefix string
-	Checking          bool
-	checkMu           sync.Mutex
-}
-
-// Down checks whether the upstream host is down or not.
-// Down will try to use uh.CheckDown first, and will fall
-// back to some default criteria if necessary.
-func (uh *UpstreamHost) Down() bool {
-	if uh.CheckDown == nil {
-		// Default settings
-		fails := atomic.LoadInt32(&uh.Fails)
-		after := false
-
-		uh.checkMu.Lock()
-		until := uh.OkUntil
-		uh.checkMu.Unlock()
-
-		if !until.IsZero() && time.Now().After(until) {
-			after = true
-		}
-
-		return after || fails > 0
-	}
-	return uh.CheckDown(uh)
-}
-
 // tryDuration is how long to try upstream hosts; failures result in
 // immediate retries until this duration ends or we get a nil host.
var tryDuration = 60 * time.Second @@ -145,7 +106,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) ( timeout = 10 * time.Second } atomic.AddInt32(&host.Fails, 1) - go func(host *UpstreamHost, timeout time.Duration) { + go func(host *healthcheck.UpstreamHost, timeout time.Duration) { time.Sleep(timeout) atomic.AddInt32(&host.Fails, -1) }(host, timeout) diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 380b585be..93ef0e32d 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -2,46 +2,25 @@ package proxy import ( "fmt" - "io" - "io/ioutil" - "log" "net" - "net/http" - "net/url" "strconv" "strings" - "sync" "sync/atomic" "time" "github.com/coredns/coredns/middleware" "github.com/coredns/coredns/middleware/pkg/dnsutil" + "github.com/coredns/coredns/middleware/pkg/healthcheck" "github.com/coredns/coredns/middleware/pkg/tls" "github.com/mholt/caddy/caddyfile" "github.com/miekg/dns" ) -var ( - supportedPolicies = make(map[string]func() Policy) -) - type staticUpstream struct { from string - stop chan struct{} // Signals running goroutines to stop. - wg sync.WaitGroup // Used to wait for running goroutines to stop. - Hosts HostPool - Policy Policy - Spray Policy + healthcheck.HealthCheck - FailTimeout time.Duration - MaxFails int32 - Future time.Duration - HealthCheck struct { - Path string - Port string - Interval time.Duration - } WithoutPathPrefix string IgnoredSubDomains []string ex Exchanger @@ -53,15 +32,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { var upstreams []Upstream for c.Next() { upstream := &staticUpstream{ - from: ".", - stop: make(chan struct{}), - Hosts: nil, - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - MaxFails: 1, - Future: 60 * time.Second, - ex: newDNSEx(), + from: ".", + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 1, + Future: 60 * time.Second, + }, + ex: newDNSEx(), } if !c.Args(&upstream.from) { @@ -84,22 +61,22 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { } } - upstream.Hosts = make([]*UpstreamHost, len(toHosts)) + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts)) for i, host := range toHosts { - uh := &UpstreamHost{ + uh := &healthcheck.UpstreamHost{ Name: host, Conns: 0, Fails: 0, FailTimeout: upstream.FailTimeout, - CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { - return func(uh *UpstreamHost) bool { + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { down := false - uh.checkMu.Lock() + uh.CheckMu.Lock() until := uh.OkUntil - uh.checkMu.Unlock() + uh.CheckMu.Unlock() if !until.IsZero() && time.Now().After(until) { down = true @@ -117,32 +94,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { upstream.Hosts[i] = uh } + upstream.Start() - if upstream.HealthCheck.Path != "" { - upstream.wg.Add(1) - go func() { - defer upstream.wg.Done() - upstream.HealthCheckWorker(upstream.stop) - }() - } upstreams = append(upstreams, upstream) } return upstreams, nil } -// Stop sends a signal to all goroutines started by this staticUpstream to exit -// and waits for them to finish before returning. -func (u *staticUpstream) Stop() error { - close(u.stop) - u.wg.Wait() - return nil -} - -// RegisterPolicy adds a custom policy to the proxy. 
-func RegisterPolicy(name string, policy func() Policy) { - supportedPolicies[name] = policy -} - func (u *staticUpstream) From() string { return u.from } @@ -153,7 +111,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { if !c.NextArg() { return c.ArgErr() } - policyCreateFunc, ok := supportedPolicies[c.Val()] + policyCreateFunc, ok := healthcheck.SupportedPolicies[c.Val()] if !ok { return c.ArgErr() } @@ -214,7 +172,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { } u.IgnoredSubDomains = ignoredDomains case "spray": - u.Spray = &Spray{} + u.Spray = &healthcheck.Spray{} case "protocol": encArgs := c.RemainingArgs() if len(encArgs) == 0 { @@ -259,154 +217,6 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { return nil } -// This was moved into a thread so that each host could throw a health -// check at the same time. The reason for this is that if we are checking -// 3 hosts, and the first one is gone, and we spend minutes timing out to -// fail it, we would not have been doing any other health checks in that -// time. So we now have a per-host lock and a threaded health check. -// -// We use the Checking bool to avoid concurrent checks against the same -// host; if one is taking a long time, the next one will find a check in -// progress and simply return before trying. -// -// We are carefully avoiding having the mutex locked while we check, -// otherwise checks will back up, potentially a lot of them if a host is -// absent for a long time. This arrangement makes checks quickly see if -// they are the only one running and abort otherwise. -func healthCheckURL(nextTs time.Time, host *UpstreamHost) { - - // lock for our bool check. We don't just defer the unlock because - // we don't want the lock held while http.Get runs - host.checkMu.Lock() - - // are we mid check? Don't run another one - if host.Checking { - host.checkMu.Unlock() - return - } - - host.Checking = true - host.checkMu.Unlock() - - //log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local()) - - // fetch that url. This has been moved into a go func because - // when the remote host is not merely not serving, but actually - // absent, then tcp syn timeouts can be very long, and so one - // fetch could last several check intervals - if r, err := http.Get(host.CheckURL); err == nil { - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - - if r.StatusCode < 200 || r.StatusCode >= 400 { - log.Printf("[WARNING] Host %s health check returned HTTP code %d\n", - host.Name, r.StatusCode) - nextTs = time.Unix(0, 0) - } - } else { - log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err) - nextTs = time.Unix(0, 0) - } - - host.checkMu.Lock() - host.Checking = false - host.OkUntil = nextTs - host.checkMu.Unlock() -} - -func (u *staticUpstream) healthCheck() { - for _, host := range u.Hosts { - - if host.CheckURL == "" { - var hostName, checkPort string - - // The DNS server might be an HTTP server. If so, extract its name. - ret, err := url.Parse(host.Name) - if err == nil && len(ret.Host) > 0 { - hostName = ret.Host - } else { - hostName = host.Name - } - - // Extract the port number from the parsed server name. 
- checkHostName, checkPort, err := net.SplitHostPort(hostName) - if err != nil { - checkHostName = hostName - } - - if u.HealthCheck.Port != "" { - checkPort = u.HealthCheck.Port - } - - host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path - } - - // calculate this before the get - nextTs := time.Now().Add(u.Future) - - // locks/bools should prevent requests backing up - go healthCheckURL(nextTs, host) - } -} - -func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) { - ticker := time.NewTicker(u.HealthCheck.Interval) - u.healthCheck() - for { - select { - case <-ticker.C: - u.healthCheck() - case <-stop: - ticker.Stop() - return - } - } -} - -func (u *staticUpstream) Select() *UpstreamHost { - pool := u.Hosts - if len(pool) == 1 { - if pool[0].Down() && u.Spray == nil { - return nil - } - return pool[0] - } - allDown := true - for _, host := range pool { - if !host.Down() { - allDown = false - break - } - } - if allDown { - if u.Spray == nil { - return nil - } - return u.Spray.Select(pool) - } - - if u.Policy == nil { - h := (&Random{}).Select(pool) - if h != nil { - return h - } - if h == nil && u.Spray == nil { - return nil - } - return u.Spray.Select(pool) - } - - h := u.Policy.Select(pool) - if h != nil { - return h - } - - if u.Spray == nil { - return nil - } - return u.Spray.Select(pool) -} - func (u *staticUpstream) IsAllowedDomain(name string) bool { if dns.Name(name) == dns.Name(u.From()) { return true diff --git a/middleware/proxy/upstream_test.go b/middleware/proxy/upstream_test.go index 06c229f39..3aa4104e8 100644 --- a/middleware/proxy/upstream_test.go +++ b/middleware/proxy/upstream_test.go @@ -2,74 +2,16 @@ package proxy import ( "io/ioutil" - "log" "os" "path/filepath" "strings" "testing" - "time" "github.com/coredns/coredns/middleware/test" "github.com/mholt/caddy" ) -func TestHealthCheck(t *testing.T) { - log.SetOutput(ioutil.Discard) - - upstream := &staticUpstream{ - from: ".", - Hosts: testPool(), - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - Future: 60 * time.Second, - MaxFails: 1, - } - - upstream.healthCheck() - // sleep a bit, it's async now - time.Sleep(time.Duration(2 * time.Second)) - - if upstream.Hosts[0].Down() { - t.Error("Expected first host in testpool to not fail healthcheck.") - } - if !upstream.Hosts[1].Down() { - t.Error("Expected second host in testpool to fail healthcheck.") - } -} - -func TestSelect(t *testing.T) { - upstream := &staticUpstream{ - from: ".", - Hosts: testPool()[:3], - Policy: &Random{}, - FailTimeout: 10 * time.Second, - Future: 60 * time.Second, - MaxFails: 1, - } - upstream.Hosts[0].OkUntil = time.Unix(0, 0) - upstream.Hosts[1].OkUntil = time.Unix(0, 0) - upstream.Hosts[2].OkUntil = time.Unix(0, 0) - if h := upstream.Select(); h != nil { - t.Error("Expected select to return nil as all host are down") - } - upstream.Hosts[2].OkUntil = time.Time{} - if h := upstream.Select(); h == nil { - t.Error("Expected select to not return nil") - } -} - -func TestRegisterPolicy(t *testing.T) { - name := "custom" - customPolicy := &customPolicy{} - RegisterPolicy(name, func() Policy { return customPolicy }) - if _, ok := supportedPolicies[name]; !ok { - t.Error("Expected supportedPolicies to have a custom policy.") - } - -} - func TestAllowedDomain(t *testing.T) { upstream := &staticUpstream{ from: "miek.nl.",
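
To make the extracted API concrete, here is a small, self-contained sketch of driving healthcheck.HealthCheck directly, outside the proxy middleware. The httptest server, the /health path, and all durations are invented for the example; the import path is the one introduced by this patch.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"

	"github.com/coredns/coredns/middleware/pkg/healthcheck"
)

func main() {
	// A stand-in upstream that always answers 200 OK.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	hc := &healthcheck.HealthCheck{
		Hosts:       healthcheck.HostPool{{Name: strings.TrimPrefix(srv.URL, "http://")}},
		Path:        "/health", // a non-empty Path is what arms the worker in Start
		Interval:    500 * time.Millisecond,
		Future:      60 * time.Second,
		FailTimeout: 10 * time.Second,
		MaxFails:    1,
	}
	hc.Start()
	defer hc.Stop()

	// Checks run asynchronously; give the first round a moment to land.
	time.Sleep(time.Second)
	if h := hc.Select(); h != nil {
		fmt.Println("selected upstream:", h.Name)
	}
}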
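The CheckURL construction above goes through url.Parse, a net.SplitHostPort with fallback, an optional port override, and net.JoinHostPort plus the configured path. The same steps can be exercised in isolation; buildCheckURL below is a hypothetical helper mirroring that logic, not a function the package exports.

package main

import (
	"fmt"
	"net"
	"net/url"
)

// buildCheckURL mirrors healthCheck's URL construction: take the host
// part if the name parses as a URL, split off any port, apply the
// configured override port, and append the health path.
func buildCheckURL(name, overridePort, path string) string {
	hostName := name
	if ret, err := url.Parse(name); err == nil && len(ret.Host) > 0 {
		hostName = ret.Host
	}
	host, port, err := net.SplitHostPort(hostName)
	if err != nil {
		host = hostName // no port in the name; keep it whole
	}
	if overridePort != "" {
		port = overridePort
	}
	return "http://" + net.JoinHostPort(host, port) + path
}

func main() {
	fmt.Println(buildCheckURL("8.8.8.8:53", "8080", "/health"))
	// http://8.8.8.8:8080/health
	fmt.Println(buildCheckURL("http://example.org:9090", "", "/health"))
	// http://example.org:9090/health
}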
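Because policy.go now exports SupportedPolicies and RegisterPolicy, selection policies can be registered from outside the package and picked up by parseBlock. A sketch, assuming the Policy interface is Select(pool HostPool) *UpstreamHost, which is how Select above invokes it; the firstHealthy policy and its name are invented for the example.

package main

import (
	"fmt"

	"github.com/coredns/coredns/middleware/pkg/healthcheck"
)

// firstHealthy returns the first host in the pool that is not down,
// or nil when every host is down.
type firstHealthy struct{}

func (p *firstHealthy) Select(pool healthcheck.HostPool) *healthcheck.UpstreamHost {
	for _, h := range pool {
		if !h.Down() {
			return h
		}
	}
	return nil
}

func main() {
	healthcheck.RegisterPolicy("first_healthy", func() healthcheck.Policy { return &firstHealthy{} })
	fmt.Println(len(healthcheck.SupportedPolicies)) // the registry now also contains first_healthy
}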
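google.go, lookup.go, and upstream.go all install a CheckDown closure on each host, but the hunks above cut the closure bodies short. The sketch below shows how such a hook can be written against the exported fields; newCheckDown and its maxFails threshold are illustrative assumptions, not the patch's exact closure.

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/coredns/coredns/middleware/pkg/healthcheck"
)

// newCheckDown treats a host as down once its last healthy verdict has
// expired, or once it has accumulated maxFails outstanding failures.
func newCheckDown(maxFails int32) healthcheck.UpstreamHostDownFunc {
	return func(uh *healthcheck.UpstreamHost) bool {
		uh.CheckMu.Lock()
		until := uh.OkUntil
		uh.CheckMu.Unlock()

		if !until.IsZero() && time.Now().After(until) {
			return true // the OkUntil deadline set by healthCheckURL has passed
		}
		return atomic.LoadInt32(&uh.Fails) >= maxFails
	}
}

func main() {
	host := &healthcheck.UpstreamHost{Name: "127.0.0.1:10053"}
	host.CheckDown = newCheckDown(3)

	atomic.AddInt32(&host.Fails, 1)
	fmt.Println(host.Down()) // false: one failure is below the threshold
}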