diff --git a/plugin/forward/README.md b/plugin/forward/README.md index 9223d22d8..d87ba2eec 100644 --- a/plugin/forward/README.md +++ b/plugin/forward/README.md @@ -82,6 +82,10 @@ forward FROM TO... { Also note the TLS config is "global" for the whole forwarding proxy if you need a different `tls-name` for different upstreams you're out of luck. +On each endpoint, the communication timeouts start at default values and are automatically tuned depending on early results. +- dialTimeout is 30 sec by default, and can decrease automatically down to 100ms +- readTimeout is 2 sec by default, and can decrease automatically down to 10ms + ## Metrics If monitoring is enabled (via the *prometheus* directive) then the following metric are exported: diff --git a/plugin/forward/connect.go b/plugin/forward/connect.go index e4bf64f2b..fe6313e0e 100644 --- a/plugin/forward/connect.go +++ b/plugin/forward/connect.go @@ -16,21 +16,65 @@ import ( "github.com/miekg/dns" ) -func (p *Proxy) readTimeout() time.Duration { - rtt := time.Duration(atomic.LoadInt64(&p.avgRtt)) +// limitTimeout is a utility function to auto-tune timeout values. +// The average observed time is moved towards the last observed delay, moderated by a weight. +// The next timeout to use is double the computed average, clamped between the min and max values. 
+func limitTimeout(currentAvg *int64, minValue time.Duration, maxValue time.Duration) time.Duration { + rt := time.Duration(atomic.LoadInt64(currentAvg)) + if rt < minValue { + return minValue + } + if rt < maxValue/2 { + return 2 * rt + } + return maxValue +} - if rtt < minTimeout { - return minTimeout +func averageTimeout(currentAvg *int64, observedDuration time.Duration, weight int64) { + dt := time.Duration(atomic.LoadInt64(currentAvg)) + atomic.AddInt64(currentAvg, int64(observedDuration-dt)/weight) +} + +func (t *transport) dialTimeout() time.Duration { + return limitTimeout(&t.avgDialTime, minDialTimeout, maxDialTimeout) +} + +func (t *transport) updateDialTimeout(newDialTime time.Duration) { + averageTimeout(&t.avgDialTime, newDialTime, cumulativeAvgWeight) +} + +// Dial dials the address configured in transport, potentially reusing a connection or creating a new one. +func (t *transport) Dial(proto string) (*dns.Conn, bool, error) { + // If tls has been configured; use it. + if t.tlsConfig != nil { + proto = "tcp-tls" } - if rtt < maxTimeout/2 { - return 2 * rtt + + t.dial <- proto + c := <-t.ret + + if c != nil { + return c, true, nil } - return maxTimeout + + reqTime := time.Now() + timeout := t.dialTimeout() + if proto == "tcp-tls" { + conn, err := dns.DialTimeoutWithTLS("tcp", t.addr, t.tlsConfig, timeout) + t.updateDialTimeout(time.Since(reqTime)) + return conn, false, err + } + conn, err := dns.DialTimeout(proto, t.addr, timeout) + t.updateDialTimeout(time.Since(reqTime)) + return conn, false, err +} + +func (p *Proxy) readTimeout() time.Duration { + return limitTimeout(&p.avgRtt, minTimeout, maxTimeout) } func (p *Proxy) updateRtt(newRtt time.Duration) { - rtt := time.Duration(atomic.LoadInt64(&p.avgRtt)) - atomic.AddInt64(&p.avgRtt, int64((newRtt-rtt)/rttCount)) + averageTimeout(&p.avgRtt, newRtt, cumulativeAvgWeight) } // Connect selects an upstream, sends the request and waits for a response. 
@@ -92,4 +136,4 @@ func (p *Proxy) Connect(ctx context.Context, state request.Request, forceTCP, me return ret, nil } -const rttCount = 4 +const cumulativeAvgWeight = 4 diff --git a/plugin/forward/persistent.go b/plugin/forward/persistent.go index e84c56ddd..52bd24918 100644 --- a/plugin/forward/persistent.go +++ b/plugin/forward/persistent.go @@ -17,10 +17,11 @@ type persistConn struct { // transport hold the persistent cache. type transport struct { - conns map[string][]*persistConn // Buckets for udp, tcp and tcp-tls. - expire time.Duration // After this duration a connection is expired. - addr string - tlsConfig *tls.Config + avgDialTime int64 // Weighted moving average of observed dial times, as a time.Duration in nanoseconds. + conns map[string][]*persistConn // Buckets for udp, tcp and tcp-tls. + expire time.Duration // After this duration a connection is expired. + addr string + tlsConfig *tls.Config dial chan string yield chan *dns.Conn @@ -30,13 +31,14 @@ type transport struct { func newTransport(addr string, tlsConfig *tls.Config) *transport { t := &transport{ - conns: make(map[string][]*persistConn), - expire: defaultExpire, - addr: addr, - dial: make(chan string), - yield: make(chan *dns.Conn), - ret: make(chan *dns.Conn), - stop: make(chan bool), + avgDialTime: int64(defaultDialTimeout / 2), + conns: make(map[string][]*persistConn), + expire: defaultExpire, + addr: addr, + dial: make(chan string), + yield: make(chan *dns.Conn), + ret: make(chan *dns.Conn), + stop: make(chan bool), } return t } @@ -141,28 +143,6 @@ func (t *transport) cleanup(all bool) { } } -// Dial dials the address configured in transport, potentially reusing a connection or creating a new one. -func (t *transport) Dial(proto string) (*dns.Conn, bool, error) { - // If tls has been configured; use it. 
- if t.tlsConfig != nil { - proto = "tcp-tls" - } - - t.dial <- proto - c := <-t.ret - - if c != nil { - return c, true, nil - } - - if proto == "tcp-tls" { - conn, err := dns.DialTimeoutWithTLS("tcp", t.addr, t.tlsConfig, dialTimeout) - return conn, false, err - } - conn, err := dns.DialTimeout(proto, t.addr, dialTimeout) - return conn, false, err -} - // Yield return the connection to transport for reuse. func (t *transport) Yield(c *dns.Conn) { t.yield <- c } @@ -178,4 +158,9 @@ func (t *transport) SetExpire(expire time.Duration) { t.expire = expire } // SetTLSConfig sets the TLS config in transport. func (t *transport) SetTLSConfig(cfg *tls.Config) { t.tlsConfig = cfg } -const defaultExpire = 10 * time.Second +const ( + defaultExpire = 10 * time.Second + minDialTimeout = 100 * time.Millisecond + maxDialTimeout = 30 * time.Second + defaultDialTimeout = 30 * time.Second +) diff --git a/plugin/forward/persistent_test.go b/plugin/forward/persistent_test.go index 6aa8999f7..e046cf4de 100644 --- a/plugin/forward/persistent_test.go +++ b/plugin/forward/persistent_test.go @@ -140,9 +140,9 @@ func TestCleanupAll(t *testing.T) { tr := newTransport(s.Addr, nil /* no TLS */) - c1, _ := dns.DialTimeout("udp", tr.addr, dialTimeout) - c2, _ := dns.DialTimeout("udp", tr.addr, dialTimeout) - c3, _ := dns.DialTimeout("udp", tr.addr, dialTimeout) + c1, _ := dns.DialTimeout("udp", tr.addr, defaultDialTimeout) + c2, _ := dns.DialTimeout("udp", tr.addr, defaultDialTimeout) + c3, _ := dns.DialTimeout("udp", tr.addr, defaultDialTimeout) tr.conns["udp"] = []*persistConn{ {c1, time.Now()}, diff --git a/plugin/forward/proxy.go b/plugin/forward/proxy.go index df1f7ae97..a1fa7e6ae 100644 --- a/plugin/forward/proxy.go +++ b/plugin/forward/proxy.go @@ -106,9 +106,8 @@ func (p *Proxy) start(duration time.Duration) { } const ( - dialTimeout = 4 * time.Second - timeout = 2 * time.Second - maxTimeout = 2 * time.Second - minTimeout = 10 * time.Millisecond - hcDuration = 500 * time.Millisecond + 
timeout = 2 * time.Second + maxTimeout = 2 * time.Second + minTimeout = 10 * time.Millisecond + hcDuration = 500 * time.Millisecond )