plugin/errors: 'consolidate' option (#2192)

- see more details at https://github.com/infobloxopen/coredns-plugin-errors/pull/3
This commit is contained in:
Ruslan Drozhdzh 2018-10-27 17:37:09 +03:00 committed by Miek Gieben
parent b0a89452ef
commit 7b25d18019
5 changed files with 582 additions and 24 deletions

View file

@ -6,16 +6,36 @@
## Description
Any errors encountered during the query processing will be printed to standard output.
Any errors encountered during the query processing will be printed to standard output. The errors of particular type can be consolidated and printed once per some period of time.
This plugin can only be used once per Server Block.
## Syntax
The basic syntax is:
~~~
errors
~~~
Extra knobs are available with an expanded syntax:
~~~
errors {
consolidate DURATION REGEXP
}
~~~
Option `consolidate` allows collecting several error messages matching the regular expression **REGEXP** during **DURATION**. After the **DURATION** since receiving the first such message, the consolidated message will be printed to standard output, e.g.
~~~
2 errors like '^read udp .* i/o timeout$' occurred in last 30s
~~~
Multiple `consolidate` options with different **DURATION** and **REGEXP** are allowed. In case if some error message corresponds to several defined regular expressions the message will be associated with the first appropriate **REGEXP**.
For better performance, it's recomended to use the `^` or `$` metacharacters in regular expression when filtering error messages by prefix or suffix, e.g. `^failed to .*`, or `.* timeout$`.
## Examples
Use the *whoami* to respond to queries and Log errors to standard output.
@ -26,3 +46,15 @@ Use the *whoami* to respond to queries and Log errors to standard output.
errors
}
~~~
Use the *forward* to resolve queries via 8.8.8.8 and print consolidated error messages for errors with suffix " i/o timeout" or with prefix "Failed to ".
~~~ corefile
. {
forward . 8.8.8.8
errors {
consolidate 5m ".* i/o timeout$"
consolidate 30s "^Failed to .+"
}
}
~~~

View file

@ -1,8 +1,12 @@
// Package errors implements an HTTP error handling plugin.
// Package errors implements an error handling plugin.
package errors
import (
"context"
"regexp"
"sync/atomic"
"time"
"unsafe"
"github.com/coredns/coredns/plugin"
clog "github.com/coredns/coredns/plugin/pkg/log"
@ -11,19 +15,98 @@ import (
"github.com/miekg/dns"
)
var log = clog.NewWithPlugin("errors")
type pattern struct {
ptimer unsafe.Pointer
count uint32
period time.Duration
pattern *regexp.Regexp
}
func (p *pattern) timer() *time.Timer {
return (*time.Timer)(atomic.LoadPointer(&p.ptimer))
}
func (p *pattern) setTimer(t *time.Timer) {
atomic.StorePointer(&p.ptimer, unsafe.Pointer(t))
}
// errorHandler handles DNS errors (and errors from other plugin).
type errorHandler struct{ Next plugin.Handler }
type errorHandler struct {
patterns []*pattern
eLogger func(int, string, string, string)
cLogger func(uint32, string, time.Duration)
stopFlag uint32
Next plugin.Handler
}
func newErrorHandler() *errorHandler {
return &errorHandler{eLogger: errorLogger, cLogger: consLogger}
}
func errorLogger(code int, qName, qType, err string) {
log.Errorf("%d %s %s: %s", code, qName, qType, err)
}
func consLogger(cnt uint32, pattern string, p time.Duration) {
log.Errorf("%d errors like '%s' occured in last %s", cnt, pattern, p)
}
func (h *errorHandler) logPattern(i int) {
cnt := atomic.SwapUint32(&h.patterns[i].count, 0)
if cnt > 0 {
h.cLogger(cnt, h.patterns[i].pattern.String(), h.patterns[i].period)
}
}
func (h *errorHandler) inc(i int) bool {
if atomic.LoadUint32(&h.stopFlag) > 0 {
return false
}
if atomic.AddUint32(&h.patterns[i].count, 1) == 1 {
ind := i
t := time.AfterFunc(h.patterns[ind].period, func() {
h.logPattern(ind)
})
h.patterns[ind].setTimer(t)
if atomic.LoadUint32(&h.stopFlag) > 0 && t.Stop() {
h.logPattern(ind)
}
}
return true
}
func (h *errorHandler) stop() {
atomic.StoreUint32(&h.stopFlag, 1)
for i := range h.patterns {
t := h.patterns[i].timer()
if t != nil && t.Stop() {
h.logPattern(i)
}
}
}
// ServeDNS implements the plugin.Handler interface.
func (h errorHandler) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
func (h *errorHandler) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
rcode, err := plugin.NextOrFailure(h.Name(), h.Next, ctx, w, r)
if err != nil {
strErr := err.Error()
for i := range h.patterns {
if h.patterns[i].pattern.MatchString(strErr) {
if h.inc(i) {
return rcode, err
}
break
}
}
state := request.Request{W: w, Req: r}
clog.Errorf("%d %s %s: %v", rcode, state.Name(), state.Type(), err)
h.eLogger(rcode, state.Name(), state.Type(), strErr)
}
return rcode, err
}
func (h errorHandler) Name() string { return "errors" }
// Name implements the plugin.Handler interface.
func (h *errorHandler) Name() string { return "errors" }

