diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index 211d2573c..d8173769c 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -175,7 +175,6 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
 		HealthCheck: healthcheck.HealthCheck{
 			FailTimeout: 3 * time.Second,
 			MaxFails:    1,
-			Future:      10 * time.Second,
 			Path:        "/",
 			Interval:    5 * time.Second,
 		},
@@ -190,21 +189,11 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
 		CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc {
 			return func(uh *healthcheck.UpstreamHost) bool {
-				down := false
-
-				uh.Lock()
-				until := uh.OkUntil
-				uh.Unlock()
-
-				if !until.IsZero() && time.Now().After(until) {
-					down = true
-				}
-
 				fails := atomic.LoadInt32(&uh.Fails)
 				if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
-					down = true
+					return true
 				}
-				return down
+				return false
 			}
 		}(&k.APIProxy.handler),
 	}
diff --git a/plugin/pkg/healthcheck/healthcheck.go b/plugin/pkg/healthcheck/healthcheck.go
index 4733a5dea..870373fdd 100644
--- a/plugin/pkg/healthcheck/healthcheck.go
+++ b/plugin/pkg/healthcheck/healthcheck.go
@@ -21,7 +21,6 @@ type UpstreamHost struct {
 	Name        string // IP address (and port) of this upstream host
 	Fails       int32
 	FailTimeout time.Duration
-	OkUntil     time.Time
 	CheckDown   UpstreamHostDownFunc
 	CheckURL    string
 	Checking    bool
@@ -33,19 +32,8 @@ type UpstreamHost struct {
 // back to some default criteria if necessary.
 func (uh *UpstreamHost) Down() bool {
 	if uh.CheckDown == nil {
-		// Default settings
 		fails := atomic.LoadInt32(&uh.Fails)
-		after := false
-
-		uh.Lock()
-		until := uh.OkUntil
-		uh.Unlock()
-
-		if !until.IsZero() && time.Now().After(until) {
-			after = true
-		}
-
-		return after || fails > 0
+		return fails > 0
 	}
 	return uh.CheckDown(uh)
 }
@@ -64,7 +52,6 @@ type HealthCheck struct {
 	Spray       Policy
 	FailTimeout time.Duration
 	MaxFails    int32
-	Future      time.Duration
 	Path        string
 	Port        string
 	Interval    time.Duration
 }
@@ -72,6 +59,10 @@
 
 // Start starts the healthcheck
 func (u *HealthCheck) Start() {
+	for i, h := range u.Hosts {
+		u.Hosts[i].CheckURL = u.normalizeCheckURL(h.Name)
+	}
+
 	u.stop = make(chan struct{})
 	if u.Path != "" {
 		u.wg.Add(1)
@@ -104,14 +95,16 @@ func (u *HealthCheck) Stop() error {
 // otherwise checks will back up, potentially a lot of them if a host is
 // absent for a long time. This arrangement makes checks quickly see if
 // they are the only one running and abort otherwise.
-func (uh *UpstreamHost) healthCheckURL(nextTs time.Time) {
-	// lock for our bool check. We don't just defer the unlock because
-	// we don't want the lock held while http.Get runs
+// HealthCheckURL performs the http.Get that implements healthcheck.
+func (uh *UpstreamHost) HealthCheckURL() {
+	// Lock for our bool check. We don't just defer the unlock because
+	// we don't want the lock held while http.Get runs.
 	uh.Lock()
 
-	// are we mid check? Don't run another one
-	if uh.Checking {
+	// We call HealthCheckURL from proxy.go and lookup.go; bail out when nothing
+	// is configured to healthcheck. Or are we already mid-check? Don't run another one.
+	if uh.CheckURL == "" || uh.Checking { // nothing configured
 		uh.Unlock()
 		return
 	}
@@ -119,64 +112,39 @@ func (uh *UpstreamHost) healthCheckURL(nextTs time.Time) {
 	uh.Checking = true
 	uh.Unlock()
 
-	// fetch that url. This has been moved into a go func because
-	// when the remote host is not merely not serving, but actually
-	// absent, then tcp syn timeouts can be very long, and so one
-	// fetch could last several check intervals
-	if r, err := http.Get(uh.CheckURL); err == nil {
+	// default timeout (5s)
+	r, err := healthClient.Get(uh.CheckURL)
+
+	defer func() {
+		uh.Lock()
+		uh.Checking = false
+		uh.Unlock()
+	}()
+
+	if err != nil {
+		log.Printf("[WARNING] Host %s health check probe failed: %v", uh.Name, err)
+		return
+	}
+
+	if err == nil {
 		io.Copy(ioutil.Discard, r.Body)
 		r.Body.Close()
 
 		if r.StatusCode < 200 || r.StatusCode >= 400 {
 			log.Printf("[WARNING] Host %s health check returned HTTP code %d", uh.Name, r.StatusCode)
-			nextTs = time.Unix(0, 0)
-		} else {
-			// We are healthy again, reset fails
-			atomic.StoreInt32(&uh.Fails, 0)
+			return
 		}
-	} else {
-		log.Printf("[WARNING] Host %s health check probe failed: %v", uh.Name, err)
-		nextTs = time.Unix(0, 0)
-	}
-	uh.Lock()
-	uh.Checking = false
-	uh.OkUntil = nextTs
-	uh.Unlock()
+
+		// We are healthy again, reset fails.
+		atomic.StoreInt32(&uh.Fails, 0)
+		return
+	}
 }
 
 func (u *HealthCheck) healthCheck() {
 	for _, host := range u.Hosts {
-
-		if host.CheckURL == "" {
-			var hostName, checkPort string
-
-			// The DNS server might be an HTTP server. If so, extract its name.
-			ret, err := url.Parse(host.Name)
-			if err == nil && len(ret.Host) > 0 {
-				hostName = ret.Host
-			} else {
-				hostName = host.Name
-			}
-
-			// Extract the port number from the parsed server name.
-			checkHostName, checkPort, err := net.SplitHostPort(hostName)
-			if err != nil {
-				checkHostName = hostName
-			}
-
-			if u.Port != "" {
-				checkPort = u.Port
-			}
-
-			host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path
-		}
-
-		// calculate next timestamp before the get
-		nextTs := time.Now().Add(u.Future)
-		// locks/bools should prevent requests backing up
-		go host.healthCheckURL(nextTs)
+		go host.HealthCheckURL()
 	}
 }
@@ -239,3 +207,28 @@ func (u *HealthCheck) Select() *UpstreamHost {
 	}
 	return u.Spray.Select(pool)
 }
+
+// normalizeCheckURL creates a proper URL for the health check.
+func (u *HealthCheck) normalizeCheckURL(name string) string {
+	// The DNS server might be an HTTP server. If so, extract its name.
+	hostName := name
+	ret, err := url.Parse(name)
+	if err == nil && len(ret.Host) > 0 {
+		hostName = ret.Host
+	}
+
+	// Extract the port number from the parsed server name.
+	checkHostName, checkPort, err := net.SplitHostPort(hostName)
+	if err != nil {
+		checkHostName = hostName
+	}
+
+	if u.Port != "" {
+		checkPort = u.Port
+	}
+
+	checkURL := "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path
+	return checkURL
+}
+
+var healthClient = func() *http.Client { return &http.Client{Timeout: 5 * time.Second} }()
diff --git a/plugin/pkg/healthcheck/policy_test.go b/plugin/pkg/healthcheck/policy_test.go
index 4c667952c..60ed46b23 100644
--- a/plugin/pkg/healthcheck/policy_test.go
+++ b/plugin/pkg/healthcheck/policy_test.go
@@ -13,6 +13,8 @@ import (
 var workableServer *httptest.Server
 
 func TestMain(m *testing.M) {
+	log.SetOutput(ioutil.Discard)
+
 	workableServer = httptest.NewServer(http.HandlerFunc(
 		func(w http.ResponseWriter, r *http.Request) {
 			// do nothing
@@ -30,15 +32,9 @@ func (r *customPolicy) Select(pool HostPool) *UpstreamHost {
 
 func testPool() HostPool {
 	pool := []*UpstreamHost{
-		{
-			Name: workableServer.URL, // this should resolve (healthcheck test)
-		},
-		{
-			Name: "http://shouldnot.resolve", // this shouldn't
-		},
-		{
-			Name: "http://C",
-		},
+		{Name: workableServer.URL},         // this should resolve (healthcheck test)
+		{Name: "http://shouldnot.resolve"}, // this shouldn't
+		{Name: "http://C"},
 	}
 	return HostPool(pool)
 }
@@ -53,21 +49,21 @@ func TestRegisterPolicy(t *testing.T) {
 
 }
 
-// TODO(miek): Disabled for now, we should get out of the habit of using
-// realtime in these tests .
-func testHealthCheck(t *testing.T) {
-	log.SetOutput(ioutil.Discard)
-
+func TestHealthCheck(t *testing.T) {
 	u := &HealthCheck{
 		Hosts:       testPool(),
 		FailTimeout: 10 * time.Second,
-		Future:      60 * time.Second,
 		MaxFails:    1,
 	}
 
+	for i, h := range u.Hosts {
+		u.Hosts[i].Fails = 1
+		u.Hosts[i].CheckURL = u.normalizeCheckURL(h.Name)
+	}
+
 	u.healthCheck()
-	// sleep a bit, it's async now
-	time.Sleep(time.Duration(2 * time.Second))
+
+	time.Sleep(time.Duration(1 * time.Second)) // sleep a bit, it's async now
 
 	if u.Hosts[0].Down() {
 		t.Error("Expected first host in testpool to not fail healthcheck.")
@@ -77,25 +73,6 @@
 	}
 }
 
-func TestSelect(t *testing.T) {
-	u := &HealthCheck{
-		Hosts:       testPool()[:3],
-		FailTimeout: 10 * time.Second,
-		Future:      60 * time.Second,
-		MaxFails:    1,
-	}
-	u.Hosts[0].OkUntil = time.Unix(0, 0)
-	u.Hosts[1].OkUntil = time.Unix(0, 0)
-	u.Hosts[2].OkUntil = time.Unix(0, 0)
-	if h := u.Select(); h != nil {
-		t.Error("Expected select to return nil as all host are down")
-	}
-	u.Hosts[2].OkUntil = time.Time{}
-	if h := u.Select(); h == nil {
-		t.Error("Expected select to not return nil")
-	}
-}
-
 func TestRoundRobinPolicy(t *testing.T) {
 	pool := testPool()
 	rrPolicy := &RoundRobin{}
@@ -109,10 +86,8 @@ func TestRoundRobinPolicy(t *testing.T) {
 	if h != pool[2] {
 		t.Error("Expected second round robin host to be third host in the pool.")
 	}
-	// mark host as down
-	pool[0].OkUntil = time.Unix(0, 0)
 	h = rrPolicy.Select(pool)
-	if h != pool[1] {
+	if h != pool[0] {
 		t.Error("Expected third round robin host to be first host in the pool.")
 	}
 }
diff --git a/plugin/proxy/README.md b/plugin/proxy/README.md
index a064a3418..f8d805f15 100644
--- a/plugin/proxy/README.md
+++ b/plugin/proxy/README.md
@@ -38,7 +38,7 @@ proxy FROM TO... {
   random, least_conn, or round_robin. Default is random.
 * `fail_timeout` specifies how long to consider a backend as down after it has failed. While it is down,
   requests will not be routed to that backend. A backend is "down" if CoreDNS fails to
-  communicate with it. The default value is 10 seconds ("10s").
+  communicate with it. The default value is 2 seconds ("2s").
 * `max_fails` is the number of failures within fail_timeout that are needed before considering a
   backend to be down. If 0, the backend will never be marked as down. Default is 1.
 * `health_check` will check **PATH** (on **PORT**) on each backend. If a backend returns a status code of
diff --git a/plugin/proxy/down.go b/plugin/proxy/down.go
index 5dc8b678d..11f839b46 100644
--- a/plugin/proxy/down.go
+++ b/plugin/proxy/down.go
@@ -2,7 +2,6 @@ package proxy
 
 import (
 	"sync/atomic"
-	"time"
 
 	"github.com/coredns/coredns/plugin/pkg/healthcheck"
 )
@@ -10,21 +9,10 @@ import (
 // Default CheckDown functions for use in the proxy plugin.
 var checkDownFunc = func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
 	return func(uh *healthcheck.UpstreamHost) bool {
-
-		down := false
-
-		uh.Lock()
-		until := uh.OkUntil
-		uh.Unlock()
-
-		if !until.IsZero() && time.Now().After(until) {
-			down = true
-		}
-
 		fails := atomic.LoadInt32(&uh.Fails)
 		if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
-			down = true
+			return true
 		}
-		return down
+		return false
 	}
 }
diff --git a/plugin/proxy/google.go b/plugin/proxy/google.go
index 91d4cba26..f635fc500 100644
--- a/plugin/proxy/google.go
+++ b/plugin/proxy/google.go
@@ -194,22 +194,20 @@ func newUpstream(hosts []string, old *staticUpstream) Upstream {
 		HealthCheck: healthcheck.HealthCheck{
 			FailTimeout: 5 * time.Second,
 			MaxFails:    3,
-			Future:      12 * time.Second,
 		},
 		ex:                old.ex,
 		IgnoredSubDomains: old.IgnoredSubDomains,
 	}
 
 	upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
-	for i, h := range hosts {
+	for i, host := range hosts {
 		uh := &healthcheck.UpstreamHost{
-			Name:        h,
+			Name:        host,
 			Conns:       0,
 			Fails:       0,
 			FailTimeout: upstream.FailTimeout,
 			CheckDown:   checkDownFunc(upstream),
 		}
-
 		upstream.Hosts[i] = uh
 	}
 	return upstream
diff --git a/plugin/proxy/grpc_test.go b/plugin/proxy/grpc_test.go
index 52c5737d6..c6e6e20cc 100644
--- a/plugin/proxy/grpc_test.go
+++ b/plugin/proxy/grpc_test.go
@@ -28,7 +28,6 @@ func TestStartupShutdown(t *testing.T) {
 		HealthCheck: healthcheck.HealthCheck{
 			Hosts:       pool(),
 			FailTimeout: 10 * time.Second,
-			Future:      60 * time.Second,
 			MaxFails:    1,
 		},
 	}
diff --git a/plugin/proxy/healthcheck_test.go b/plugin/proxy/healthcheck_test.go
new file mode 100644
index 000000000..53b9446ff
--- /dev/null
+++ b/plugin/proxy/healthcheck_test.go
@@ -0,0 +1,66 @@
+package proxy
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/coredns/coredns/plugin/test"
+	"github.com/coredns/coredns/request"
+
+	"github.com/mholt/caddy/caddyfile"
+	"github.com/miekg/dns"
+)
+
+func init() {
+	log.SetOutput(ioutil.Discard)
+}
+
+func TestUnhealthy(t *testing.T) {
+	// High HC interval, we want to test the HC after failed queries.
+	config := "proxy . %s {\n health_check /healthcheck:%s 10s \nfail_timeout 100ms\n}"
+
+	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		r.Body.Close()
+		w.Write([]byte("OK"))
+	}))
+	defer backend.Close()
+
+	port := backend.URL[17:] // Remove all crap up to the port
+	back := backend.URL[7:]  // Remove http://
+
+	c := caddyfile.NewDispenser("testfile", strings.NewReader(fmt.Sprintf(config, back, port)))
+	upstreams, err := NewStaticUpstreams(&c)
+	if err != nil {
+		t.Errorf("Expected no error. Got: %s", err)
+	}
+	p := &Proxy{Upstreams: &upstreams}
+	m := new(dns.Msg)
+	m.SetQuestion("example.org.", dns.TypeA)
+	state := request.Request{W: &test.ResponseWriter{}, Req: m}
+
+	// Should all fail.
+	for j := 0; j < failureCheck; j++ {
+		if _, err := p.Forward(state); err == nil {
+			t.Errorf("Expected error. Got: nil")
+		}
+	}
+
+	fails := atomic.LoadInt32(&upstreams[0].(*staticUpstream).Hosts[0].Fails)
+	if fails != 3 {
+		t.Errorf("Expected %d fails, got %d", 3, fails)
+	}
+	// HC should be kicked off, and reset the counter to 0
+	i := 0
+	for fails != 0 {
+		fails = atomic.LoadInt32(&upstreams[0].(*staticUpstream).Hosts[0].Fails)
+		time.Sleep(100 * time.Microsecond)
+		i++
+	}
+}
diff --git a/plugin/proxy/lookup.go b/plugin/proxy/lookup.go
index fc0f3e01f..51b8c4690 100644
--- a/plugin/proxy/lookup.go
+++ b/plugin/proxy/lookup.go
@@ -29,7 +29,6 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy {
 		HealthCheck: healthcheck.HealthCheck{
 			FailTimeout: 5 * time.Second,
 			MaxFails:    3,
-			Future:      12 * time.Second,
 		},
 		ex: newDNSExWithOption(opts),
 	}
@@ -38,8 +37,6 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy {
 	for i, host := range hosts {
 		uh := &healthcheck.UpstreamHost{
 			Name:        host,
-			Conns:       0,
-			Fails:       0,
 			FailTimeout: upstream.FailTimeout,
 			CheckDown:   checkDownFunc(upstream),
 		}
@@ -106,14 +103,18 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
 
 			timeout := host.FailTimeout
 			if timeout == 0 {
-				timeout = 2 * time.Second
+				timeout = defaultFailTimeout
 			}
 
 			atomic.AddInt32(&host.Fails, 1)
+			fails := atomic.LoadInt32(&host.Fails)
 
 			go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
 				time.Sleep(timeout)
 				atomic.AddInt32(&host.Fails, -1)
+				if fails%failureCheck == 0 { // Kick off healthcheck on every third failure.
+					host.HealthCheckURL()
+				}
 			}(host, timeout)
 		}
 	return nil, fmt.Errorf("%s: %s", errUnreachable, backendErr)
diff --git a/plugin/proxy/proxy.go b/plugin/proxy/proxy.go
index f0e6eadad..7b2abd89e 100644
--- a/plugin/proxy/proxy.go
+++ b/plugin/proxy/proxy.go
@@ -127,14 +127,19 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
 
 		timeout := host.FailTimeout
 		if timeout == 0 {
-			timeout = 2 * time.Second
+			timeout = defaultFailTimeout
 		}
 
 		atomic.AddInt32(&host.Fails, 1)
+		fails := atomic.LoadInt32(&host.Fails)
 
 		go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
 			time.Sleep(timeout)
+			// We may go negative here; this should be rectified by the HC.
 			atomic.AddInt32(&host.Fails, -1)
+			if fails%failureCheck == 0 { // Kick off healthcheck on every third failure.
+				host.HealthCheckURL()
+			}
 		}(host, timeout)
 	}
 
@@ -167,9 +172,6 @@ func (p Proxy) match(state request.Request) (u Upstream) {
 
 // Name implements the Handler interface.
 func (p Proxy) Name() string { return "proxy" }
 
-// defaultTimeout is the default networking timeout for DNS requests.
-const defaultTimeout = 5 * time.Second
-
 func toDnstap(ctx context.Context, host string, ex Exchanger, state request.Request, reply *dns.Msg, queryEpoch, respEpoch uint64) (err error) {
 	if tapper := dnstap.TapperFromContext(ctx); tapper != nil {
 		// Query
@@ -206,3 +208,9 @@ func toDnstap(ctx context.Context, host string, ex Exchanger, state request.Requ
 	}
 	return
 }
+
+const (
+	defaultFailTimeout = 2 * time.Second
+	defaultTimeout     = 5 * time.Second
+	failureCheck       = 3
+)
diff --git a/plugin/proxy/proxy_test.go b/plugin/proxy/proxy_test.go
index b0cb9c3cb..0d29c2329 100644
--- a/plugin/proxy/proxy_test.go
+++ b/plugin/proxy/proxy_test.go
@@ -15,29 +15,16 @@ import (
 func TestStop(t *testing.T) {
 	config := "proxy . %s {\n health_check /healthcheck:%s %dms \n}"
 	tests := []struct {
-		name                    string
 		intervalInMilliseconds  int
 		numHealthcheckIntervals int
 	}{
-		{
-			"No Healthchecks After Stop - 5ms, 1 intervals",
-			5,
-			1,
-		},
-		{
-			"No Healthchecks After Stop - 5ms, 2 intervals",
-			5,
-			2,
-		},
-		{
-			"No Healthchecks After Stop - 5ms, 3 intervals",
-			5,
-			3,
-		},
+		{5, 1},
+		{5, 2},
+		{5, 3},
 	}
 
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
+	for i, test := range tests {
+		t.Run(fmt.Sprintf("Test %d", i), func(t *testing.T) {
 
 			// Set up proxy.
 			var counter int64
@@ -53,7 +40,7 @@ func TestStop(t *testing.T) {
 			c := caddyfile.NewDispenser("Testfile", strings.NewReader(fmt.Sprintf(config, back, port, test.intervalInMilliseconds)))
 			upstreams, err := NewStaticUpstreams(&c)
 			if err != nil {
-				t.Error("Expected no error. Got:", err.Error())
+				t.Errorf("Test %d, expected no error. Got: %s", i, err)
 			}
 
 			// Give some time for healthchecks to hit the server.
@@ -61,27 +48,25 @@
 
 			for _, upstream := range upstreams {
 				if err := upstream.Stop(); err != nil {
-					t.Error("Expected no error stopping upstream. Got: ", err.Error())
+					t.Errorf("Test %d, expected no error stopping upstream, got: %s", i, err)
 				}
 			}
 
-			counterValueAfterShutdown := atomic.LoadInt64(&counter)
+			counterAfterShutdown := atomic.LoadInt64(&counter)
 
 			// Give some time to see if healthchecks are still hitting the server.
 			time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond)
 
-			if counterValueAfterShutdown == 0 {
-				t.Error("Expected healthchecks to hit test server. Got no healthchecks.")
+			if counterAfterShutdown == 0 {
+				t.Errorf("Test %d, expected healthchecks to hit test server, got none", i)
 			}
 
 			// health checks are in a go routine now, so one may well occur after we shutdown,
 			// but we only ever expect one more
-			counterValueAfterWaiting := atomic.LoadInt64(&counter)
-			if counterValueAfterWaiting > (counterValueAfterShutdown + 1) {
-				t.Errorf("Expected no more healthchecks after shutdown. Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown)
+			counterAfterWaiting := atomic.LoadInt64(&counter)
+			if counterAfterWaiting > (counterAfterShutdown + 1) {
+				t.Errorf("Test %d, expected no more healthchecks after shutdown, got: %d healthchecks after shutdown", i, counterAfterWaiting-counterAfterShutdown)
 			}
-
 		})
-
 	}
 }
diff --git a/plugin/proxy/upstream.go b/plugin/proxy/upstream.go
index 0ab29de51..151fcad60 100644
--- a/plugin/proxy/upstream.go
+++ b/plugin/proxy/upstream.go
@@ -33,7 +33,6 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
 			HealthCheck: healthcheck.HealthCheck{
 				FailTimeout: 5 * time.Second,
 				MaxFails:    3,
-				Future:      12 * time.Second,
 			},
 			ex: newDNSEx(),
 		}
@@ -61,15 +60,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
 		}
 
 		upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts))
+
 		for i, host := range toHosts {
 			uh := &healthcheck.UpstreamHost{
 				Name:        host,
-				Conns:       0,
-				Fails:       0,
 				FailTimeout: upstream.FailTimeout,
 				CheckDown:   checkDownFunc(upstream),
 			}
-
 			upstream.Hosts[i] = uh
 		}
 
 		upstream.Start()
@@ -79,10 +76,6 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
 	return upstreams, nil
 }
 
-func (u *staticUpstream) From() string {
-	return u.from
-}
-
 func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
 	switch c.Val() {
 	case "policy":
@@ -128,12 +121,6 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
 			return err
 		}
 		u.HealthCheck.Interval = dur
-		u.Future = 2 * dur
-
-		// set a minimum of 3 seconds
-		if u.Future < (3 * time.Second) {
-			u.Future = 3 * time.Second
-		}
 	}
 	case "except":
 		ignoredDomains := c.RemainingArgs()
@@ -204,3 +191,4 @@ func (u *staticUpstream) IsAllowedDomain(name string) bool {
 }
 
 func (u *staticUpstream) Exchanger() Exchanger { return u.ex }
+func (u *staticUpstream) From() string         { return u.from }
diff --git a/plugin/reverse/reverse_test.go b/plugin/reverse/reverse_test.go
index 6789c7cc8..dcd60d5dc 100644
--- a/plugin/reverse/reverse_test.go
+++ b/plugin/reverse/reverse_test.go
@@ -20,7 +20,7 @@ func TestReverse(t *testing.T) {
 	em := Reverse{
 		Networks: networks{network{
 			IPnet:        net4,
-			Zone:         "example.org",
+			Zone:         "example.org.",
 			Template:     "ip-{ip}.example.org.",
 			RegexMatchIP: regexIP4,
 		}},
@@ -37,7 +37,7 @@ func TestReverse(t *testing.T) {
 	}{
 		{
 			next:          test.NextHandler(dns.RcodeSuccess, nil),
-			qname:         "test.ip-10.1.1.2.example.org",
+			qname:         "test.ip-10.1.1.2.example.org.",
 			expectedCode:  dns.RcodeSuccess,
 			expectedReply: "10.1.1.2",
 			expectedErr:   nil,
@@ -50,7 +50,7 @@ func TestReverse(t *testing.T) {
 
 		req := new(dns.Msg)
 		tr.qtype = dns.TypeA
-		req.SetQuestion(dns.Fqdn(tr.qname), tr.qtype)
+		req.SetQuestion(tr.qname, tr.qtype)
 		rec := dnstest.NewRecorder(&test.ResponseWriter{})
 
 		code, err := em.ServeDNS(ctx, rec, req)
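
Note: the new control flow is spread across three files (healthcheck.go, proxy.go, lookup.go), so here is a minimal, self-contained sketch of the pattern this patch introduces: each failed query bumps Fails, the bump decays after fail_timeout, and every third failure kicks off an HTTP probe that resets the counter once the backend answers cleanly. The names below (upstreamHost, probe, onQueryFailure) are simplified stand-ins for illustration, not the actual CoreDNS types.

```go
package main

import (
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"sync"
	"sync/atomic"
	"time"
)

// One shared client so the 5s timeout bounds probes against absent hosts.
var healthClient = &http.Client{Timeout: 5 * time.Second}

type upstreamHost struct {
	sync.Mutex
	checkURL string
	fails    int32
	checking bool
}

// probe mirrors HealthCheckURL: skip when nothing is configured or a probe
// is already running, and reset the failure counter on a 2xx/3xx response.
func (uh *upstreamHost) probe() {
	uh.Lock()
	if uh.checkURL == "" || uh.checking {
		uh.Unlock()
		return
	}
	uh.checking = true
	uh.Unlock() // don't hold the lock across the HTTP GET

	defer func() {
		uh.Lock()
		uh.checking = false
		uh.Unlock()
	}()

	r, err := healthClient.Get(uh.checkURL)
	if err != nil {
		log.Printf("[WARNING] health check probe failed: %v", err)
		return
	}
	io.Copy(ioutil.Discard, r.Body)
	r.Body.Close()
	if r.StatusCode < 200 || r.StatusCode >= 400 {
		return // still unhealthy; leave the counter alone
	}
	atomic.StoreInt32(&uh.fails, 0) // healthy again
}

// onQueryFailure is the caller-side pattern from proxy.go and lookup.go:
// bump the counter, undo the bump after failTimeout, and kick off a probe
// on every third failure.
func onQueryFailure(uh *upstreamHost, failTimeout time.Duration) {
	fails := atomic.AddInt32(&uh.fails, 1)
	go func() {
		time.Sleep(failTimeout)
		atomic.AddInt32(&uh.fails, -1) // may go negative; the probe rectifies it
		if fails%3 == 0 {
			uh.probe()
		}
	}()
}

func main() {
	// Hypothetical endpoint; in CoreDNS the URL comes from normalizeCheckURL.
	uh := &upstreamHost{checkURL: "http://127.0.0.1:8053/health"}
	for i := 0; i < 3; i++ {
		onQueryFailure(uh, 100*time.Millisecond)
	}
	time.Sleep(500 * time.Millisecond)
	log.Printf("fails after decay and probe: %d", atomic.LoadInt32(&uh.fails))
}
```

One small liberty taken in the sketch: it uses the value returned by AddInt32 instead of the patch's AddInt32 followed by LoadInt32; the two are equivalent for this purpose up to races between concurrent failures.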