Move Healthcheck to middleware/pkg/healthcheck (#854)

* Move healthcheck out

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>

* Move healthcheck to middleware/pkg/healthcheck

Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
This commit is contained in:
Yong Tang 2017-08-09 09:21:33 -07:00 committed by GitHub
parent 57692bbccf
commit a1b175ef78
9 changed files with 359 additions and 348 deletions

View file

@ -0,0 +1,236 @@
package healthcheck
import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"
)
// UpstreamHostDownFunc can be used to customize how Down behaves.
type UpstreamHostDownFunc func(*UpstreamHost) bool

// UpstreamHost represents a single proxy upstream
type UpstreamHost struct {
	Conns             int64  // must be first field to be 64-bit aligned on 32-bit systems
	Name              string // IP address (and port) of this upstream host
	Fails             int32  // number of currently outstanding failures (managed atomically by callers)
	FailTimeout       time.Duration
	OkUntil           time.Time            // deadline until which the host is considered healthy; guarded by CheckMu
	CheckDown         UpstreamHostDownFunc // optional override for Down; nil means use the default criteria
	CheckURL          string               // full HTTP URL probed by the health checker; built lazily if empty
	WithoutPathPrefix string
	Checking          bool       // true while a health check probe is in flight; guarded by CheckMu
	CheckMu           sync.Mutex // protects Checking and OkUntil
}
// Down reports whether the upstream host is considered unavailable.
// A custom CheckDown function, when set, decides on its own; otherwise
// the host is down if it has outstanding failures or its OkUntil
// deadline has passed.
func (uh *UpstreamHost) Down() bool {
	if uh.CheckDown != nil {
		return uh.CheckDown(uh)
	}
	// Default criteria.
	fails := atomic.LoadInt32(&uh.Fails)
	uh.CheckMu.Lock()
	until := uh.OkUntil
	uh.CheckMu.Unlock()
	if fails > 0 {
		return true
	}
	return !until.IsZero() && time.Now().After(until)
}
// HostPool is a collection of UpstreamHosts.
type HostPool []*UpstreamHost

// HealthCheck periodically probes a pool of upstream hosts over HTTP and
// records, per host, how long each may be considered healthy (OkUntil).
type HealthCheck struct {
	wg   sync.WaitGroup // Used to wait for running goroutines to stop.
	stop chan struct{}  // Signals running goroutines to stop.

	Hosts       HostPool
	Policy      Policy // host selection policy used by Select; nil falls back to Random
	Spray       Policy // last-resort policy applied by Select when every host is down; may be nil
	FailTimeout time.Duration // not read in this file; presumably consumed by callers — TODO confirm
	MaxFails    int32         // not read in this file; presumably consumed by callers — TODO confirm
	Future      time.Duration // how far ahead OkUntil is pushed before each probe
	Path        string        // HTTP path appended to each check URL; empty disables checking (see Start)
	Port        string        // when set, overrides the host's own port in the check URL
	Interval    time.Duration // time between health check sweeps in HealthCheckWorker
}
// Start creates the stop channel and, when a health check Path is
// configured, launches the background health check worker. Stop must be
// called to terminate the worker.
func (u *HealthCheck) Start() {
	u.stop = make(chan struct{})
	if u.Path == "" {
		// No path configured: nothing to probe, no worker to run.
		return
	}
	u.wg.Add(1)
	go func() {
		defer u.wg.Done()
		u.HealthCheckWorker(u.stop)
	}()
}
// Stop sends a signal to all goroutines started by this HealthCheck to exit
// and waits for them to finish before returning. The error result is always
// nil; the signature matches callers expecting a Stop() error method.
func (u *HealthCheck) Stop() error {
	close(u.stop)
	u.wg.Wait()
	return nil
}
// healthCheckClient performs the health check probes. The explicit timeout
// bounds each probe: with the default client a host that is absent (TCP SYN
// black-holed) could hold a probe goroutine for minutes.
var healthCheckClient = &http.Client{Timeout: 5 * time.Second}

