Ensure cache.ResponseWriter can be used asynchronously during prefetch (#1884)
The default dns.Response implementation of a dns.ResponseWriter will panic if RemoteAddr() is called after the connection to the client has been closed already. The current cache implementation doesn't create a new request+responsewriter during an asynchronous prefetch, but piggybacks on the request triggering the prefetch. This change copies the RemoteAddr first, so that it's safe to use it later during the actual prefetch request. A better implementation would be to completely decouple the prefetch request from the client triggering a request.
This commit is contained in:
parent
f78f30231d
commit
9c2dc7a156
2 changed files with 40 additions and 8 deletions
34
plugin/cache/cache.go
vendored
34
plugin/cache/cache.go
vendored
|
@ -4,6 +4,7 @@ package cache
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"hash/fnv"
|
"hash/fnv"
|
||||||
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coredns/coredns/plugin"
|
"github.com/coredns/coredns/plugin"
|
||||||
|
@ -106,6 +107,39 @@ type ResponseWriter struct {
|
||||||
server string // Server handling the request.
|
server string // Server handling the request.
|
||||||
|
|
||||||
prefetch bool // When true write nothing back to the client.
|
prefetch bool // When true write nothing back to the client.
|
||||||
|
remoteAddr net.Addr
|
||||||
|
}
|
||||||
|
|
||||||
|
// newPrefetchResponseWriter returns a Cache ResponseWriter to be used in
|
||||||
|
// prefetch requests. It ensures RemoteAddr() can be called even after the
|
||||||
|
// original connetion has already been closed.
|
||||||
|
func newPrefetchResponseWriter(server string, state request.Request, c *Cache) *ResponseWriter {
|
||||||
|
// Resolve the address now, the connection might be already closed when the
|
||||||
|
// actual prefetch request is made.
|
||||||
|
addr := state.W.RemoteAddr()
|
||||||
|
// The protocol of the client triggering a cache prefetch doesn't matter.
|
||||||
|
// The address type is used by request.Proto to determine the response size,
|
||||||
|
// and using TCP ensures the message isn't unnecessarily truncated.
|
||||||
|
if u, ok := addr.(*net.UDPAddr); ok {
|
||||||
|
addr = &net.TCPAddr{IP: u.IP, Port: u.Port, Zone: u.Zone}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ResponseWriter{
|
||||||
|
ResponseWriter: state.W,
|
||||||
|
Cache: c,
|
||||||
|
state: state,
|
||||||
|
server: server,
|
||||||
|
prefetch: true,
|
||||||
|
remoteAddr: addr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoteAddr implements the dns.ResponseWriter interface.
|
||||||
|
func (w *ResponseWriter) RemoteAddr() net.Addr {
|
||||||
|
if w.remoteAddr != nil {
|
||||||
|
return w.remoteAddr
|
||||||
|
}
|
||||||
|
return w.ResponseWriter.RemoteAddr()
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteMsg implements the dns.ResponseWriter interface.
|
// WriteMsg implements the dns.ResponseWriter interface.
|
||||||
|
|
12
plugin/cache/handler.go
vendored
12
plugin/cache/handler.go
vendored
|
@ -40,20 +40,18 @@ func (c *Cache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg)
|
||||||
|
|
||||||
threshold := int(math.Ceil(float64(c.percentage) / 100 * float64(i.origTTL)))
|
threshold := int(math.Ceil(float64(c.percentage) / 100 * float64(i.origTTL)))
|
||||||
if i.Freq.Hits() >= c.prefetch && ttl <= threshold {
|
if i.Freq.Hits() >= c.prefetch && ttl <= threshold {
|
||||||
go func() {
|
cw := newPrefetchResponseWriter(server, state, c)
|
||||||
|
go func(w dns.ResponseWriter) {
|
||||||
cachePrefetches.WithLabelValues(server).Inc()
|
cachePrefetches.WithLabelValues(server).Inc()
|
||||||
|
plugin.NextOrFailure(c.Name(), c.Next, ctx, w, r)
|
||||||
|
|
||||||
// When prefetching we loose the item i, and with it the frequency
|
// When prefetching we loose the item i, and with it the frequency
|
||||||
// that we've gathered sofar. See we copy the frequencies info back
|
// that we've gathered sofar. See we copy the frequencies info back
|
||||||
// into the new item that was stored in the cache.
|
// into the new item that was stored in the cache.
|
||||||
prr := &ResponseWriter{ResponseWriter: w, Cache: c,
|
|
||||||
prefetch: true, state: state,
|
|
||||||
server: server}
|
|
||||||
plugin.NextOrFailure(c.Name(), c.Next, ctx, prr, r)
|
|
||||||
|
|
||||||
if i1 := c.exists(state); i1 != nil {
|
if i1 := c.exists(state); i1 != nil {
|
||||||
i1.Freq.Reset(now, i.Freq.Hits())
|
i1.Freq.Reset(now, i.Freq.Hits())
|
||||||
}
|
}
|
||||||
}()
|
}(cw)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return dns.RcodeSuccess, nil
|
return dns.RcodeSuccess, nil
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue