mw/proxy: simplify google code (#1019)
* mw/proxy: simplify google code Minimize bootstrap code a bit, and block on the first resolve of the google https endpont. Add more logging and include actual error in the returned errors. Also re-resolve every 120 seconds, instead of 300 (might eventually make this an option). * fix test
This commit is contained in:
parent
3a96d1ab77
commit
9bcddc5c16
5 changed files with 48 additions and 43 deletions
|
@ -50,8 +50,8 @@ var dnsTestCasesProxy = []test.Case{
|
||||||
},
|
},
|
||||||
Extra: []dns.RR{
|
Extra: []dns.RR{
|
||||||
test.TXT("a.dom.skydns.test. 300 CH TXT \"www.example.org:0(10,0,,false)[0,]\""),
|
test.TXT("a.dom.skydns.test. 300 CH TXT \"www.example.org:0(10,0,,false)[0,]\""),
|
||||||
test.TXT("www.example.org. 0 CH TXT \"www.example.org.:0(0,0, IN A: unreachable backend,false)[0,]\""),
|
test.TXT("www.example.org. 0 CH TXT \"www.example.org.:0(0,0, IN A: unreachable backend: no upstream host,false)[0,]\""),
|
||||||
test.TXT("www.example.org. 0 CH TXT \"www.example.org.:0(0,0, IN AAAA: unreachable backend,false)[0,]\""),
|
test.TXT("www.example.org. 0 CH TXT \"www.example.org.:0(0,0, IN AAAA: unreachable backend: no upstream host,false)[0,]\""),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,10 +112,7 @@ func (g *google) exchangeJSON(addr, json string) ([]byte, error) {
|
||||||
return buf, nil
|
return buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *google) Transport() string {
|
func (g *google) Transport() string { return "tcp" }
|
||||||
return "tcp"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *google) Protocol() string { return "https_google" }
|
func (g *google) Protocol() string { return "https_google" }
|
||||||
|
|
||||||
func (g *google) OnShutdown(p *Proxy) error {
|
func (g *google) OnShutdown(p *Proxy) error {
|
||||||
|
@ -130,51 +127,55 @@ func (g *google) OnStartup(p *Proxy) error {
|
||||||
req.SetQuestion(g.endpoint, dns.TypeA)
|
req.SetQuestion(g.endpoint, dns.TypeA)
|
||||||
state := request.Request{W: new(fakeBootWriter), Req: req}
|
state := request.Request{W: new(fakeBootWriter), Req: req}
|
||||||
|
|
||||||
|
if len(*p.Upstreams) == 0 {
|
||||||
|
return fmt.Errorf("no upstreams defined")
|
||||||
|
}
|
||||||
|
|
||||||
|
oldUpstream := (*p.Upstreams)[0]
|
||||||
|
|
||||||
|
log.Printf("[INFO] Bootstrapping A records %q", g.endpoint)
|
||||||
|
|
||||||
new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA)
|
new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA)
|
||||||
|
|
||||||
var oldUpstream Upstream
|
|
||||||
|
|
||||||
// ignore errors here, as we want to keep on trying.
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
|
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
|
||||||
} else {
|
} else {
|
||||||
addrs, err1 := extractAnswer(new)
|
addrs, err1 := extractAnswer(new)
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
|
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err1)
|
||||||
}
|
} else {
|
||||||
|
|
||||||
if len(*p.Upstreams) > 0 {
|
|
||||||
oldUpstream = (*p.Upstreams)[0]
|
|
||||||
up := newUpstream(addrs, oldUpstream.(*staticUpstream))
|
up := newUpstream(addrs, oldUpstream.(*staticUpstream))
|
||||||
p.Upstreams = &[]Upstream{up}
|
p.Upstreams = &[]Upstream{up}
|
||||||
} else {
|
|
||||||
log.Printf("[WARNING] Failed to bootstrap upstreams %q", g.endpoint)
|
log.Printf("[INFO] Bootstrapping A records %q found: %v", g.endpoint, addrs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
tick := time.NewTicker(300 * time.Second)
|
tick := time.NewTicker(120 * time.Second)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-tick.C:
|
case <-tick.C:
|
||||||
|
|
||||||
|
log.Printf("[INFO] Resolving A records %q", g.endpoint)
|
||||||
|
|
||||||
new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA)
|
new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
|
log.Printf("[WARNING] Failed to resolve A records %q: %s", g.endpoint, err)
|
||||||
} else {
|
continue
|
||||||
addrs, err1 := extractAnswer(new)
|
}
|
||||||
if err1 != nil {
|
|
||||||
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
|
addrs, err1 := extractAnswer(new)
|
||||||
|
if err1 != nil {
|
||||||
|
log.Printf("[WARNING] Failed to resolve A records %q: %s", g.endpoint, err1)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(miek): can this actually happen?
|
|
||||||
if oldUpstream != nil {
|
|
||||||
up := newUpstream(addrs, oldUpstream.(*staticUpstream))
|
up := newUpstream(addrs, oldUpstream.(*staticUpstream))
|
||||||
p.Upstreams = &[]Upstream{up}
|
p.Upstreams = &[]Upstream{up}
|
||||||
}
|
|
||||||
}
|
log.Printf("[INFO] Resolving A records %q found: %v", g.endpoint, addrs)
|
||||||
|
|
||||||
case <-g.quit:
|
case <-g.quit:
|
||||||
return
|
return
|
||||||
|
|
|
@ -4,6 +4,7 @@ package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -14,9 +15,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewLookup create a new proxy with the hosts in host and a Random policy.
|
// NewLookup create a new proxy with the hosts in host and a Random policy.
|
||||||
func NewLookup(hosts []string) Proxy {
|
func NewLookup(hosts []string) Proxy { return NewLookupWithOption(hosts, Options{}) }
|
||||||
return NewLookupWithOption(hosts, Options{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewLookupWithOption process creates a simple round robin forward with potentially forced proto for upstream.
|
// NewLookupWithOption process creates a simple round robin forward with potentially forced proto for upstream.
|
||||||
func NewLookupWithOption(hosts []string, opts Options) Proxy {
|
func NewLookupWithOption(hosts []string, opts Options) Proxy {
|
||||||
|
@ -95,13 +94,15 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
reply := new(dns.Msg)
|
||||||
|
var backendErr error
|
||||||
|
|
||||||
// Since Select() should give us "up" hosts, keep retrying
|
// Since Select() should give us "up" hosts, keep retrying
|
||||||
// hosts until timeout (or until we get a nil host).
|
// hosts until timeout (or until we get a nil host).
|
||||||
for time.Since(start) < tryDuration {
|
for time.Since(start) < tryDuration {
|
||||||
host := upstream.Select()
|
host := upstream.Select()
|
||||||
if host == nil {
|
if host == nil {
|
||||||
return nil, errUnreachable
|
return nil, fmt.Errorf("%s: %s", errUnreachable, "no upstream host")
|
||||||
}
|
}
|
||||||
|
|
||||||
// duplicated from proxy.go, but with a twist, we don't write the
|
// duplicated from proxy.go, but with a twist, we don't write the
|
||||||
|
@ -109,7 +110,7 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
|
||||||
|
|
||||||
atomic.AddInt64(&host.Conns, 1)
|
atomic.AddInt64(&host.Conns, 1)
|
||||||
|
|
||||||
reply, backendErr := upstream.Exchanger().Exchange(context.TODO(), host.Name, state)
|
reply, backendErr = upstream.Exchanger().Exchange(context.TODO(), host.Name, state)
|
||||||
|
|
||||||
atomic.AddInt64(&host.Conns, -1)
|
atomic.AddInt64(&host.Conns, -1)
|
||||||
|
|
||||||
|
@ -126,6 +127,6 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
|
||||||
atomic.AddInt32(&host.Fails, -1)
|
atomic.AddInt32(&host.Fails, -1)
|
||||||
}(host, timeout)
|
}(host, timeout)
|
||||||
}
|
}
|
||||||
return nil, errUnreachable
|
return nil, fmt.Errorf("%s: %s", errUnreachable, backendErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -70,6 +71,8 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
|
||||||
|
|
||||||
for {
|
for {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
reply := new(dns.Msg)
|
||||||
|
var backendErr error
|
||||||
|
|
||||||
// Since Select() should give us "up" hosts, keep retrying
|
// Since Select() should give us "up" hosts, keep retrying
|
||||||
// hosts until timeout (or until we get a nil host).
|
// hosts until timeout (or until we get a nil host).
|
||||||
|
@ -79,7 +82,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
|
||||||
|
|
||||||
RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
|
RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
|
||||||
|
|
||||||
return dns.RcodeServerFailure, errUnreachable
|
return dns.RcodeServerFailure, fmt.Errorf("%s: %s", errUnreachable, "no upstream host")
|
||||||
}
|
}
|
||||||
|
|
||||||
if span != nil {
|
if span != nil {
|
||||||
|
@ -90,7 +93,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
|
||||||
atomic.AddInt64(&host.Conns, 1)
|
atomic.AddInt64(&host.Conns, 1)
|
||||||
queryEpoch := msg.Epoch()
|
queryEpoch := msg.Epoch()
|
||||||
|
|
||||||
reply, backendErr := upstream.Exchanger().Exchange(ctx, host.Name, state)
|
reply, backendErr = upstream.Exchanger().Exchange(ctx, host.Name, state)
|
||||||
|
|
||||||
respEpoch := msg.Epoch()
|
respEpoch := msg.Epoch()
|
||||||
atomic.AddInt64(&host.Conns, -1)
|
atomic.AddInt64(&host.Conns, -1)
|
||||||
|
@ -99,8 +102,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
|
||||||
child.Finish()
|
child.Finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
taperr := toDnstap(ctx, host.Name, upstream.Exchanger(), state, reply,
|
taperr := toDnstap(ctx, host.Name, upstream.Exchanger(), state, reply, queryEpoch, respEpoch)
|
||||||
queryEpoch, respEpoch)
|
|
||||||
|
|
||||||
if backendErr == nil {
|
if backendErr == nil {
|
||||||
w.WriteMsg(reply)
|
w.WriteMsg(reply)
|
||||||
|
@ -123,7 +125,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
|
||||||
|
|
||||||
RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
|
RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
|
||||||
|
|
||||||
return dns.RcodeServerFailure, errUnreachable
|
return dns.RcodeServerFailure, fmt.Errorf("%s: %s", errUnreachable, backendErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,8 @@ func setup(c *caddy.Controller) error {
|
||||||
|
|
||||||
c.OnStartup(OnStartupMetrics)
|
c.OnStartup(OnStartupMetrics)
|
||||||
|
|
||||||
for _, u := range upstreams {
|
for i := range upstreams {
|
||||||
|
u := upstreams[i]
|
||||||
c.OnStartup(func() error {
|
c.OnStartup(func() error {
|
||||||
return u.Exchanger().OnStartup(P)
|
return u.Exchanger().OnStartup(P)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Add table
Reference in a new issue