// This was moved into a thread so that each host could throw a health
// check at the same time. The reason for this is that if we are checking
// 3 hosts, and the first one is gone, and we spend minutes timing out to
// fail it, we would not have been doing any other health checks in that
// time. So we now have a per-host lock and a threaded health check.
//
// We use the Checking bool to avoid concurrent checks against the same
// host; if one is taking a long time, the next one will find a check in
// progress and simply return before trying.
//
// We are carefully avoiding having the mutex locked while we check,
// otherwise checks will back up, potentially a lot of them if a host is
// absent for a long time. This arrangement makes checks quickly see if
// they are the only one running and abort otherwise.
//
// healthCheckURL probes host.CheckURL once; on success it sets the host's
// OkUntil to nextTs, on failure (transport error or status outside
// [200,400)) it sets OkUntil to the epoch, marking the host down.
func healthCheckURL(nextTs time.Time, host *UpstreamHost) {
	// lock for our bool check. We don't just defer the unlock because
	// we don't want the lock held while the HTTP probe runs
	host.CheckMu.Lock()
	// are we mid check? Don't run another one
	if host.Checking {
		host.CheckMu.Unlock()
		return
	}
	host.Checking = true
	host.CheckMu.Unlock()

	// Fetch that url. This has been moved into a go func because
	// when the remote host is not merely not serving, but actually
	// absent, tcp syn timeouts can be long; the client timeout above
	// additionally bounds each probe.
	if r, err := healthCheckClient.Get(host.CheckURL); err == nil {
		// Drain and close the body so the connection can be reused.
		io.Copy(ioutil.Discard, r.Body)
		r.Body.Close()
		if r.StatusCode < 200 || r.StatusCode >= 400 {
			log.Printf("[WARNING] Host %s health check returned HTTP code %d\n",
				host.Name, r.StatusCode)
			nextTs = time.Unix(0, 0)
		}
	} else {
		log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err)
		nextTs = time.Unix(0, 0)
	}

	host.CheckMu.Lock()
	host.Checking = false
	host.OkUntil = nextTs
	host.CheckMu.Unlock()
}
// healthCheck launches one asynchronous probe per host in the pool,
// lazily building each host's check URL on first use.
func (u *HealthCheck) healthCheck() {
	for _, host := range u.Hosts {
		if host.CheckURL == "" {
			// The DNS server might be fronted by an HTTP server; if the
			// name parses as a URL with a host component, prefer that.
			hostName := host.Name
			if parsed, err := url.Parse(host.Name); err == nil && len(parsed.Host) > 0 {
				hostName = parsed.Host
			}
			// Split off any port so the configured check port can override it.
			checkHostName, checkPort, err := net.SplitHostPort(hostName)
			if err != nil {
				// No port present; use the name as-is.
				checkHostName = hostName
			}
			if u.Port != "" {
				checkPort = u.Port
			}
			host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path
		}
		// Compute the health deadline before the probe starts; the
		// per-host Checking flag keeps probes from backing up.
		nextTs := time.Now().Add(u.Future)
		go healthCheckURL(nextTs, host)
	}
}
// HealthCheckWorker probes all hosts immediately and then once per
// u.Interval, until stop is closed.
func (u *HealthCheck) HealthCheckWorker(stop chan struct{}) {
	ticker := time.NewTicker(u.Interval)
	defer ticker.Stop()

	u.healthCheck()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			u.healthCheck()
		}
	}
}
// Select returns a host from the pool according to the configured Policy
// (Random when no policy is set), or nil when no usable host exists.
// When every host is down, the Spray policy — if configured — picks one
// anyway as a last resort.
func (u *HealthCheck) Select() *UpstreamHost {
	pool := u.Hosts
	if len(pool) == 1 {
		// Single-host pool: return it unless it is down and there is no spray.
		if pool[0].Down() && u.Spray == nil {
			return nil
		}
		return pool[0]
	}

	allDown := true
	for _, host := range pool {
		if !host.Down() {
			allDown = false
			break
		}
	}
	if allDown {
		if u.Spray == nil {
			return nil
		}
		return u.Spray.Select(pool)
	}

	// At least one host is up: let the policy choose, falling back to
	// spray when the policy returns nothing. (The original duplicated this
	// logic for the nil-policy case and carried a redundant h == nil check.)
	policy := u.Policy
	if policy == nil {
		policy = &Random{}
	}
	if h := policy.Select(pool); h != nil {
		return h
	}
	if u.Spray == nil {
		return nil
	}
	return u.Spray.Select(pool)
}

