diff --git a/plugin/cache/README.md b/plugin/cache/README.md index c7f98812f..fe9e45c24 100644 --- a/plugin/cache/README.md +++ b/plugin/cache/README.md @@ -44,8 +44,8 @@ cache [TTL] [ZONES...] { * `prefetch` will prefetch popular items when they are about to be expunged from the cache. Popular means **AMOUNT** queries have been seen with no gaps of **DURATION** or more between them. **DURATION** defaults to 1m. Prefetching will happen when the TTL drops below **PERCENTAGE**, - which defaults to `10%`. Values should be in the range `[10%, 90%]`. Note the percent sign is - mandatory. **PERCENTAGE** is treated as an `int`. + which defaults to `10%`, or latest 1 second before TTL expiration. Values should be in the range `[10%, 90%]`. + Note the percent sign is mandatory. **PERCENTAGE** is treated as an `int`. ## Metrics diff --git a/plugin/cache/cache.go b/plugin/cache/cache.go index caa38d1f0..cb7a10140 100644 --- a/plugin/cache/cache.go +++ b/plugin/cache/cache.go @@ -32,6 +32,27 @@ type Cache struct { prefetch int duration time.Duration percentage int + + // Testing. + now func() time.Time +} + +// New returns an initialized Cache with default settings. It's up to the +// caller to set the Next handler. +func New() *Cache { + return &Cache{ + Zones: []string{"."}, + pcap: defaultCap, + pcache: cache.New(defaultCap), + pttl: maxTTL, + ncap: defaultCap, + ncache: cache.New(defaultCap), + nttl: maxNTTL, + prefetch: 0, + duration: 1 * time.Minute, + percentage: 10, + now: time.Now, + } } // Return key under which we store the item, -1 will be returned if we don't store the @@ -88,7 +109,7 @@ type ResponseWriter struct { // WriteMsg implements the dns.ResponseWriter interface. func (w *ResponseWriter) WriteMsg(res *dns.Msg) error { do := false - mt, opt := response.Typify(res, time.Now().UTC()) + mt, opt := response.Typify(res, w.now().UTC()) if opt != nil { do = opt.Do() } @@ -140,11 +161,11 @@ func (w *ResponseWriter) set(m *dns.Msg, key int, mt response.Type, duration tim switch mt { case response.NoError, response.Delegation: - i := newItem(m, duration) + i := newItem(m, w.now(), duration) w.pcache.Add(uint32(key), i) case response.NameError, response.NoData: - i := newItem(m, duration) + i := newItem(m, w.now(), duration) w.ncache.Add(uint32(key), i) case response.OtherError: diff --git a/plugin/cache/cache_test.go b/plugin/cache/cache_test.go index 7b7c2c6f2..b475f3473 100644 --- a/plugin/cache/cache_test.go +++ b/plugin/cache/cache_test.go @@ -9,7 +9,6 @@ import ( "golang.org/x/net/context" "github.com/coredns/coredns/plugin" - "github.com/coredns/coredns/plugin/pkg/cache" "github.com/coredns/coredns/plugin/pkg/response" "github.com/coredns/coredns/plugin/test" @@ -149,9 +148,9 @@ func cacheMsg(m *dns.Msg, tc cacheTestCase) *dns.Msg { } func newTestCache(ttl time.Duration) (*Cache, *ResponseWriter) { - c := &Cache{Zones: []string{"."}, pcap: defaultCap, ncap: defaultCap, pttl: ttl, nttl: ttl} - c.pcache = cache.New(c.pcap) - c.ncache = cache.New(c.ncap) + c := New() + c.pttl = ttl + c.nttl = ttl crr := &ResponseWriter{ResponseWriter: nil, Cache: c} return c, crr @@ -187,7 +186,7 @@ func TestCache(t *testing.T) { } if ok { - resp := i.toMsg(m) + resp := i.toMsg(m, time.Now().UTC()) if !test.Header(t, tc.Case, resp) { t.Logf("%v\n", resp) @@ -209,9 +208,7 @@ func TestCache(t *testing.T) { } func TestCacheZeroTTL(t *testing.T) { - c := &Cache{Zones: []string{"."}, pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxTTL} - c.pcache = cache.New(c.pcap) - c.ncache = cache.New(c.ncap) + c := New() c.Next = zeroTTLBackend() req := new(dns.Msg) @@ -228,11 +225,8 @@ func TestCacheZeroTTL(t *testing.T) { } func BenchmarkCacheResponse(b *testing.B) { - c := &Cache{Zones: []string{"."}, pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxTTL} - c.pcache = cache.New(c.pcap) - c.ncache = cache.New(c.ncap) + c := New() c.prefetch = 1 - c.duration = 1 * time.Second c.Next = BackendHandler() ctx := context.TODO() diff --git a/plugin/cache/fuzz.go b/plugin/cache/fuzz.go index c2ca7554e..9bf6cb3a9 100644 --- a/plugin/cache/fuzz.go +++ b/plugin/cache/fuzz.go @@ -3,14 +3,10 @@ package cache import ( - "time" - "github.com/coredns/coredns/plugin/pkg/fuzz" ) // Fuzz fuzzes cache. func Fuzz(data []byte) int { - c := &Cache{pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxNTTL, prefetch: 0, duration: 1 * time.Minute} - - return fuzz.Do(c, data) + return fuzz.Do(New(), data) } diff --git a/plugin/cache/handler.go b/plugin/cache/handler.go index df2c74e39..e579aaffc 100644 --- a/plugin/cache/handler.go +++ b/plugin/cache/handler.go @@ -1,6 +1,7 @@ package cache import ( + "math" "sync" "time" @@ -25,11 +26,11 @@ func (c *Cache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) do := state.Do() // TODO(): might need more from OPT record? Like the actual bufsize? - now := time.Now().UTC() + now := c.now().UTC() i, ttl := c.get(now, qname, qtype, do) if i != nil && ttl > 0 { - resp := i.toMsg(r) + resp := i.toMsg(r, now) state.SizeAndDo(resp) resp, _ = state.Scrub(resp) @@ -37,25 +38,23 @@ func (c *Cache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) if c.prefetch > 0 { i.Freq.Update(c.duration, now) - } - pct := 100 - if i.origTTL != 0 { // you'll never know - pct = int(float64(ttl) / float64(i.origTTL) * 100) - } + threshold := int(math.Ceil(float64(c.percentage) / 100 * float64(i.origTTL))) + if i.Freq.Hits() >= c.prefetch && ttl <= threshold { + go func() { + cachePrefetches.Inc() + // When prefetching we loose the item i, and with it the frequency + // that we've gathered sofar. See we copy the frequencies info back + // into the new item that was stored in the cache. + prr := &ResponseWriter{ResponseWriter: w, Cache: c, prefetch: true} + plugin.NextOrFailure(c.Name(), c.Next, ctx, prr, r) - if c.prefetch > 0 && i.Freq.Hits() > c.prefetch && pct < c.percentage { - // When prefetching we loose the item i, and with it the frequency - // that we've gathered sofar. See we copy the frequencies info back - // into the new item that was stored in the cache. - prr := &ResponseWriter{ResponseWriter: w, Cache: c, prefetch: true} - plugin.NextOrFailure(c.Name(), c.Next, ctx, prr, r) - - if i1, _ := c.get(now, qname, qtype, do); i1 != nil { - i1.Freq.Reset(now, i.Freq.Hits()) + if i1 := c.exists(qname, qtype, do); i1 != nil { + i1.Freq.Reset(now, i.Freq.Hits()) + } + }() } } - return dns.RcodeSuccess, nil } @@ -82,6 +81,17 @@ func (c *Cache) get(now time.Time, qname string, qtype uint16, do bool) (*item, return nil, 0 } +func (c *Cache) exists(qname string, qtype uint16, do bool) *item { + k := hash(qname, qtype, do) + if i, ok := c.ncache.Get(k); ok { + return i.(*item) + } + if i, ok := c.pcache.Get(k); ok { + return i.(*item) + } + return nil +} + var ( cacheSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: plugin.Namespace, @@ -110,6 +120,13 @@ var ( Name: "misses_total", Help: "The count of cache misses.", }) + + cachePrefetches = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: plugin.Namespace, + Subsystem: "cache", + Name: "prefetch_total", + Help: "The number of time the cache has prefetched a cached item.", + }) ) var once sync.Once diff --git a/plugin/cache/item.go b/plugin/cache/item.go index d67906c81..3943ff4ae 100644 --- a/plugin/cache/item.go +++ b/plugin/cache/item.go @@ -23,7 +23,7 @@ type item struct { *freq.Freq } -func newItem(m *dns.Msg, d time.Duration) *item { +func newItem(m *dns.Msg, now time.Time, d time.Duration) *item { i := new(item) i.Rcode = m.Rcode i.Authoritative = m.Authoritative @@ -44,7 +44,7 @@ func newItem(m *dns.Msg, d time.Duration) *item { i.Extra = i.Extra[:j] i.origTTL = uint32(d.Seconds()) - i.stored = time.Now().UTC() + i.stored = now.UTC() i.Freq = new(freq.Freq) @@ -53,7 +53,7 @@ func newItem(m *dns.Msg, d time.Duration) *item { // toMsg turns i into a message, it tailors the reply to m. // The Authoritative bit is always set to 0, because the answer is from the cache. -func (i *item) toMsg(m *dns.Msg) *dns.Msg { +func (i *item) toMsg(m *dns.Msg, now time.Time) *dns.Msg { m1 := new(dns.Msg) m1.SetReply(m) @@ -67,7 +67,7 @@ func (i *item) toMsg(m *dns.Msg) *dns.Msg { m1.Ns = make([]dns.RR, len(i.Ns)) m1.Extra = make([]dns.RR, len(i.Extra)) - ttl := uint32(i.ttl(time.Now())) + ttl := uint32(i.ttl(now)) for j, r := range i.Answer { m1.Answer[j] = dns.Copy(r) m1.Answer[j].Header().Ttl = ttl diff --git a/plugin/cache/prefech_test.go b/plugin/cache/prefech_test.go index 77a8f45ea..1cd0758fb 100644 --- a/plugin/cache/prefech_test.go +++ b/plugin/cache/prefech_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/coredns/coredns/plugin" - "github.com/coredns/coredns/plugin/pkg/cache" "github.com/coredns/coredns/plugin/pkg/dnstest" "github.com/coredns/coredns/plugin/test" @@ -14,41 +13,151 @@ import ( "golang.org/x/net/context" ) -var p = false - func TestPrefetch(t *testing.T) { - c := &Cache{Zones: []string{"."}, pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxTTL} - c.pcache = cache.New(c.pcap) - c.ncache = cache.New(c.ncap) - c.prefetch = 1 - c.duration = 1 * time.Second - c.Next = PrefetchHandler(t, dns.RcodeSuccess, nil) + tests := []struct { + qname string + ttl int + prefetch int + verifications []verification + }{ + { + qname: "hits.reset.example.org.", + ttl: 80, + prefetch: 1, + verifications: []verification{ + { + after: 0 * time.Second, + answer: "hits.reset.example.org. 80 IN A 127.0.0.1", + fetch: true, + }, + { + after: 73 * time.Second, + answer: "hits.reset.example.org. 7 IN A 127.0.0.1", + fetch: true, + }, + { + after: 80 * time.Second, + answer: "hits.reset.example.org. 73 IN A 127.0.0.2", + }, + }, + }, + { + qname: "short.ttl.example.org.", + ttl: 5, + prefetch: 1, + verifications: []verification{ + { + after: 0 * time.Second, + answer: "short.ttl.example.org. 5 IN A 127.0.0.1", + fetch: true, + }, + { + after: 1 * time.Second, + answer: "short.ttl.example.org. 4 IN A 127.0.0.1", + }, + { + after: 4 * time.Second, + answer: "short.ttl.example.org. 1 IN A 127.0.0.1", + fetch: true, + }, + { + after: 5 * time.Second, + answer: "short.ttl.example.org. 4 IN A 127.0.0.2", + }, + }, + }, + { + qname: "no.prefetch.example.org.", + ttl: 30, + prefetch: 0, + verifications: []verification{ + { + after: 0 * time.Second, + answer: "no.prefetch.example.org. 30 IN A 127.0.0.1", + fetch: true, + }, + { + after: 15 * time.Second, + answer: "no.prefetch.example.org. 15 IN A 127.0.0.1", + }, + { + after: 29 * time.Second, + answer: "no.prefetch.example.org. 1 IN A 127.0.0.1", + }, + { + after: 30 * time.Second, + answer: "no.prefetch.example.org. 30 IN A 127.0.0.2", + fetch: true, + }, + }, + }, + } - ctx := context.TODO() + t0, err := time.Parse(time.RFC3339, "2018-01-01T14:00:00+00:00") + if err != nil { + t.Fatal(err) + } + for _, tt := range tests { + t.Run(tt.qname, func(t *testing.T) { + fetchc := make(chan struct{}, 1) - req := new(dns.Msg) - req.SetQuestion("lowttl.example.org.", dns.TypeA) + c := New() + c.prefetch = tt.prefetch + c.Next = prefetchHandler(tt.qname, tt.ttl, fetchc) - rec := dnstest.NewRecorder(&test.ResponseWriter{}) + req := new(dns.Msg) + req.SetQuestion(tt.qname, dns.TypeA) + rec := dnstest.NewRecorder(&test.ResponseWriter{}) - c.ServeDNS(ctx, rec, req) - p = true // prefetch should be true for the 2nd fetch - c.ServeDNS(ctx, rec, req) + for _, v := range tt.verifications { + c.now = func() time.Time { return t0.Add(v.after) } + + c.ServeDNS(context.TODO(), rec, req) + if v.fetch { + select { + case <-fetchc: + if !v.fetch { + t.Fatalf("after %s: want request to trigger a prefetch", v.after) + } + case <-time.After(time.Second): + t.Fatalf("after %s: want request to trigger a prefetch", v.after) + } + } + if want, got := rec.Rcode, dns.RcodeSuccess; want != got { + t.Errorf("after %s: want rcode %d, got %d", v.after, want, got) + } + if want, got := 1, len(rec.Msg.Answer); want != got { + t.Errorf("after %s: want %d answer RR, got %d", v.after, want, got) + } + if want, got := test.A(v.answer).String(), rec.Msg.Answer[0].String(); want != got { + t.Errorf("after %s: want answer %s, got %s", v.after, want, got) + } + } + }) + } } -func PrefetchHandler(t *testing.T, rcode int, err error) plugin.Handler { +type verification struct { + after time.Duration + answer string + // fetch defines whether a request is sent to the next handler. + fetch bool +} + +// prefetchHandler is a fake plugin implementation which returns a single A +// record with the given qname and ttl. The returned IP address starts at +// 127.0.0.1 and is incremented on every request. +func prefetchHandler(qname string, ttl int, fetchc chan struct{}) plugin.Handler { + i := 0 return plugin.HandlerFunc(func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { + i++ m := new(dns.Msg) - m.SetQuestion("lowttl.example.org.", dns.TypeA) + m.SetQuestion(qname, dns.TypeA) m.Response = true - m.RecursionAvailable = true - m.Answer = append(m.Answer, test.A("lowttl.example.org. 80 IN A 127.0.0.53")) - if p != w.(*ResponseWriter).prefetch { - err = fmt.Errorf("cache prefetch not equal to p: got %t, want %t", p, w.(*ResponseWriter).prefetch) - t.Fatal(err) - } + m.Answer = append(m.Answer, test.A(fmt.Sprintf("%s %d IN A 127.0.0.%d", qname, ttl, i))) w.WriteMsg(m) - return rcode, err + fetchc <- struct{}{} + return dns.RcodeSuccess, nil }) } diff --git a/plugin/cache/setup.go b/plugin/cache/setup.go index d4d041ae8..41fa023b8 100644 --- a/plugin/cache/setup.go +++ b/plugin/cache/setup.go @@ -41,21 +41,25 @@ func setup(c *caddy.Controller) error { x.MustRegister(cacheCapacity) x.MustRegister(cacheHits) x.MustRegister(cacheMisses) + x.MustRegister(cachePrefetches) } }) return nil }) - // Export the capacity for the metrics. This only happens once, because this is a re-load change only. + // Initialize all counters and gauges. + cacheSize.WithLabelValues(Success) + cacheSize.WithLabelValues(Denial) cacheCapacity.WithLabelValues(Success).Set(float64(ca.pcap)) cacheCapacity.WithLabelValues(Denial).Set(float64(ca.ncap)) + cacheHits.WithLabelValues(Success) + cacheHits.WithLabelValues(Denial) return nil } func cacheParse(c *caddy.Controller) (*Cache, error) { - - ca := &Cache{pcap: defaultCap, ncap: defaultCap, pttl: maxTTL, nttl: maxNTTL, prefetch: 0, duration: 1 * time.Minute} + ca := New() for c.Next() { // cache [ttl] [zones..] @@ -140,8 +144,6 @@ func cacheParse(c *caddy.Controller) (*Cache, error) { } ca.prefetch = amount - ca.duration = 1 * time.Minute - ca.percentage = 10 if len(args) > 1 { dur, err := time.ParseDuration(args[1]) if err != nil { @@ -174,7 +176,6 @@ func cacheParse(c *caddy.Controller) (*Cache, error) { for i := range origins { origins[i] = plugin.Host(origins[i]).Normalize() } - ca.Zones = origins ca.pcache = cache.New(ca.pcap)