plugin/cache: Fix prefetching issues (#1363)
* Improve plugin/cache metrics
  * Add coredns_cache_prefetch_total metric to track the number of prefetches.
  * Remove unnecessary Cache.get() call which would incorrectly increment cache counters.
  * Initialize all counters and gauges at zero.
* Allow prefetching of a single request per TTL
  The original implementation didn't allow prefetching queries which are only requested once during the duration of a TTL. The minimum number of queries which had to be seen was therefore capped at 2. This change also implements a real prefetch test. The existing test was a noop and always passed regardless of any prefetch implementation.
* Fix prefetching for items with a short TTL
  The default prefetch threshold (percentage) is 10% of the lifetime of a cache item. With the previous implementation, this disabled prefetching for all items with a TTL < 10s (the resulting percentage would be 0, at which point a cached item is already discarded). This change uses a time-based threshold calculation and ensures that a prefetch is triggered at a TTL of 1 at the latest.
* Fix wrong duration reporting of cached responses
  The logging and metrics plugins (among others) included the duration of a cache prefetch in the request latency of the client request. This change fixes this incorrect reporting and executes the prefetch request in a goroutine in the background.
This commit is contained in:
parent
fe0767987e
commit
dd9fc8962c
8 changed files with 212 additions and 74 deletions
4
plugin/cache/README.md
vendored
4
plugin/cache/README.md
vendored
|
@ -44,8 +44,8 @@ cache [TTL] [ZONES...] {
|
|||
* `prefetch` will prefetch popular items when they are about to be expunged from the cache.
|
||||
Popular means **AMOUNT** queries have been seen with no gaps of **DURATION** or more between them.
|
||||
**DURATION** defaults to 1m. Prefetching will happen when the TTL drops below **PERCENTAGE**,
|
||||
which defaults to `10%`. Values should be in the range `[10%, 90%]`. Note the percent sign is
|
||||
mandatory. **PERCENTAGE** is treated as an `int`.
|
||||
which defaults to `10%`, or at the latest 1 second before the TTL expires. Values should be in the range `[10%, 90%]`.
|
||||
Note the percent sign is mandatory. **PERCENTAGE** is treated as an `int`.
|
||||
|
||||
## Metrics
|
||||
|
||||
|
|
27
plugin/cache/cache.go
vendored
27
plugin/cache/cache.go
vendored
|
@ -32,6 +32,27 @@ type Cache struct {
|
|||
prefetch int
|
||||
duration time.Duration
|
||||
percentage int
|
||||
|
||||
// Testing.
|
||||
now func() time.Time
|
||||
}
|
||||
|
||||
// New returns an initialized Cache with default settings. It's up to the
|
||||
// caller to set the Next handler.
|
||||
func New() *Cache {
|
||||
return &Cache{
|
||||
Zones: []string{"."}, // the root zone
|
||||
pcap: defaultCap, // capacity of the positive (success) cache
|
||||
pcache: cache.New(defaultCap),
|
||||
pttl: maxTTL,
|
||||
ncap: defaultCap, // capacity of the negative (denial) cache
|
||||
ncache: cache.New(defaultCap),
|
||||
nttl: maxNTTL,
|
||||
prefetch: 0, // prefetching is off; ServeDNS only considers it when prefetch > 0
|
||||
duration: 1 * time.Minute, // default prefetch DURATION (README: 1m)
|
||||
percentage: 10, // default prefetch PERCENTAGE (README: 10%)
|
||||
now: time.Now, // clock source; tests replace it to control time
|
||||
}
|
||||
}
|
||||
|
||||
// Return key under which we store the item, -1 will be returned if we don't store the
|
||||
|
@ -88,7 +109,7 @@ type ResponseWriter struct {
|
|||
// WriteMsg implements the dns.ResponseWriter interface.
|
||||
func (w *ResponseWriter) WriteMsg(res *dns.Msg) error {
|
||||
do := false
|
||||
mt, opt := response.Typify(res, time.Now().UTC())
|
||||
mt, opt := response.Typify(res, w.now().UTC())
|
||||
if opt != nil {
|
||||
do = opt.Do()
|
||||
}
|
||||
|
@ -140,11 +161,11 @@ func (w *ResponseWriter) set(m *dns.Msg, key int, mt response.Type, duration tim
|
|||
|
||||
switch mt {
|
||||
case response.NoError, response.Delegation:
|
||||
i := newItem(m, duration)
|
||||
i := newItem(m, w.now(), duration)
|
||||
w.pcache.Add(uint32(key), i)
|
||||
|
||||
case response.NameError, response.NoData:
|
||||
i := newItem(m, duration)
|
||||
i := newItem(m, w.now(), duration)
|
||||
w.ncache.Add(uint32(key), i)
|
||||
|
||||
case response.OtherError:
|
||||
|
|
18
plugin/cache/cache_test.go
vendored
18
plugin/cache/cache_test.go
vendored
|
@ -9,7 +9,6 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/coredns/coredns/plugin"
|
||||
"github.com/coredns/coredns/plugin/pkg/cache"
|
||||
"github.com/coredns/coredns/plugin/pkg/response"
|
||||
"github.com/coredns/coredns/plugin/test"
|
||||
|
||||
|
@ -149,9 +148,9 @@ func cacheMsg(m *dns.Msg, tc cacheTestCase) *dns.Msg {
|
|||
}
|
||||
|
||||
func newTestCache(ttl time.Duration) (*Cache, *ResponseWriter) {
|
||||
c := &Cache{Zones: []string{"."}, pcap: defaultCap, ncap: defaultCap, pttl: ttl, nttl: ttl}
|
||||
c.pcache = cache.New(c.pcap)
|
||||
c.ncache = cache.New(c.ncap)
|
||||
c := New()
|
||||
c.pttl = ttl
|
||||
c.nttl = ttl
|
||||
|
||||
crr := &ResponseWriter{ResponseWriter: nil, Cache: c}
|
||||
return c, crr
|
||||
|
@ -187,7 +186,7 @@ func TestCache(t *testing.T) {
|
|||
}
|
||||
|
||||
if ok {
|
||||
resp := i.toMsg(m)
|
||||
resp := i.toMsg(m, time.Now().UTC())
|
||||
|
||||
if !test.Header(t, tc.Case, resp) {
|
||||
t.Logf("%v\n", resp)
|
||||
|
@ -209,9 +208,7 @@ func TestCache(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCacheZeroTTL(t *testing.T) {
|
||||
c := &Cache{Zones: []string{"."}, pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxTTL}
|
||||
c.pcache = cache.New(c.pcap)
|
||||
c.ncache = cache.New(c.ncap)
|
||||
c := New()
|
||||
c.Next = zeroTTLBackend()
|
||||
|
||||
req := new(dns.Msg)
|
||||
|
@ -228,11 +225,8 @@ func TestCacheZeroTTL(t *testing.T) {
|
|||
}
|
||||
|
||||
func BenchmarkCacheResponse(b *testing.B) {
|
||||
c := &Cache{Zones: []string{"."}, pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxTTL}
|
||||
c.pcache = cache.New(c.pcap)
|
||||
c.ncache = cache.New(c.ncap)
|
||||
c := New()
|
||||
c.prefetch = 1
|
||||
c.duration = 1 * time.Second
|
||||
c.Next = BackendHandler()
|
||||
|
||||
ctx := context.TODO()
|
||||
|
|
6
plugin/cache/fuzz.go
vendored
6
plugin/cache/fuzz.go
vendored
|
@ -3,14 +3,10 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/coredns/coredns/plugin/pkg/fuzz"
|
||||
)
|
||||
|
||||
// Fuzz fuzzes cache.
|
||||
func Fuzz(data []byte) int {
|
||||
c := &Cache{pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxNTTL, prefetch: 0, duration: 1 * time.Minute}
|
||||
|
||||
return fuzz.Do(c, data)
|
||||
return fuzz.Do(New(), data)
|
||||
}
|
||||
|
|
39
plugin/cache/handler.go
vendored
39
plugin/cache/handler.go
vendored
|
@ -1,6 +1,7 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -25,11 +26,11 @@ func (c *Cache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg)
|
|||
|
||||
do := state.Do() // TODO(): might need more from OPT record? Like the actual bufsize?
|
||||
|
||||
now := time.Now().UTC()
|
||||
now := c.now().UTC()
|
||||
|
||||
i, ttl := c.get(now, qname, qtype, do)
|
||||
if i != nil && ttl > 0 {
|
||||
resp := i.toMsg(r)
|
||||
resp := i.toMsg(r, now)
|
||||
|
||||
state.SizeAndDo(resp)
|
||||
resp, _ = state.Scrub(resp)
|
||||
|
@ -37,25 +38,23 @@ func (c *Cache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg)
|
|||
|
||||
if c.prefetch > 0 {
|
||||
i.Freq.Update(c.duration, now)
|
||||
}
|
||||
|
||||
pct := 100
|
||||
if i.origTTL != 0 { // you'll never know
|
||||
pct = int(float64(ttl) / float64(i.origTTL) * 100)
|
||||
}
|
||||
|
||||
if c.prefetch > 0 && i.Freq.Hits() > c.prefetch && pct < c.percentage {
|
||||
threshold := int(math.Ceil(float64(c.percentage) / 100 * float64(i.origTTL)))
|
||||
if i.Freq.Hits() >= c.prefetch && ttl <= threshold {
|
||||
go func() {
|
||||
cachePrefetches.Inc()
|
||||
// When prefetching we lose the item i, and with it the frequency
|
||||
// that we've gathered so far. So we copy the frequency info back
|
||||
// into the new item that was stored in the cache.
|
||||
prr := &ResponseWriter{ResponseWriter: w, Cache: c, prefetch: true}
|
||||
plugin.NextOrFailure(c.Name(), c.Next, ctx, prr, r)
|
||||
|
||||
if i1, _ := c.get(now, qname, qtype, do); i1 != nil {
|
||||
if i1 := c.exists(qname, qtype, do); i1 != nil {
|
||||
i1.Freq.Reset(now, i.Freq.Hits())
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
return dns.RcodeSuccess, nil
|
||||
}
|
||||
|
||||
|
@ -82,6 +81,17 @@ func (c *Cache) get(now time.Time, qname string, qtype uint16, do bool) (*item,
|
|||
return nil, 0
|
||||
}
|
||||
|
||||
// exists returns the item stored under the hash of (qname, qtype, do), or nil
// when neither cache holds one. The negative cache is consulted before the
// positive cache. No timestamp is taken and no TTL/expiry check is made here.
// NOTE(review): per the commit message this replaces a c.get() call in the
// prefetch path so that cache counters are not incremented — confirm against get.
func (c *Cache) exists(qname string, qtype uint16, do bool) *item {
|
||||
k := hash(qname, qtype, do)
|
||||
if i, ok := c.ncache.Get(k); ok {
|
||||
return i.(*item)
|
||||
}
|
||||
if i, ok := c.pcache.Get(k); ok {
|
||||
return i.(*item)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
cacheSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: plugin.Namespace,
|
||||
|
@ -110,6 +120,13 @@ var (
|
|||
Name: "misses_total",
|
||||
Help: "The count of cache misses.",
|
||||
})
|
||||
|
||||
cachePrefetches = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: plugin.Namespace,
|
||||
Subsystem: "cache",
|
||||
Name: "prefetch_total",
|
||||
Help: "The number of time the cache has prefetched a cached item.",
|
||||
})
|
||||
)
|
||||
|
||||
var once sync.Once
|
||||
|
|
8
plugin/cache/item.go
vendored
8
plugin/cache/item.go
vendored
|
@ -23,7 +23,7 @@ type item struct {
|
|||
*freq.Freq
|
||||
}
|
||||
|
||||
func newItem(m *dns.Msg, d time.Duration) *item {
|
||||
func newItem(m *dns.Msg, now time.Time, d time.Duration) *item {
|
||||
i := new(item)
|
||||
i.Rcode = m.Rcode
|
||||
i.Authoritative = m.Authoritative
|
||||
|
@ -44,7 +44,7 @@ func newItem(m *dns.Msg, d time.Duration) *item {
|
|||
i.Extra = i.Extra[:j]
|
||||
|
||||
i.origTTL = uint32(d.Seconds())
|
||||
i.stored = time.Now().UTC()
|
||||
i.stored = now.UTC()
|
||||
|
||||
i.Freq = new(freq.Freq)
|
||||
|
||||
|
@ -53,7 +53,7 @@ func newItem(m *dns.Msg, d time.Duration) *item {
|
|||
|
||||
// toMsg turns i into a message, it tailors the reply to m.
|
||||
// The Authoritative bit is always set to 0, because the answer is from the cache.
|
||||
func (i *item) toMsg(m *dns.Msg) *dns.Msg {
|
||||
func (i *item) toMsg(m *dns.Msg, now time.Time) *dns.Msg {
|
||||
m1 := new(dns.Msg)
|
||||
m1.SetReply(m)
|
||||
|
||||
|
@ -67,7 +67,7 @@ func (i *item) toMsg(m *dns.Msg) *dns.Msg {
|
|||
m1.Ns = make([]dns.RR, len(i.Ns))
|
||||
m1.Extra = make([]dns.RR, len(i.Extra))
|
||||
|
||||
ttl := uint32(i.ttl(time.Now()))
|
||||
ttl := uint32(i.ttl(now))
|
||||
for j, r := range i.Answer {
|
||||
m1.Answer[j] = dns.Copy(r)
|
||||
m1.Answer[j].Header().Ttl = ttl
|
||||
|
|
175
plugin/cache/prefech_test.go
vendored
175
plugin/cache/prefech_test.go
vendored
|
@ -6,7 +6,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/coredns/coredns/plugin"
|
||||
"github.com/coredns/coredns/plugin/pkg/cache"
|
||||
"github.com/coredns/coredns/plugin/pkg/dnstest"
|
||||
|
||||
"github.com/coredns/coredns/plugin/test"
|
||||
|
@ -14,41 +13,151 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var p = false
|
||||
|
||||
func TestPrefetch(t *testing.T) {
|
||||
c := &Cache{Zones: []string{"."}, pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxTTL}
|
||||
c.pcache = cache.New(c.pcap)
|
||||
c.ncache = cache.New(c.ncap)
|
||||
c.prefetch = 1
|
||||
c.duration = 1 * time.Second
|
||||
c.Next = PrefetchHandler(t, dns.RcodeSuccess, nil)
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
req := new(dns.Msg)
|
||||
req.SetQuestion("lowttl.example.org.", dns.TypeA)
|
||||
|
||||
rec := dnstest.NewRecorder(&test.ResponseWriter{})
|
||||
|
||||
c.ServeDNS(ctx, rec, req)
|
||||
p = true // prefetch should be true for the 2nd fetch
|
||||
c.ServeDNS(ctx, rec, req)
|
||||
}
|
||||
|
||||
func PrefetchHandler(t *testing.T, rcode int, err error) plugin.Handler {
|
||||
return plugin.HandlerFunc(func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion("lowttl.example.org.", dns.TypeA)
|
||||
m.Response = true
|
||||
m.RecursionAvailable = true
|
||||
m.Answer = append(m.Answer, test.A("lowttl.example.org. 80 IN A 127.0.0.53"))
|
||||
if p != w.(*ResponseWriter).prefetch {
|
||||
err = fmt.Errorf("cache prefetch not equal to p: got %t, want %t", p, w.(*ResponseWriter).prefetch)
|
||||
t.Fatal(err)
|
||||
tests := []struct {
|
||||
qname string
|
||||
ttl int
|
||||
prefetch int
|
||||
verifications []verification
|
||||
}{
|
||||
{
|
||||
qname: "hits.reset.example.org.",
|
||||
ttl: 80,
|
||||
prefetch: 1,
|
||||
verifications: []verification{
|
||||
{
|
||||
after: 0 * time.Second,
|
||||
answer: "hits.reset.example.org. 80 IN A 127.0.0.1",
|
||||
fetch: true,
|
||||
},
|
||||
{
|
||||
after: 73 * time.Second,
|
||||
answer: "hits.reset.example.org. 7 IN A 127.0.0.1",
|
||||
fetch: true,
|
||||
},
|
||||
{
|
||||
after: 80 * time.Second,
|
||||
answer: "hits.reset.example.org. 73 IN A 127.0.0.2",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
qname: "short.ttl.example.org.",
|
||||
ttl: 5,
|
||||
prefetch: 1,
|
||||
verifications: []verification{
|
||||
{
|
||||
after: 0 * time.Second,
|
||||
answer: "short.ttl.example.org. 5 IN A 127.0.0.1",
|
||||
fetch: true,
|
||||
},
|
||||
{
|
||||
after: 1 * time.Second,
|
||||
answer: "short.ttl.example.org. 4 IN A 127.0.0.1",
|
||||
},
|
||||
{
|
||||
after: 4 * time.Second,
|
||||
answer: "short.ttl.example.org. 1 IN A 127.0.0.1",
|
||||
fetch: true,
|
||||
},
|
||||
{
|
||||
after: 5 * time.Second,
|
||||
answer: "short.ttl.example.org. 4 IN A 127.0.0.2",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
qname: "no.prefetch.example.org.",
|
||||
ttl: 30,
|
||||
prefetch: 0,
|
||||
verifications: []verification{
|
||||
{
|
||||
after: 0 * time.Second,
|
||||
answer: "no.prefetch.example.org. 30 IN A 127.0.0.1",
|
||||
fetch: true,
|
||||
},
|
||||
{
|
||||
after: 15 * time.Second,
|
||||
answer: "no.prefetch.example.org. 15 IN A 127.0.0.1",
|
||||
},
|
||||
{
|
||||
after: 29 * time.Second,
|
||||
answer: "no.prefetch.example.org. 1 IN A 127.0.0.1",
|
||||
},
|
||||
{
|
||||
after: 30 * time.Second,
|
||||
answer: "no.prefetch.example.org. 30 IN A 127.0.0.2",
|
||||
fetch: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
t0, err := time.Parse(time.RFC3339, "2018-01-01T14:00:00+00:00")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.qname, func(t *testing.T) {
|
||||
fetchc := make(chan struct{}, 1)
|
||||
|
||||
c := New()
|
||||
c.prefetch = tt.prefetch
|
||||
c.Next = prefetchHandler(tt.qname, tt.ttl, fetchc)
|
||||
|
||||
req := new(dns.Msg)
|
||||
req.SetQuestion(tt.qname, dns.TypeA)
|
||||
rec := dnstest.NewRecorder(&test.ResponseWriter{})
|
||||
|
||||
for _, v := range tt.verifications {
|
||||
c.now = func() time.Time { return t0.Add(v.after) }
|
||||
|
||||
c.ServeDNS(context.TODO(), rec, req)
|
||||
if v.fetch {
|
||||
select {
|
||||
case <-fetchc:
|
||||
if !v.fetch {
|
||||
t.Fatalf("after %s: want request to trigger a prefetch", v.after)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("after %s: want request to trigger a prefetch", v.after)
|
||||
}
|
||||
}
|
||||
if want, got := rec.Rcode, dns.RcodeSuccess; want != got {
|
||||
t.Errorf("after %s: want rcode %d, got %d", v.after, want, got)
|
||||
}
|
||||
if want, got := 1, len(rec.Msg.Answer); want != got {
|
||||
t.Errorf("after %s: want %d answer RR, got %d", v.after, want, got)
|
||||
}
|
||||
if want, got := test.A(v.answer).String(), rec.Msg.Answer[0].String(); want != got {
|
||||
t.Errorf("after %s: want answer %s, got %s", v.after, want, got)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// verification describes one query issued by TestPrefetch and the response
// it expects back from the cache.
type verification struct {
|
||||
after time.Duration // offset added to the test's base time t0 when the query is served
|
||||
answer string // expected A record, in the text form parsed by test.A
|
||||
// fetch defines whether a request is sent to the next handler.
|
||||
fetch bool
|
||||
}
|
||||
|
||||
// prefetchHandler is a fake plugin implementation which returns a single A
|
||||
// record with the given qname and ttl. The returned IP address starts at
|
||||
// 127.0.0.1 and is incremented on every request.
|
||||
func prefetchHandler(qname string, ttl int, fetchc chan struct{}) plugin.Handler {
|
||||
i := 0
|
||||
return plugin.HandlerFunc(func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
|
||||
i++
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion(qname, dns.TypeA)
|
||||
m.Response = true
|
||||
m.Answer = append(m.Answer, test.A(fmt.Sprintf("%s %d IN A 127.0.0.%d", qname, ttl, i)))
|
||||
|
||||
w.WriteMsg(m)
|
||||
return rcode, err
|
||||
fetchc <- struct{}{}
|
||||
return dns.RcodeSuccess, nil
|
||||
})
|
||||
}
|
||||
|
|
13
plugin/cache/setup.go
vendored
13
plugin/cache/setup.go
vendored
|
@ -41,21 +41,25 @@ func setup(c *caddy.Controller) error {
|
|||
x.MustRegister(cacheCapacity)
|
||||
x.MustRegister(cacheHits)
|
||||
x.MustRegister(cacheMisses)
|
||||
x.MustRegister(cachePrefetches)
|
||||
}
|
||||
})
|
||||
return nil
|
||||
})
|
||||
|
||||
// Export the capacity for the metrics. This only happens once, because this is a re-load change only.
|
||||
// Initialize all counters and gauges.
|
||||
cacheSize.WithLabelValues(Success)
|
||||
cacheSize.WithLabelValues(Denial)
|
||||
cacheCapacity.WithLabelValues(Success).Set(float64(ca.pcap))
|
||||
cacheCapacity.WithLabelValues(Denial).Set(float64(ca.ncap))
|
||||
cacheHits.WithLabelValues(Success)
|
||||
cacheHits.WithLabelValues(Denial)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func cacheParse(c *caddy.Controller) (*Cache, error) {
|
||||
|
||||
ca := &Cache{pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxNTTL, prefetch: 0, duration: 1 * time.Minute}
|
||||
ca := New()
|
||||
|
||||
for c.Next() {
|
||||
// cache [ttl] [zones..]
|
||||
|
@ -140,8 +144,6 @@ func cacheParse(c *caddy.Controller) (*Cache, error) {
|
|||
}
|
||||
ca.prefetch = amount
|
||||
|
||||
ca.duration = 1 * time.Minute
|
||||
ca.percentage = 10
|
||||
if len(args) > 1 {
|
||||
dur, err := time.ParseDuration(args[1])
|
||||
if err != nil {
|
||||
|
@ -174,7 +176,6 @@ func cacheParse(c *caddy.Controller) (*Cache, error) {
|
|||
for i := range origins {
|
||||
origins[i] = plugin.Host(origins[i]).Normalize()
|
||||
}
|
||||
|
||||
ca.Zones = origins
|
||||
|
||||
ca.pcache = cache.New(ca.pcap)
|
||||
|
|
Loading…
Add table
Reference in a new issue