View file

@ -1,4 +1,4 @@
package proxy package healthcheck
import ( import (
"log" "log"
@ -6,8 +6,14 @@ import (
"sync/atomic" "sync/atomic"
) )
// HostPool is a collection of UpstreamHosts. var (
type HostPool []*UpstreamHost SupportedPolicies = make(map[string]func() Policy)
)
// RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) {
SupportedPolicies[name] = policy
}
// Policy decides how a host will be selected from a pool. When all hosts are unhealthy, it is assumed the // Policy decides how a host will be selected from a pool. When all hosts are unhealthy, it is assumed the
// healthchecking failed. In this case each policy will *randomly* return a host from the pool to prevent // healthchecking failed. In this case each policy will *randomly* return a host from the pool to prevent

View file

@ -1,6 +1,8 @@
package proxy package healthcheck
import ( import (
"io/ioutil"
"log"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
@ -41,6 +43,57 @@ func testPool() HostPool {
return HostPool(pool) return HostPool(pool)
} }
func TestRegisterPolicy(t *testing.T) {
name := "custom"
customPolicy := &customPolicy{}
RegisterPolicy(name, func() Policy { return customPolicy })
if _, ok := SupportedPolicies[name]; !ok {
t.Error("Expected supportedPolicies to have a custom policy.")
}
}
func TestHealthCheck(t *testing.T) {
log.SetOutput(ioutil.Discard)
u := &HealthCheck{
Hosts: testPool(),
FailTimeout: 10 * time.Second,
Future: 60 * time.Second,
MaxFails: 1,
}
u.healthCheck()
// sleep a bit, it's async now
time.Sleep(time.Duration(2 * time.Second))
if u.Hosts[0].Down() {
t.Error("Expected first host in testpool to not fail healthcheck.")
}
if !u.Hosts[1].Down() {
t.Error("Expected second host in testpool to fail healthcheck.")
}
}
func TestSelect(t *testing.T) {
u := &HealthCheck{
Hosts: testPool()[:3],
FailTimeout: 10 * time.Second,
Future: 60 * time.Second,
MaxFails: 1,
}
u.Hosts[0].OkUntil = time.Unix(0, 0)
u.Hosts[1].OkUntil = time.Unix(0, 0)
u.Hosts[2].OkUntil = time.Unix(0, 0)
if h := u.Select(); h != nil {
t.Error("Expected select to return nil as all host are down")
}
u.Hosts[2].OkUntil = time.Time{}
if h := u.Select(); h == nil {
t.Error("Expected select to not return nil")
}
}
func TestRoundRobinPolicy(t *testing.T) { func TestRoundRobinPolicy(t *testing.T) {
pool := testPool() pool := testPool()
rrPolicy := &RoundRobin{} rrPolicy := &RoundRobin{}

View file

@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/coredns/coredns/middleware/pkg/debug" "github.com/coredns/coredns/middleware/pkg/debug"
"github.com/coredns/coredns/middleware/pkg/healthcheck"
"github.com/coredns/coredns/request" "github.com/coredns/coredns/request"
"github.com/miekg/dns" "github.com/miekg/dns"
@ -201,33 +202,32 @@ func extractAnswer(m *dns.Msg) ([]string, error) {
func newUpstream(hosts []string, old *staticUpstream) Upstream { func newUpstream(hosts []string, old *staticUpstream) Upstream {
upstream := &staticUpstream{ upstream := &staticUpstream{
from: old.from, from: old.from,
Hosts: nil, HealthCheck: healthcheck.HealthCheck{
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second, FailTimeout: 10 * time.Second,
MaxFails: 3, MaxFails: 3,
Future: 60 * time.Second, Future: 60 * time.Second,
},
ex: old.ex, ex: old.ex,
WithoutPathPrefix: old.WithoutPathPrefix, WithoutPathPrefix: old.WithoutPathPrefix,
IgnoredSubDomains: old.IgnoredSubDomains, IgnoredSubDomains: old.IgnoredSubDomains,
} }
upstream.Hosts = make([]*UpstreamHost, len(hosts)) upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
for i, h := range hosts { for i, h := range hosts {
uh := &UpstreamHost{ uh := &healthcheck.UpstreamHost{
Name: h, Name: h,
Conns: 0, Conns: 0,
Fails: 0, Fails: 0,
FailTimeout: upstream.FailTimeout, FailTimeout: upstream.FailTimeout,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool { return func(uh *healthcheck.UpstreamHost) bool {
down := false down := false
uh.checkMu.Lock() uh.CheckMu.Lock()
until := uh.OkUntil until := uh.OkUntil
uh.checkMu.Unlock() uh.CheckMu.Unlock()
if !until.IsZero() && time.Now().After(until) { if !until.IsZero() && time.Now().After(until) {
down = true down = true

View file

@ -4,11 +4,13 @@ import (
"testing" "testing"
"time" "time"
"github.com/coredns/coredns/middleware/pkg/healthcheck"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
) )
func pool() []*UpstreamHost { func pool() []*healthcheck.UpstreamHost {
return []*UpstreamHost{ return []*healthcheck.UpstreamHost{
{ {
Name: "localhost:10053", Name: "localhost:10053",
}, },
@ -23,12 +25,12 @@ func TestStartupShutdown(t *testing.T) {
upstream := &staticUpstream{ upstream := &staticUpstream{
from: ".", from: ".",
HealthCheck: healthcheck.HealthCheck{
Hosts: pool(), Hosts: pool(),
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second, FailTimeout: 10 * time.Second,
Future: 60 * time.Second, Future: 60 * time.Second,
MaxFails: 1, MaxFails: 1,
},
} }
g := newGrpcClient(nil, upstream) g := newGrpcClient(nil, upstream)
upstream.ex = g upstream.ex = g

View file

@ -7,6 +7,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/coredns/coredns/middleware/pkg/healthcheck"
"github.com/coredns/coredns/request" "github.com/coredns/coredns/request"
"github.com/miekg/dns" "github.com/miekg/dns"
@ -25,30 +26,30 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy {
// we should copy/make something similar. // we should copy/make something similar.
upstream := &staticUpstream{ upstream := &staticUpstream{
from: ".", from: ".",
Hosts: make([]*UpstreamHost, len(hosts)), HealthCheck: healthcheck.HealthCheck{
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second, FailTimeout: 10 * time.Second,
MaxFails: 3, // TODO(miek): disable error checking for simple lookups? MaxFails: 3, // TODO(miek): disable error checking for simple lookups?
Future: 60 * time.Second, Future: 60 * time.Second,
},
ex: newDNSExWithOption(opts), ex: newDNSExWithOption(opts),
} }
upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
for i, host := range hosts { for i, host := range hosts {
uh := &UpstreamHost{ uh := &healthcheck.UpstreamHost{
Name: host, Name: host,
Conns: 0, Conns: 0,
Fails: 0, Fails: 0,
FailTimeout: upstream.FailTimeout, FailTimeout: upstream.FailTimeout,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool { return func(uh *healthcheck.UpstreamHost) bool {
down := false down := false
uh.checkMu.Lock() uh.CheckMu.Lock()
until := uh.OkUntil until := uh.OkUntil
uh.checkMu.Unlock() uh.CheckMu.Unlock()
if !until.IsZero() && time.Now().After(until) { if !until.IsZero() && time.Now().After(until) {
down = true down = true
@ -120,7 +121,7 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
timeout = 10 * time.Second timeout = 10 * time.Second
} }
atomic.AddInt32(&host.Fails, 1) atomic.AddInt32(&host.Fails, 1)
go func(host *UpstreamHost, timeout time.Duration) { go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
time.Sleep(timeout) time.Sleep(timeout)
atomic.AddInt32(&host.Fails, -1) atomic.AddInt32(&host.Fails, -1)
}(host, timeout) }(host, timeout)

View file

@ -3,11 +3,11 @@ package proxy
import ( import (
"errors" "errors"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/coredns/coredns/middleware" "github.com/coredns/coredns/middleware"
"github.com/coredns/coredns/middleware/pkg/healthcheck"
"github.com/coredns/coredns/request" "github.com/coredns/coredns/request"
"github.com/miekg/dns" "github.com/miekg/dns"
@ -41,7 +41,7 @@ type Upstream interface {
// The domain name this upstream host should be routed on. // The domain name this upstream host should be routed on.
From() string From() string
// Selects an upstream host to be routed to. // Selects an upstream host to be routed to.
Select() *UpstreamHost Select() *healthcheck.UpstreamHost
// Checks if the subdomain is not an ignored one. // Checks if the subdomain is not an ignored one.
IsAllowedDomain(string) bool IsAllowedDomain(string) bool
// Exchanger returns the exchanger to be used for this upstream. // Exchanger returns the exchanger to be used for this upstream.
@ -50,45 +50,6 @@ type Upstream interface {
Stop() error Stop() error
} }
// UpstreamHostDownFunc can be used to customize how Down behaves.
type UpstreamHostDownFunc func(*UpstreamHost) bool
// UpstreamHost represents a single proxy upstream
type UpstreamHost struct {
Conns int64 // must be first field to be 64-bit aligned on 32-bit systems
Name string // IP address (and port) of this upstream host
Fails int32
FailTimeout time.Duration
OkUntil time.Time
CheckDown UpstreamHostDownFunc
CheckURL string
WithoutPathPrefix string
Checking bool
checkMu sync.Mutex
}
// Down checks whether the upstream host is down or not.
// Down will try to use uh.CheckDown first, and will fall
// back to some default criteria if necessary.
func (uh *UpstreamHost) Down() bool {
if uh.CheckDown == nil {
// Default settings
fails := atomic.LoadInt32(&uh.Fails)
after := false
uh.checkMu.Lock()
until := uh.OkUntil
uh.checkMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
after = true
}
return after || fails > 0
}
return uh.CheckDown(uh)
}
// tryDuration is how long to try upstream hosts; failures result in // tryDuration is how long to try upstream hosts; failures result in
// immediate retries until this duration ends or we get a nil host. // immediate retries until this duration ends or we get a nil host.
var tryDuration = 60 * time.Second var tryDuration = 60 * time.Second
@ -145,7 +106,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
timeout = 10 * time.Second timeout = 10 * time.Second
} }
atomic.AddInt32(&host.Fails, 1) atomic.AddInt32(&host.Fails, 1)
go func(host *UpstreamHost, timeout time.Duration) { go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
time.Sleep(timeout) time.Sleep(timeout)
atomic.AddInt32(&host.Fails, -1) atomic.AddInt32(&host.Fails, -1)
}(host, timeout) }(host, timeout)

View file

@ -2,46 +2,25 @@ package proxy
import ( import (
"fmt" "fmt"
"io"
"io/ioutil"
"log"
"net" "net"
"net/http"
"net/url"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/coredns/coredns/middleware" "github.com/coredns/coredns/middleware"
"github.com/coredns/coredns/middleware/pkg/dnsutil" "github.com/coredns/coredns/middleware/pkg/dnsutil"
"github.com/coredns/coredns/middleware/pkg/healthcheck"
"github.com/coredns/coredns/middleware/pkg/tls" "github.com/coredns/coredns/middleware/pkg/tls"
"github.com/mholt/caddy/caddyfile" "github.com/mholt/caddy/caddyfile"
"github.com/miekg/dns" "github.com/miekg/dns"
) )
var (
supportedPolicies = make(map[string]func() Policy)
)
type staticUpstream struct { type staticUpstream struct {
from string from string
stop chan struct{} // Signals running goroutines to stop.
wg sync.WaitGroup // Used to wait for running goroutines to stop.
Hosts HostPool healthcheck.HealthCheck
Policy Policy
Spray Policy
FailTimeout time.Duration
MaxFails int32
Future time.Duration
HealthCheck struct {
Path string
Port string
Interval time.Duration
}
WithoutPathPrefix string WithoutPathPrefix string
IgnoredSubDomains []string IgnoredSubDomains []string
ex Exchanger ex Exchanger
@ -54,13 +33,11 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
for c.Next() { for c.Next() {
upstream := &staticUpstream{ upstream := &staticUpstream{
from: ".", from: ".",
stop: make(chan struct{}), HealthCheck: healthcheck.HealthCheck{
Hosts: nil,
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second, FailTimeout: 10 * time.Second,
MaxFails: 1, MaxFails: 1,
Future: 60 * time.Second, Future: 60 * time.Second,
},
ex: newDNSEx(), ex: newDNSEx(),
} }
@ -84,22 +61,22 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
} }
} }
upstream.Hosts = make([]*UpstreamHost, len(toHosts)) upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts))
for i, host := range toHosts { for i, host := range toHosts {
uh := &UpstreamHost{ uh := &healthcheck.UpstreamHost{
Name: host, Name: host,
Conns: 0, Conns: 0,
Fails: 0, Fails: 0,
FailTimeout: upstream.FailTimeout, FailTimeout: upstream.FailTimeout,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool { return func(uh *healthcheck.UpstreamHost) bool {
down := false down := false
uh.checkMu.Lock() uh.CheckMu.Lock()
until := uh.OkUntil until := uh.OkUntil
uh.checkMu.Unlock() uh.CheckMu.Unlock()
if !until.IsZero() && time.Now().After(until) { if !until.IsZero() && time.Now().After(until) {
down = true down = true
@ -117,32 +94,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
upstream.Hosts[i] = uh upstream.Hosts[i] = uh
} }
upstream.Start()
if upstream.HealthCheck.Path != "" {
upstream.wg.Add(1)
go func() {
defer upstream.wg.Done()
upstream.HealthCheckWorker(upstream.stop)
}()
}
upstreams = append(upstreams, upstream) upstreams = append(upstreams, upstream)
} }
return upstreams, nil return upstreams, nil
} }
// Stop sends a signal to all goroutines started by this staticUpstream to exit
// and waits for them to finish before returning.
func (u *staticUpstream) Stop() error {
close(u.stop)
u.wg.Wait()
return nil
}
// RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) {
supportedPolicies[name] = policy
}
func (u *staticUpstream) From() string { func (u *staticUpstream) From() string {
return u.from return u.from
} }
@ -153,7 +111,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
if !c.NextArg() { if !c.NextArg() {
return c.ArgErr() return c.ArgErr()
} }
policyCreateFunc, ok := supportedPolicies[c.Val()] policyCreateFunc, ok := healthcheck.SupportedPolicies[c.Val()]
if !ok { if !ok {
return c.ArgErr() return c.ArgErr()
} }
@ -214,7 +172,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
} }
u.IgnoredSubDomains = ignoredDomains u.IgnoredSubDomains = ignoredDomains
case "spray": case "spray":
u.Spray = &Spray{} u.Spray = &healthcheck.Spray{}
case "protocol": case "protocol":
encArgs := c.RemainingArgs() encArgs := c.RemainingArgs()
if len(encArgs) == 0 { if len(encArgs) == 0 {
@ -259,154 +217,6 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
return nil return nil
} }
// This was moved into a thread so that each host could throw a health
// check at the same time. The reason for this is that if we are checking
// 3 hosts, and the first one is gone, and we spend minutes timing out to
// fail it, we would not have been doing any other health checks in that
// time. So we now have a per-host lock and a threaded health check.
//
// We use the Checking bool to avoid concurrent checks against the same
// host; if one is taking a long time, the next one will find a check in
// progress and simply return before trying.
//
// We are carefully avoiding having the mutex locked while we check,
// otherwise checks will back up, potentially a lot of them if a host is
// absent for a long time. This arrangement makes checks quickly see if
// they are the only one running and abort otherwise.
func healthCheckURL(nextTs time.Time, host *UpstreamHost) {
// lock for our bool check. We don't just defer the unlock because
// we don't want the lock held while http.Get runs
host.checkMu.Lock()
// are we mid check? Don't run another one
if host.Checking {
host.checkMu.Unlock()
return
}
host.Checking = true
host.checkMu.Unlock()
//log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local())
// fetch that url. This has been moved into a go func because
// when the remote host is not merely not serving, but actually
// absent, then tcp syn timeouts can be very long, and so one
// fetch could last several check intervals
if r, err := http.Get(host.CheckURL); err == nil {
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
if r.StatusCode < 200 || r.StatusCode >= 400 {
log.Printf("[WARNING] Host %s health check returned HTTP code %d\n",
host.Name, r.StatusCode)
nextTs = time.Unix(0, 0)
}
} else {
log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err)
nextTs = time.Unix(0, 0)
}
host.checkMu.Lock()
host.Checking = false
host.OkUntil = nextTs
host.checkMu.Unlock()
}
func (u *staticUpstream) healthCheck() {
for _, host := range u.Hosts {
if host.CheckURL == "" {
var hostName, checkPort string
// The DNS server might be an HTTP server. If so, extract its name.
ret, err := url.Parse(host.Name)
if err == nil && len(ret.Host) > 0 {
hostName = ret.Host
} else {
hostName = host.Name
}
// Extract the port number from the parsed server name.
checkHostName, checkPort, err := net.SplitHostPort(hostName)
if err != nil {
checkHostName = hostName
}
if u.HealthCheck.Port != "" {
checkPort = u.HealthCheck.Port
}
host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path
}
// calculate this before the get
nextTs := time.Now().Add(u.Future)
// locks/bools should prevent requests backing up
go healthCheckURL(nextTs, host)
}
}
func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) {
ticker := time.NewTicker(u.HealthCheck.Interval)
u.healthCheck()
for {
select {
case <-ticker.C:
u.healthCheck()
case <-stop:
ticker.Stop()
return
}
}
}
func (u *staticUpstream) Select() *UpstreamHost {
pool := u.Hosts
if len(pool) == 1 {
if pool[0].Down() && u.Spray == nil {
return nil
}
return pool[0]
}
allDown := true
for _, host := range pool {
if !host.Down() {
allDown = false
break
}
}
if allDown {
if u.Spray == nil {
return nil
}
return u.Spray.Select(pool)
}
if u.Policy == nil {
h := (&Random{}).Select(pool)
if h != nil {
return h
}
if h == nil && u.Spray == nil {
return nil
}
return u.Spray.Select(pool)
}
h := u.Policy.Select(pool)
if h != nil {
return h
}
if u.Spray == nil {
return nil
}
return u.Spray.Select(pool)
}
func (u *staticUpstream) IsAllowedDomain(name string) bool { func (u *staticUpstream) IsAllowedDomain(name string) bool {
if dns.Name(name) == dns.Name(u.From()) { if dns.Name(name) == dns.Name(u.From()) {
return true return true

View file

@ -2,74 +2,16 @@ package proxy
import ( import (
"io/ioutil" "io/ioutil"
"log"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"testing" "testing"
"time"
"github.com/coredns/coredns/middleware/test" "github.com/coredns/coredns/middleware/test"
"github.com/mholt/caddy" "github.com/mholt/caddy"
) )
func TestHealthCheck(t *testing.T) {
log.SetOutput(ioutil.Discard)
upstream := &staticUpstream{
from: ".",
Hosts: testPool(),
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second,
Future: 60 * time.Second,
MaxFails: 1,
}
upstream.healthCheck()
// sleep a bit, it's async now
time.Sleep(time.Duration(2 * time.Second))
if upstream.Hosts[0].Down() {
t.Error("Expected first host in testpool to not fail healthcheck.")
}
if !upstream.Hosts[1].Down() {
t.Error("Expected second host in testpool to fail healthcheck.")
}
}
func TestSelect(t *testing.T) {
upstream := &staticUpstream{
from: ".",
Hosts: testPool()[:3],
Policy: &Random{},
FailTimeout: 10 * time.Second,
Future: 60 * time.Second,
MaxFails: 1,
}
upstream.Hosts[0].OkUntil = time.Unix(0, 0)
upstream.Hosts[1].OkUntil = time.Unix(0, 0)
upstream.Hosts[2].OkUntil = time.Unix(0, 0)
if h := upstream.Select(); h != nil {
t.Error("Expected select to return nil as all host are down")
}
upstream.Hosts[2].OkUntil = time.Time{}
if h := upstream.Select(); h == nil {
t.Error("Expected select to not return nil")
}
}
func TestRegisterPolicy(t *testing.T) {
name := "custom"
customPolicy := &customPolicy{}
RegisterPolicy(name, func() Policy { return customPolicy })
if _, ok := supportedPolicies[name]; !ok {
t.Error("Expected supportedPolicies to have a custom policy.")
}
}
func TestAllowedDomain(t *testing.T) { func TestAllowedDomain(t *testing.T) {
upstream := &staticUpstream{ upstream := &staticUpstream{
from: "miek.nl.", from: "miek.nl.",