View file

@ -5,9 +5,14 @@ import (
"context"
"errors"
"fmt"
"log"
golog "log"
"math/rand"
"regexp"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/pkg/dnstest"
@ -18,8 +23,8 @@ import (
func TestErrors(t *testing.T) {
buf := bytes.Buffer{}
log.SetOutput(&buf)
em := errorHandler{}
golog.SetOutput(&buf)
em := errorHandler{eLogger: errorLogger}
testErr := errors.New("test error")
tests := []struct {
@ -37,7 +42,7 @@ func TestErrors(t *testing.T) {
{
next: genErrorHandler(dns.RcodeNotAuth, testErr),
expectedCode: dns.RcodeNotAuth,
expectedLog: fmt.Sprintf("[ERROR] %d %s: %v\n", dns.RcodeNotAuth, "example.org. A", testErr),
expectedLog: fmt.Sprintf("%d %s: %v\n", dns.RcodeNotAuth, "example.org. A", testErr),
expectedErr: testErr,
},
}
@ -67,6 +72,374 @@ func TestErrors(t *testing.T) {
}
}
func TestConsLogger(t *testing.T) {
buf := bytes.Buffer{}
golog.SetOutput(&buf)
consLogger(5, "^Error.*!$", 3*time.Second)
exp := "[ERROR] plugin/errors: 5 errors like '^Error.*!$' occured in last 3s"
act := buf.String()
if !strings.Contains(act, exp) {
t.Errorf("Unexpected log message, expected to contain %q, actual %q", exp, act)
}
}
func TestLogPattern(t *testing.T) {
h := &errorHandler{
patterns: []*pattern{{
count: 4,
period: 2 * time.Second,
pattern: regexp.MustCompile("^Error.*!$"),
}},
cLogger: testConsLogger,
}
loggedData = loggedData[:0]
h.logPattern(0)
expLen := 1
if len(loggedData) != expLen {
t.Errorf("Unexpected number of logged messages, expected %d, actual %d",
expLen, len(loggedData))
}
expCnt := uint32(4)
if loggedData[0].cnt != expCnt {
t.Errorf("Unexpected 'count' in logged message, expected %d, actual %d",
expCnt, loggedData[0].cnt)
}
expPat := "^Error.*!$"
actPat := loggedData[0].pat
if actPat != expPat {
t.Errorf("Unexpected 'pattern' in logged message, expected %s, actual %s",
expPat, actPat)
}
expPer := "2s"
actPer := loggedData[0].dur.String()
if actPer != expPer {
t.Errorf("Unexpected 'period' in logged message, expected %s, actual %s",
expPer, actPer)
}
h.logPattern(0)
if len(loggedData) != expLen {
t.Errorf("Unexpected number of logged messages, expected %d, actual %d",
expLen, len(loggedData))
}
}
func TestInc(t *testing.T) {
h := &errorHandler{
stopFlag: 1,
patterns: []*pattern{{
period: 2 * time.Second,
pattern: regexp.MustCompile("^Error.*!$"),
}},
cLogger: testConsLogger,
}
ret := h.inc(0)
if ret {
t.Error("Unexpected return value, expected false, actual true")
}
h.stopFlag = 0
ret = h.inc(0)
if !ret {
t.Error("Unexpected return value, expected true, actual false")
}
expCnt := uint32(1)
actCnt := atomic.LoadUint32(&h.patterns[0].count)
if actCnt != expCnt {
t.Errorf("Unexpected 'count', expected %d, actual %d", expCnt, actCnt)
}
t1 := h.patterns[0].timer()
if t1 == nil {
t.Error("Unexpected 'timer', expected not nil")
}
ret = h.inc(0)
if !ret {
t.Error("Unexpected return value, expected true, actual false")
}
expCnt = uint32(2)
actCnt = atomic.LoadUint32(&h.patterns[0].count)
if actCnt != expCnt {
t.Errorf("Unexpected 'count', expected %d, actual %d", expCnt, actCnt)
}
t2 := h.patterns[0].timer()
if t2 != t1 {
t.Error("Unexpected 'timer', expected the same")
}
ret = t1.Stop()
if !ret {
t.Error("Timer was unexpectedly stopped before")
}
ret = t2.Stop()
if ret {
t.Error("Timer was unexpectedly not stopped before")
}
}
func TestStop(t *testing.T) {
h := &errorHandler{
patterns: []*pattern{{
period: 2 * time.Second,
pattern: regexp.MustCompile("^Error.*!$"),
}},
cLogger: testConsLogger,
}
loggedData = loggedData[:0]
h.inc(0)
h.inc(0)
h.inc(0)
expCnt := uint32(3)
actCnt := atomic.LoadUint32(&h.patterns[0].count)
if actCnt != expCnt {
t.Fatalf("Unexpected initial 'count', expected %d, actual %d", expCnt, actCnt)
}
h.stop()
expCnt = uint32(0)
actCnt = atomic.LoadUint32(&h.patterns[0].count)
if actCnt != expCnt {
t.Errorf("Unexpected 'count', expected %d, actual %d", expCnt, actCnt)
}
expStop := uint32(1)
actStop := h.stopFlag
if actStop != expStop {
t.Errorf("Unexpected 'stop', expected %d, actual %d", expStop, actStop)
}
t1 := h.patterns[0].timer()
if t1 == nil {
t.Error("Unexpected 'timer', expected not nil")
} else if t1.Stop() {
t.Error("Timer was unexpectedly not stopped before")
}
expLen := 1
if len(loggedData) != expLen {
t.Errorf("Unexpected number of logged messages, expected %d, actual %d",
expLen, len(loggedData))
}
expCnt = uint32(3)
actCnt = loggedData[0].cnt
if actCnt != expCnt {
t.Errorf("Unexpected 'count' in logged message, expected %d, actual %d",
expCnt, actCnt)
}
}
func TestServeDNSConcurrent(t *testing.T) {
eg := newErrorGenerator()
rep := &errorReport{}
h := &errorHandler{
patterns: []*pattern{
{
period: 3 * time.Nanosecond,
pattern: regexp.MustCompile("Failed"),
},
{
period: 2 * time.Nanosecond,
pattern: regexp.MustCompile("down$"),
},
},
cLogger: func(cnt uint32, pattern string, p time.Duration) {
rep.incLoggerCnt()
if pattern == "Failed" {
rep.incConsolidated(Err0, cnt)
} else if pattern == "down$" {
rep.incConsolidated(Err1, cnt)
}
},
eLogger: func(code int, n, t, e string) {
switch e {
case ErrStr0:
rep.incPassed(Err0)
case ErrStr1:
rep.incPassed(Err1)
case ErrStr2:
rep.incPassed(Err2)
case ErrStr3:
rep.incPassed(Err3)
}
},
Next: eg,
}
ctx := context.TODO()
r := new(dns.Msg)
w := &test.ResponseWriter{}
var wg sync.WaitGroup
runnersCnt := 9
runner := func() {
defer wg.Done()
for !eg.done() {
h.ServeDNS(ctx, w, r)
}
}
wg.Add(runnersCnt)
for ; runnersCnt > 0; runnersCnt-- {
go runner()
}
stopCalled := false
for !eg.done() {
if !stopCalled &&
atomic.LoadUint32(&rep.s[Err0]) > ErrCnt0/2 &&
atomic.LoadUint32(&rep.s[Err1]) > ErrCnt1/2 {
h.stop()
stopCalled = true
}
h.ServeDNS(ctx, w, r)
}
if !stopCalled {
h.stop()
}
wg.Wait()
time.Sleep(time.Millisecond)
if rep.loggerCnt() < 3 {
t.Errorf("Perhaps logger was never called by timer")
}
if rep.consolidated(Err0) == 0 {
t.Errorf("Err0 was never reported by consLogger")
}
if rep.consolidated(Err1) == 0 {
t.Errorf("Err1 was never reported by consLogger")
}
if rep.consolidated(Err0)+rep.passed(Err0) != ErrCnt0 {
t.Errorf("Unexpected Err0 count, expected %d, reported by consLogger %d and by ServeDNS %d",
ErrCnt0, rep.consolidated(Err0), rep.passed(Err0))
}
if rep.consolidated(Err1)+rep.passed(Err1) != ErrCnt1 {
t.Errorf("Unexpected Err1 count, expected %d, reported by consLogger %d and by ServeDNS %d",
ErrCnt1, rep.consolidated(Err1), rep.passed(Err1))
}
if rep.passed(Err2) != ErrCnt2 {
t.Errorf("Unexpected Err2 count, expected %d, reported by ServeDNS %d",
ErrCnt2, rep.passed(Err2))
}
if rep.passed(Err3) != ErrCnt3 {
t.Errorf("Unexpected Err3 count, expected %d, reported by ServeDNS %d",
ErrCnt3, rep.passed(Err3))
}
}
type logData struct {
cnt uint32
pat string
dur time.Duration
}
var loggedData []logData
func testConsLogger(cnt uint32, pattern string, p time.Duration) {
loggedData = append(loggedData, logData{cnt, pattern, p})
}
// error generator
const (
Err0 = iota
Err1
Err2
Err3
ErrStr0 = "Failed to connect"
ErrStr1 = "Upstream is down"
ErrStr2 = "Access denied"
ErrStr3 = "Yet another error"
ErrCnt0 = 120000
ErrCnt1 = 130000
ErrCnt2 = 150000
ErrCnt3 = 100000
)
type errorBunch struct {
cnt int32
err error
}
type errorGenerator struct {
errors [4]errorBunch
doneFlag uint32
}
func newErrorGenerator() *errorGenerator {
rand.Seed(time.Now().UnixNano())
return &errorGenerator{
errors: [4]errorBunch{
{ErrCnt0, fmt.Errorf(ErrStr0)},
{ErrCnt1, fmt.Errorf(ErrStr1)},
{ErrCnt2, fmt.Errorf(ErrStr2)},
{ErrCnt3, fmt.Errorf(ErrStr3)},
},
}
}
func (sh *errorGenerator) done() bool {
return atomic.LoadUint32(&sh.doneFlag) > 0
}
func (sh *errorGenerator) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
i := rand.Int()
for c := 0; c < 4; c++ {
errInd := (i + c) & 3
if atomic.AddInt32(&sh.errors[errInd].cnt, -1) >= 0 {
return 0, sh.errors[errInd].err
}
atomic.AddInt32(&sh.errors[errInd].cnt, 1)
}
atomic.StoreUint32(&sh.doneFlag, 1)
return 0, nil
}
func (sh *errorGenerator) Name() string { return "errorGenerator" }
// error report
type errorReport struct {
s [4]uint32
p [4]uint32
l uint32
}
func (er *errorReport) consolidated(i int) uint32 {
return atomic.LoadUint32(&er.s[i])
}
func (er *errorReport) incConsolidated(i int, cnt uint32) {
atomic.AddUint32(&er.s[i], cnt)
}
func (er *errorReport) passed(i int) uint32 {
return atomic.LoadUint32(&er.p[i])
}
func (er *errorReport) incPassed(i int) {
atomic.AddUint32(&er.p[i], 1)
}
func (er *errorReport) loggerCnt() uint32 {
return atomic.LoadUint32(&er.l)
}
func (er *errorReport) incLoggerCnt() {
atomic.AddUint32(&er.l, 1)
}
func genErrorHandler(rcode int, err error) plugin.Handler {
return plugin.HandlerFunc(func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
return rcode, err

View file

@ -1,7 +1,8 @@
package errors
import (
"fmt"
"regexp"
"time"
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin"
@ -22,6 +23,11 @@ func setup(c *caddy.Controller) error {
return plugin.Error("errors", err)
}
c.OnShutdown(func() error {
handler.stop()
return nil
})
dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
handler.Next = next
return handler
@ -30,13 +36,13 @@ func setup(c *caddy.Controller) error {
return nil
}
func errorsParse(c *caddy.Controller) (errorHandler, error) {
handler := errorHandler{}
func errorsParse(c *caddy.Controller) (*errorHandler, error) {
handler := newErrorHandler()
i := 0
for c.Next() {
if i > 0 {
return handler, plugin.ErrOnce
return nil, plugin.ErrOnce
}
i++
@ -45,11 +51,39 @@ func errorsParse(c *caddy.Controller) (errorHandler, error) {
case 0:
case 1:
if args[0] != "stdout" {
return handler, fmt.Errorf("invalid log file: %s", args[0])
return nil, c.Errf("invalid log file: %s", args[0])
}
default:
return handler, c.ArgErr()
return nil, c.ArgErr()
}
for c.NextBlock() {
if err := parseBlock(c, handler); err != nil {
return nil, err
}
}
}
return handler, nil
}
func parseBlock(c *caddy.Controller, h *errorHandler) error {
if c.Val() != "consolidate" {
return c.SyntaxErr("consolidate")
}
args := c.RemainingArgs()
if len(args) != 2 {
return c.ArgErr()
}
p, err := time.ParseDuration(args[0])
if err != nil {
return c.Err(err.Error())
}
re, err := regexp.Compile(args[1])
if err != nil {
return c.Err(err.Error())
}
h.patterns = append(h.patterns, &pattern{period: p, pattern: re})
return nil
}

View file

@ -10,24 +10,60 @@ func TestErrorsParse(t *testing.T) {
tests := []struct {
inputErrorsRules string
shouldErr bool
optCount int
}{
{`errors`, false},
{`errors stdout`, false},
{`errors errors.txt`, true},
{`errors visible`, true},
{`errors { log visible }`, true},
{`errors`, false, 0},
{`errors stdout`, false, 0},
{`errors errors.txt`, true, 0},
{`errors visible`, true, 0},
{`errors { log visible }`, true, 0},
{`errors
errors `, true},
{`errors a b`, true},
errors `, true, 0},
{`errors a b`, true, 0},
{`errors {
consolidate
}`, true, 0},
{`errors {
consolidate 1m
}`, true, 0},
{`errors {
consolidate 1m .* extra
}`, true, 0},
{`errors {
consolidate abc .*
}`, true, 0},
{`errors {
consolidate 1 .*
}`, true, 0},
{`errors {
consolidate 1m ())
}`, true, 0},
{`errors {
consolidate 1m ^exact$
}`, false, 1},
{`errors {
consolidate 1m error
}`, false, 1},
{`errors {
consolidate 1m "format error"
}`, false, 1},
{`errors {
consolidate 1m error1
consolidate 5s error2
}`, false, 2},
}
for i, test := range tests {
c := caddy.NewTestController("dns", test.inputErrorsRules)
_, err := errorsParse(c)
h, err := errorsParse(c)
if err == nil && test.shouldErr {
t.Errorf("Test %d didn't error, but it should have", i)
} else if err != nil && !test.shouldErr {
t.Errorf("Test %d errored, but it shouldn't have; got '%v'", i, err)
} else if h != nil && len(h.patterns) != test.optCount {
t.Errorf("Test %d: pattern count mismatch, expected %d, got %d",
i, test.optCount, len(h.patterns))
}
}
}