middleware/proxy: remove singleinflight from dns (#717)
Singleinflight interferes with the health checking of upstream. If an upstream would fail, singleinflight would mirror that error to to other proxy *iff* multple identical queries would be inflight. This would lead to marking *all* upstreams as bad, essentially collapsing multiple upstreams into a SPOF. Clearly not what we want. Singleinflight does have some nice properties, but I've opted to rip it out entirely. Caching should almost (but not quite) as good. Added a test case in test that uses 3 CoreDNS instances to reflect the setup from #715. Found another bug as well, where (when the policy would be nil), we would always Spray even though we've found a healthy host.
This commit is contained in:
parent
36396e94ab
commit
1c45e262f5
3 changed files with 83 additions and 25 deletions
|
@ -5,7 +5,6 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coredns/coredns/middleware/pkg/singleflight"
|
|
||||||
"github.com/coredns/coredns/request"
|
"github.com/coredns/coredns/request"
|
||||||
|
|
||||||
"github.com/miekg/dns"
|
"github.com/miekg/dns"
|
||||||
|
@ -13,7 +12,6 @@ import (
|
||||||
|
|
||||||
type dnsEx struct {
|
type dnsEx struct {
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
group *singleflight.Group
|
|
||||||
Options
|
Options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,7 +24,7 @@ func newDNSEx() *dnsEx {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDNSExWithOption(opt Options) *dnsEx {
|
func newDNSExWithOption(opt Options) *dnsEx {
|
||||||
return &dnsEx{group: new(singleflight.Group), Timeout: defaultTimeout * time.Second, Options: opt}
|
return &dnsEx{Timeout: defaultTimeout * time.Second, Options: opt}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dnsEx) Protocol() string { return "dns" }
|
func (d *dnsEx) Protocol() string { return "dns" }
|
||||||
|
@ -65,30 +63,14 @@ func (d *dnsEx) Exchange(ctx context.Context, addr string, state request.Request
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dnsEx) ExchangeConn(m *dns.Msg, co net.Conn) (*dns.Msg, time.Duration, error) {
|
func (d *dnsEx) ExchangeConn(m *dns.Msg, co net.Conn) (*dns.Msg, time.Duration, error) {
|
||||||
t := "nop"
|
|
||||||
if t1, ok := dns.TypeToString[m.Question[0].Qtype]; ok {
|
|
||||||
t = t1
|
|
||||||
}
|
|
||||||
cl := "nop"
|
|
||||||
if cl1, ok := dns.ClassToString[m.Question[0].Qclass]; ok {
|
|
||||||
cl = cl1
|
|
||||||
}
|
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
r, err := exchange(m, co)
|
||||||
// Name needs to be normalized! Bug in go dns.
|
|
||||||
r, err := d.group.Do(m.Question[0].Name+t+cl, func() (interface{}, error) {
|
|
||||||
return exchange(m, co)
|
|
||||||
})
|
|
||||||
|
|
||||||
r1 := r.(dns.Msg)
|
|
||||||
rtt := time.Since(start)
|
rtt := time.Since(start)
|
||||||
return &r1, rtt, err
|
|
||||||
|
return r, rtt, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// exchange does *not* return a pointer to dns.Msg because that leads to buffer reuse when
|
func exchange(m *dns.Msg, co net.Conn) (*dns.Msg, error) {
|
||||||
// group.Do is used in Exchange.
|
|
||||||
func exchange(m *dns.Msg, co net.Conn) (dns.Msg, error) {
|
|
||||||
opt := m.IsEdns0()
|
opt := m.IsEdns0()
|
||||||
|
|
||||||
udpsize := uint16(dns.MinMsgSize)
|
udpsize := uint16(dns.MinMsgSize)
|
||||||
|
@ -109,7 +91,7 @@ func exchange(m *dns.Msg, co net.Conn) (dns.Msg, error) {
|
||||||
|
|
||||||
dnsco.Close()
|
dnsco.Close()
|
||||||
if r == nil {
|
if r == nil {
|
||||||
return dns.Msg{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return *r, err
|
return r, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -328,6 +328,9 @@ func (u *staticUpstream) Select() *UpstreamHost {
|
||||||
|
|
||||||
if u.Policy == nil {
|
if u.Policy == nil {
|
||||||
h := (&Random{}).Select(pool)
|
h := (&Random{}).Select(pool)
|
||||||
|
if h != nil {
|
||||||
|
return h
|
||||||
|
}
|
||||||
if h == nil && u.Spray == nil {
|
if h == nil && u.Spray == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,3 +41,76 @@ func TestProxyErratic(t *testing.T) {
|
||||||
// tests that it times out.
|
// tests that it times out.
|
||||||
p.Lookup(state, "example.org.", dns.TypeA)
|
p.Lookup(state, "example.org.", dns.TypeA)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProxyThreeWay(t *testing.T) {
|
||||||
|
// Run 3 CoreDNS server, 2 upstream ones and a proxy. 1 Upstream is unhealthy after 1 query, but after
|
||||||
|
// that we should still be able to send to the other one
|
||||||
|
log.SetOutput(ioutil.Discard)
|
||||||
|
|
||||||
|
// Backend CoreDNS's.
|
||||||
|
corefileUp1 := `example.org:0 {
|
||||||
|
erratic {
|
||||||
|
drop 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
up1, err := CoreDNSServer(corefileUp1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not get CoreDNS serving instance: %s", err)
|
||||||
|
}
|
||||||
|
defer up1.Stop()
|
||||||
|
|
||||||
|
corefileUp2 := `example.org:0 {
|
||||||
|
whoami
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
up2, err := CoreDNSServer(corefileUp2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not get CoreDNS serving instance: %s", err)
|
||||||
|
}
|
||||||
|
defer up2.Stop()
|
||||||
|
|
||||||
|
addr1, _ := CoreDNSServerPorts(up1, 0)
|
||||||
|
if addr1 == "" {
|
||||||
|
t.Fatalf("Could not get UDP listening port")
|
||||||
|
}
|
||||||
|
addr2, _ := CoreDNSServerPorts(up2, 0)
|
||||||
|
if addr2 == "" {
|
||||||
|
t.Fatalf("Could not get UDP listening port")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Proxying CoreDNS.
|
||||||
|
corefileProxy := `example.org:0 {
|
||||||
|
proxy . ` + addr1 + " " + addr2 + ` {
|
||||||
|
max_fails 1
|
||||||
|
}
|
||||||
|
}`
|
||||||
|
|
||||||
|
prx, err := CoreDNSServer(corefileProxy)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not get CoreDNS serving instance: %s", err)
|
||||||
|
}
|
||||||
|
defer prx.Stop()
|
||||||
|
addr, _ := CoreDNSServerPorts(prx, 0)
|
||||||
|
if addr == "" {
|
||||||
|
t.Fatalf("Could not get UDP listening port")
|
||||||
|
}
|
||||||
|
|
||||||
|
m := new(dns.Msg)
|
||||||
|
m.SetQuestion("example.org.", dns.TypeA)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
r, err := dns.Exchange(m, addr)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// We would previously get SERVFAIL, so just getting answers here
|
||||||
|
// is a good sign. The actuall timeouts are handled in the err != nil case
|
||||||
|
// above.
|
||||||
|
if r.Rcode != dns.RcodeSuccess {
|
||||||
|
t.Fatalf("Expected success rcode, got %d", r.Rcode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue