diff --git a/plugin/errors/README.md b/plugin/errors/README.md index f15b271f6..70ec3a2c3 100644 --- a/plugin/errors/README.md +++ b/plugin/errors/README.md @@ -6,16 +6,36 @@ ## Description -Any errors encountered during the query processing will be printed to standard output. +Any errors encountered during the query processing will be printed to standard output. Errors of a particular type can be consolidated and printed once per configured period of time. This plugin can only be used once per Server Block. ## Syntax +The basic syntax is: + ~~~ errors ~~~ +Extra knobs are available with an expanded syntax: + +~~~ +errors { + consolidate DURATION REGEXP +} +~~~ + +Option `consolidate` allows collecting several error messages matching the regular expression **REGEXP** during **DURATION**. Once **DURATION** has elapsed since the first such message was received, a consolidated message is printed to standard output, e.g. + +~~~ +2 errors like '^read udp .* i/o timeout$' occurred in last 30s +~~~ + +Multiple `consolidate` options with different **DURATION** and **REGEXP** are allowed. If an error message matches several of the defined regular expressions, it is associated with the first matching **REGEXP**. + +For better performance, it's recommended to use the `^` or `$` metacharacters in the regular expression when filtering error messages by prefix or suffix, e.g. `^failed to .*`, or `.* timeout$`. + ## Examples Use the *whoami* to respond to queries and Log errors to standard output. @@ -26,3 +46,15 @@ Use the *whoami* to respond to queries and Log errors to standard output. errors } ~~~ + +Use the *forward* to resolve queries via 8.8.8.8 and print consolidated error messages for errors with suffix " i/o timeout" or with prefix "Failed to ". + +~~~ corefile +. { + forward . 
8.8.8.8 + errors { + consolidate 5m ".* i/o timeout$" + consolidate 30s "^Failed to .+" + } +} +~~~ diff --git a/plugin/errors/errors.go b/plugin/errors/errors.go index 88027b191..aa4b384b8 100644 --- a/plugin/errors/errors.go +++ b/plugin/errors/errors.go @@ -1,8 +1,12 @@ -// Package errors implements an HTTP error handling plugin. +// Package errors implements an error handling plugin. package errors import ( "context" + "regexp" + "sync/atomic" + "time" + "unsafe" "github.com/coredns/coredns/plugin" clog "github.com/coredns/coredns/plugin/pkg/log" @@ -11,19 +15,98 @@ import ( "github.com/miekg/dns" ) +var log = clog.NewWithPlugin("errors") + +type pattern struct { + ptimer unsafe.Pointer + count uint32 + period time.Duration + pattern *regexp.Regexp +} + +func (p *pattern) timer() *time.Timer { + return (*time.Timer)(atomic.LoadPointer(&p.ptimer)) +} + +func (p *pattern) setTimer(t *time.Timer) { + atomic.StorePointer(&p.ptimer, unsafe.Pointer(t)) +} + // errorHandler handles DNS errors (and errors from other plugin). 
-type errorHandler struct{ Next plugin.Handler } +type errorHandler struct { + patterns []*pattern + eLogger func(int, string, string, string) + cLogger func(uint32, string, time.Duration) + stopFlag uint32 + Next plugin.Handler +} + +func newErrorHandler() *errorHandler { + return &errorHandler{eLogger: errorLogger, cLogger: consLogger} +} + +func errorLogger(code int, qName, qType, err string) { + log.Errorf("%d %s %s: %s", code, qName, qType, err) +} + +func consLogger(cnt uint32, pattern string, p time.Duration) { + log.Errorf("%d errors like '%s' occured in last %s", cnt, pattern, p) +} + +func (h *errorHandler) logPattern(i int) { + cnt := atomic.SwapUint32(&h.patterns[i].count, 0) + if cnt > 0 { + h.cLogger(cnt, h.patterns[i].pattern.String(), h.patterns[i].period) + } +} + +func (h *errorHandler) inc(i int) bool { + if atomic.LoadUint32(&h.stopFlag) > 0 { + return false + } + if atomic.AddUint32(&h.patterns[i].count, 1) == 1 { + ind := i + t := time.AfterFunc(h.patterns[ind].period, func() { + h.logPattern(ind) + }) + h.patterns[ind].setTimer(t) + if atomic.LoadUint32(&h.stopFlag) > 0 && t.Stop() { + h.logPattern(ind) + } + } + return true +} + +func (h *errorHandler) stop() { + atomic.StoreUint32(&h.stopFlag, 1) + for i := range h.patterns { + t := h.patterns[i].timer() + if t != nil && t.Stop() { + h.logPattern(i) + } + } +} // ServeDNS implements the plugin.Handler interface. 
-func (h errorHandler) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { +func (h *errorHandler) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { rcode, err := plugin.NextOrFailure(h.Name(), h.Next, ctx, w, r) if err != nil { + strErr := err.Error() + for i := range h.patterns { + if h.patterns[i].pattern.MatchString(strErr) { + if h.inc(i) { + return rcode, err + } + break + } + } state := request.Request{W: w, Req: r} - clog.Errorf("%d %s %s: %v", rcode, state.Name(), state.Type(), err) + h.eLogger(rcode, state.Name(), state.Type(), strErr) } return rcode, err } -func (h errorHandler) Name() string { return "errors" } +// Name implements the plugin.Handler interface. +func (h *errorHandler) Name() string { return "errors" } diff --git a/plugin/errors/errors_test.go b/plugin/errors/errors_test.go index afe809c71..9265d3fc2 100644 --- a/plugin/errors/errors_test.go +++ b/plugin/errors/errors_test.go @@ -5,9 +5,14 @@ import ( "context" "errors" "fmt" - "log" + golog "log" + "math/rand" + "regexp" "strings" + "sync" + "sync/atomic" "testing" + "time" "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/pkg/dnstest" @@ -18,8 +23,8 @@ import ( func TestErrors(t *testing.T) { buf := bytes.Buffer{} - log.SetOutput(&buf) - em := errorHandler{} + golog.SetOutput(&buf) + em := errorHandler{eLogger: errorLogger} testErr := errors.New("test error") tests := []struct { @@ -37,7 +42,7 @@ func TestErrors(t *testing.T) { { next: genErrorHandler(dns.RcodeNotAuth, testErr), expectedCode: dns.RcodeNotAuth, - expectedLog: fmt.Sprintf("[ERROR] %d %s: %v\n", dns.RcodeNotAuth, "example.org. A", testErr), + expectedLog: fmt.Sprintf("%d %s: %v\n", dns.RcodeNotAuth, "example.org. 
A", testErr), expectedErr: testErr, }, } @@ -67,6 +72,374 @@ func TestErrors(t *testing.T) { } } +func TestConsLogger(t *testing.T) { + buf := bytes.Buffer{} + golog.SetOutput(&buf) + + consLogger(5, "^Error.*!$", 3*time.Second) + + exp := "[ERROR] plugin/errors: 5 errors like '^Error.*!$' occured in last 3s" + act := buf.String() + if !strings.Contains(act, exp) { + t.Errorf("Unexpected log message, expected to contain %q, actual %q", exp, act) + } +} + +func TestLogPattern(t *testing.T) { + h := &errorHandler{ + patterns: []*pattern{{ + count: 4, + period: 2 * time.Second, + pattern: regexp.MustCompile("^Error.*!$"), + }}, + cLogger: testConsLogger, + } + loggedData = loggedData[:0] + + h.logPattern(0) + expLen := 1 + if len(loggedData) != expLen { + t.Errorf("Unexpected number of logged messages, expected %d, actual %d", + expLen, len(loggedData)) + } + expCnt := uint32(4) + if loggedData[0].cnt != expCnt { + t.Errorf("Unexpected 'count' in logged message, expected %d, actual %d", + expCnt, loggedData[0].cnt) + } + expPat := "^Error.*!$" + actPat := loggedData[0].pat + if actPat != expPat { + t.Errorf("Unexpected 'pattern' in logged message, expected %s, actual %s", + expPat, actPat) + } + expPer := "2s" + actPer := loggedData[0].dur.String() + if actPer != expPer { + t.Errorf("Unexpected 'period' in logged message, expected %s, actual %s", + expPer, actPer) + } + + h.logPattern(0) + if len(loggedData) != expLen { + t.Errorf("Unexpected number of logged messages, expected %d, actual %d", + expLen, len(loggedData)) + } +} + +func TestInc(t *testing.T) { + h := &errorHandler{ + stopFlag: 1, + patterns: []*pattern{{ + period: 2 * time.Second, + pattern: regexp.MustCompile("^Error.*!$"), + }}, + cLogger: testConsLogger, + } + + ret := h.inc(0) + if ret { + t.Error("Unexpected return value, expected false, actual true") + } + + h.stopFlag = 0 + ret = h.inc(0) + if !ret { + t.Error("Unexpected return value, expected true, actual false") + } + + expCnt := uint32(1) + 
actCnt := atomic.LoadUint32(&h.patterns[0].count) + if actCnt != expCnt { + t.Errorf("Unexpected 'count', expected %d, actual %d", expCnt, actCnt) + } + + t1 := h.patterns[0].timer() + if t1 == nil { + t.Error("Unexpected 'timer', expected not nil") + } + + ret = h.inc(0) + if !ret { + t.Error("Unexpected return value, expected true, actual false") + } + + expCnt = uint32(2) + actCnt = atomic.LoadUint32(&h.patterns[0].count) + if actCnt != expCnt { + t.Errorf("Unexpected 'count', expected %d, actual %d", expCnt, actCnt) + } + + t2 := h.patterns[0].timer() + if t2 != t1 { + t.Error("Unexpected 'timer', expected the same") + } + + ret = t1.Stop() + if !ret { + t.Error("Timer was unexpectedly stopped before") + } + ret = t2.Stop() + if ret { + t.Error("Timer was unexpectedly not stopped before") + } +} + +func TestStop(t *testing.T) { + h := &errorHandler{ + patterns: []*pattern{{ + period: 2 * time.Second, + pattern: regexp.MustCompile("^Error.*!$"), + }}, + cLogger: testConsLogger, + } + loggedData = loggedData[:0] + + h.inc(0) + h.inc(0) + h.inc(0) + expCnt := uint32(3) + actCnt := atomic.LoadUint32(&h.patterns[0].count) + if actCnt != expCnt { + t.Fatalf("Unexpected initial 'count', expected %d, actual %d", expCnt, actCnt) + } + + h.stop() + + expCnt = uint32(0) + actCnt = atomic.LoadUint32(&h.patterns[0].count) + if actCnt != expCnt { + t.Errorf("Unexpected 'count', expected %d, actual %d", expCnt, actCnt) + } + + expStop := uint32(1) + actStop := h.stopFlag + if actStop != expStop { + t.Errorf("Unexpected 'stop', expected %d, actual %d", expStop, actStop) + } + + t1 := h.patterns[0].timer() + if t1 == nil { + t.Error("Unexpected 'timer', expected not nil") + } else if t1.Stop() { + t.Error("Timer was unexpectedly not stopped before") + } + + expLen := 1 + if len(loggedData) != expLen { + t.Errorf("Unexpected number of logged messages, expected %d, actual %d", + expLen, len(loggedData)) + } + expCnt = uint32(3) + actCnt = loggedData[0].cnt + if actCnt != expCnt { 
+ t.Errorf("Unexpected 'count' in logged message, expected %d, actual %d", + expCnt, actCnt) + } +} + +func TestServeDNSConcurrent(t *testing.T) { + eg := newErrorGenerator() + rep := &errorReport{} + + h := &errorHandler{ + patterns: []*pattern{ + { + period: 3 * time.Nanosecond, + pattern: regexp.MustCompile("Failed"), + }, + { + period: 2 * time.Nanosecond, + pattern: regexp.MustCompile("down$"), + }, + }, + cLogger: func(cnt uint32, pattern string, p time.Duration) { + rep.incLoggerCnt() + if pattern == "Failed" { + rep.incConsolidated(Err0, cnt) + } else if pattern == "down$" { + rep.incConsolidated(Err1, cnt) + } + }, + eLogger: func(code int, n, t, e string) { + switch e { + case ErrStr0: + rep.incPassed(Err0) + case ErrStr1: + rep.incPassed(Err1) + case ErrStr2: + rep.incPassed(Err2) + case ErrStr3: + rep.incPassed(Err3) + } + }, + Next: eg, + } + + ctx := context.TODO() + r := new(dns.Msg) + w := &test.ResponseWriter{} + + var wg sync.WaitGroup + runnersCnt := 9 + runner := func() { + defer wg.Done() + for !eg.done() { + h.ServeDNS(ctx, w, r) + } + } + wg.Add(runnersCnt) + for ; runnersCnt > 0; runnersCnt-- { + go runner() + } + + stopCalled := false + for !eg.done() { + if !stopCalled && + atomic.LoadUint32(&rep.s[Err0]) > ErrCnt0/2 && + atomic.LoadUint32(&rep.s[Err1]) > ErrCnt1/2 { + h.stop() + stopCalled = true + } + h.ServeDNS(ctx, w, r) + } + if !stopCalled { + h.stop() + } + + wg.Wait() + time.Sleep(time.Millisecond) + + if rep.loggerCnt() < 3 { + t.Errorf("Perhaps logger was never called by timer") + } + if rep.consolidated(Err0) == 0 { + t.Errorf("Err0 was never reported by consLogger") + } + if rep.consolidated(Err1) == 0 { + t.Errorf("Err1 was never reported by consLogger") + } + if rep.consolidated(Err0)+rep.passed(Err0) != ErrCnt0 { + t.Errorf("Unexpected Err0 count, expected %d, reported by consLogger %d and by ServeDNS %d", + ErrCnt0, rep.consolidated(Err0), rep.passed(Err0)) + } + if rep.consolidated(Err1)+rep.passed(Err1) != ErrCnt1 { + 
t.Errorf("Unexpected Err1 count, expected %d, reported by consLogger %d and by ServeDNS %d", + ErrCnt1, rep.consolidated(Err1), rep.passed(Err1)) + } + if rep.passed(Err2) != ErrCnt2 { + t.Errorf("Unexpected Err2 count, expected %d, reported by ServeDNS %d", + ErrCnt2, rep.passed(Err2)) + } + if rep.passed(Err3) != ErrCnt3 { + t.Errorf("Unexpected Err3 count, expected %d, reported by ServeDNS %d", + ErrCnt3, rep.passed(Err3)) + } +} + +type logData struct { + cnt uint32 + pat string + dur time.Duration +} + +var loggedData []logData + +func testConsLogger(cnt uint32, pattern string, p time.Duration) { + loggedData = append(loggedData, logData{cnt, pattern, p}) +} + +// error generator +const ( + Err0 = iota + Err1 + Err2 + Err3 + + ErrStr0 = "Failed to connect" + ErrStr1 = "Upstream is down" + ErrStr2 = "Access denied" + ErrStr3 = "Yet another error" + + ErrCnt0 = 120000 + ErrCnt1 = 130000 + ErrCnt2 = 150000 + ErrCnt3 = 100000 +) + +type errorBunch struct { + cnt int32 + err error +} + +type errorGenerator struct { + errors [4]errorBunch + doneFlag uint32 +} + +func newErrorGenerator() *errorGenerator { + rand.Seed(time.Now().UnixNano()) + + return &errorGenerator{ + errors: [4]errorBunch{ + {ErrCnt0, fmt.Errorf(ErrStr0)}, + {ErrCnt1, fmt.Errorf(ErrStr1)}, + {ErrCnt2, fmt.Errorf(ErrStr2)}, + {ErrCnt3, fmt.Errorf(ErrStr3)}, + }, + } +} + +func (sh *errorGenerator) done() bool { + return atomic.LoadUint32(&sh.doneFlag) > 0 +} + +func (sh *errorGenerator) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { + i := rand.Int() + for c := 0; c < 4; c++ { + errInd := (i + c) & 3 + if atomic.AddInt32(&sh.errors[errInd].cnt, -1) >= 0 { + return 0, sh.errors[errInd].err + } + atomic.AddInt32(&sh.errors[errInd].cnt, 1) + } + atomic.StoreUint32(&sh.doneFlag, 1) + return 0, nil +} + +func (sh *errorGenerator) Name() string { return "errorGenerator" } + +// error report +type errorReport struct { + s [4]uint32 + p [4]uint32 + l uint32 +} + +func (er 
*errorReport) consolidated(i int) uint32 { + return atomic.LoadUint32(&er.s[i]) +} + +func (er *errorReport) incConsolidated(i int, cnt uint32) { + atomic.AddUint32(&er.s[i], cnt) +} + +func (er *errorReport) passed(i int) uint32 { + return atomic.LoadUint32(&er.p[i]) +} + +func (er *errorReport) incPassed(i int) { + atomic.AddUint32(&er.p[i], 1) +} + +func (er *errorReport) loggerCnt() uint32 { + return atomic.LoadUint32(&er.l) +} + +func (er *errorReport) incLoggerCnt() { + atomic.AddUint32(&er.l, 1) +} + func genErrorHandler(rcode int, err error) plugin.Handler { return plugin.HandlerFunc(func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { return rcode, err diff --git a/plugin/errors/setup.go b/plugin/errors/setup.go index d90928f74..42a5a40f5 100644 --- a/plugin/errors/setup.go +++ b/plugin/errors/setup.go @@ -1,7 +1,8 @@ package errors import ( - "fmt" + "regexp" + "time" "github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/plugin" @@ -22,6 +23,11 @@ func setup(c *caddy.Controller) error { return plugin.Error("errors", err) } + c.OnShutdown(func() error { + handler.stop() + return nil + }) + dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { handler.Next = next return handler @@ -30,13 +36,13 @@ func setup(c *caddy.Controller) error { return nil } -func errorsParse(c *caddy.Controller) (errorHandler, error) { - handler := errorHandler{} +func errorsParse(c *caddy.Controller) (*errorHandler, error) { + handler := newErrorHandler() i := 0 for c.Next() { if i > 0 { - return handler, plugin.ErrOnce + return nil, plugin.ErrOnce } i++ @@ -45,11 +51,39 @@ func errorsParse(c *caddy.Controller) (errorHandler, error) { case 0: case 1: if args[0] != "stdout" { - return handler, fmt.Errorf("invalid log file: %s", args[0]) + return nil, c.Errf("invalid log file: %s", args[0]) } default: - return handler, c.ArgErr() + return nil, c.ArgErr() + } + + for c.NextBlock() { + if err := parseBlock(c, handler); err 
!= nil { + return nil, err + } } } return handler, nil } + +func parseBlock(c *caddy.Controller, h *errorHandler) error { + if c.Val() != "consolidate" { + return c.SyntaxErr("consolidate") + } + + args := c.RemainingArgs() + if len(args) != 2 { + return c.ArgErr() + } + p, err := time.ParseDuration(args[0]) + if err != nil { + return c.Err(err.Error()) + } + re, err := regexp.Compile(args[1]) + if err != nil { + return c.Err(err.Error()) + } + h.patterns = append(h.patterns, &pattern{period: p, pattern: re}) + + return nil +} diff --git a/plugin/errors/setup_test.go b/plugin/errors/setup_test.go index 3cfd7c03c..3305f6823 100644 --- a/plugin/errors/setup_test.go +++ b/plugin/errors/setup_test.go @@ -10,24 +10,60 @@ func TestErrorsParse(t *testing.T) { tests := []struct { inputErrorsRules string shouldErr bool + optCount int }{ - {`errors`, false}, - {`errors stdout`, false}, - {`errors errors.txt`, true}, - {`errors visible`, true}, - {`errors { log visible }`, true}, + {`errors`, false, 0}, + {`errors stdout`, false, 0}, + {`errors errors.txt`, true, 0}, + {`errors visible`, true, 0}, + {`errors { log visible }`, true, 0}, {`errors - errors `, true}, - {`errors a b`, true}, + errors `, true, 0}, + {`errors a b`, true, 0}, + + {`errors { + consolidate + }`, true, 0}, + {`errors { + consolidate 1m + }`, true, 0}, + {`errors { + consolidate 1m .* extra + }`, true, 0}, + {`errors { + consolidate abc .* + }`, true, 0}, + {`errors { + consolidate 1 .* + }`, true, 0}, + {`errors { + consolidate 1m ()) + }`, true, 0}, + {`errors { + consolidate 1m ^exact$ + }`, false, 1}, + {`errors { + consolidate 1m error + }`, false, 1}, + {`errors { + consolidate 1m "format error" + }`, false, 1}, + {`errors { + consolidate 1m error1 + consolidate 5s error2 + }`, false, 2}, } for i, test := range tests { c := caddy.NewTestController("dns", test.inputErrorsRules) - _, err := errorsParse(c) + h, err := errorsParse(c) if err == nil && test.shouldErr { t.Errorf("Test %d didn't error, but 
it should have", i) } else if err != nil && !test.shouldErr { t.Errorf("Test %d errored, but it shouldn't have; got '%v'", i, err) + } else if h != nil && len(h.patterns) != test.optCount { + t.Errorf("Test %d: pattern count mismatch, expected %d, got %d", + i, test.optCount, len(h.patterns)) } } }