From daf8ef0da8a9aafc610f7a3dbcf021ec4365ee72 Mon Sep 17 00:00:00 2001 From: varyoo Date: Tue, 26 Sep 2017 17:45:33 +0200 Subject: [PATCH] Adds the dnstap I/O routines and should fix some issues (#1083) * adds the dnstap I/O thread and should fix a lot of mistakes * docs * -race test * oops * docs --- plugin/dnstap/dnstapio/io.go | 69 ++++++++++++++++++++++++++++++ plugin/dnstap/dnstapio/io_test.go | 71 +++++++++++++++++++++++++++++++ plugin/dnstap/handler.go | 9 +++- plugin/dnstap/handler_test.go | 19 +++------ plugin/dnstap/msg/wrapper.go | 8 ++-- plugin/dnstap/out/tcp.go | 2 +- plugin/dnstap/setup.go | 8 ++-- 7 files changed, 164 insertions(+), 22 deletions(-) create mode 100644 plugin/dnstap/dnstapio/io.go create mode 100644 plugin/dnstap/dnstapio/io_test.go diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go new file mode 100644 index 000000000..586def2ac --- /dev/null +++ b/plugin/dnstap/dnstapio/io.go @@ -0,0 +1,69 @@ +package dnstapio + +import ( + "fmt" + "io" + + tap "github.com/dnstap/golang-dnstap" + "github.com/golang/protobuf/proto" +) + +// DnstapIO wraps the dnstap I/O routine. +type DnstapIO struct { + writer io.WriteCloser + queue chan tap.Dnstap + stop chan bool +} + +// Protocol is either `out.TCP` or `out.Socket`. +type Protocol interface { + // Write takes a single frame at once. + Write([]byte) (int, error) + + Close() error +} + +// New dnstap I/O routine from Protocol. +func New(w Protocol) *DnstapIO { + dio := DnstapIO{} + dio.writer = w + dio.queue = make(chan tap.Dnstap, 10) + dio.stop = make(chan bool) + go dio.serve() + return &dio +} + +// Dnstap enqueues the payload for log. +func (dio *DnstapIO) Dnstap(payload tap.Dnstap) { + select { + case dio.queue <- payload: + default: + fmt.Println("[WARN] Dnstap payload dropped.") + } +} + +func (dio *DnstapIO) serve() { + for { + select { + case payload := <-dio.queue: + frame, err := proto.Marshal(&payload) + if err == nil { + dio.writer.Write(frame) + } else { + fmt.Println("[ERROR] Invalid dnstap payload dropped.") + } + case <-dio.stop: + close(dio.queue) + dio.stop <- true + return + } + } +} + +// Close waits until the I/O routine is finished to return. +func (dio DnstapIO) Close() error { + dio.stop <- true + <-dio.stop + close(dio.stop) + return dio.writer.Close() +} diff --git a/plugin/dnstap/dnstapio/io_test.go b/plugin/dnstap/dnstapio/io_test.go new file mode 100644 index 000000000..80b804752 --- /dev/null +++ b/plugin/dnstap/dnstapio/io_test.go @@ -0,0 +1,71 @@ +package dnstapio + +import ( + "bytes" + "sync" + "testing" + "time" + + tap "github.com/dnstap/golang-dnstap" +) + +type buf struct { + *bytes.Buffer + cost time.Duration +} + +func (b buf) Write(frame []byte) (int, error) { + time.Sleep(b.cost) + return b.Buffer.Write(frame) +} + +func (b buf) Close() error { + return nil +} + +func TestRace(t *testing.T) { + b := buf{&bytes.Buffer{}, 100 * time.Millisecond} + dio := New(b) + wg := &sync.WaitGroup{} + wg.Add(10) + for i := 0; i < 10; i++ { + timeout := time.After(time.Second) + go func() { + for { + select { + case <-timeout: + wg.Done() + return + default: + time.Sleep(50 * time.Millisecond) + dio.Dnstap(tap.Dnstap{}) + } + } + }() + } + wg.Wait() +} + +func TestClose(t *testing.T) { + done := make(chan bool) + var dio *DnstapIO + go func() { + b := buf{&bytes.Buffer{}, 0} + dio = New(b) + dio.Close() + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Not closing.") + } + func() { + defer func() { + if err := recover(); err == nil { + t.Fatal("Send on closed channel.") + } + }() + dio.Dnstap(tap.Dnstap{}) + }() +} diff --git a/plugin/dnstap/handler.go b/plugin/dnstap/handler.go index b20bb2ad9..b6a8afbe7 100644 --- a/plugin/dnstap/handler.go +++ b/plugin/dnstap/handler.go @@ -16,11 +16,15 @@ import ( // Dnstap is the dnstap handler. type Dnstap struct { Next plugin.Handler - Out io.Writer + IO IORoutine Pack bool } type ( + // IORoutine is the dnstap I/O thread as defined by: . + IORoutine interface { + Dnstap(tap.Dnstap) + } // Tapper is implemented by the Context passed by the dnstap handler. Tapper interface { TapMessage(*tap.Message) error @@ -49,7 +53,8 @@ func tapMessageTo(w io.Writer, m *tap.Message) error { // TapMessage implements Tapper. func (h Dnstap) TapMessage(m *tap.Message) error { - return tapMessageTo(h.Out, m) + h.IO.Dnstap(msg.Wrap(m)) + return nil } // TapBuilder implements Tapper. diff --git a/plugin/dnstap/handler_test.go b/plugin/dnstap/handler_test.go index 54509de82..617c8e675 100644 --- a/plugin/dnstap/handler_test.go +++ b/plugin/dnstap/handler_test.go @@ -1,21 +1,18 @@ package dnstap import ( - "errors" - "fmt" "testing" "github.com/coredns/coredns/plugin/dnstap/test" mwtest "github.com/coredns/coredns/plugin/test" tap "github.com/dnstap/golang-dnstap" - "github.com/golang/protobuf/proto" "github.com/miekg/dns" "golang.org/x/net/context" ) func testCase(t *testing.T, tapq, tapr *tap.Message, q, r *dns.Msg) { - w := writer{} + w := writer{t: t} w.queue = append(w.queue, tapq, tapr) h := Dnstap{ Next: mwtest.HandlerFunc(func(_ context.Context, @@ -23,7 +20,7 @@ func testCase(t *testing.T, tapq, tapr *tap.Message, q, r *dns.Msg) { return 0, w.WriteMsg(r) }), - Out: &w, + IO: &w, Pack: false, } _, err := h.ServeDNS(context.TODO(), &mwtest.ResponseWriter{}, q) @@ -33,22 +30,18 @@ func testCase(t *testing.T, tapq, tapr *tap.Message, q, r *dns.Msg) { } type writer struct { + t *testing.T queue []*tap.Message } -func (w *writer) Write(b []byte) (int, error) { - e := tap.Dnstap{} - if err := proto.Unmarshal(b, &e); err != nil { - return 0, err - } +func (w *writer) Dnstap(e tap.Dnstap) { if len(w.queue) == 0 { - return 0, errors.New("message not expected") + w.t.Error("Message not expected.") } if !test.MsgEqual(w.queue[0], e.Message) { - return 0, fmt.Errorf("want: %v, have: %v", w.queue[0], e.Message) + w.t.Errorf("want: %v, have: %v", w.queue[0], e.Message) } w.queue = w.queue[1:] - return len(b), nil } func TestDnstap(t *testing.T) { diff --git a/plugin/dnstap/msg/wrapper.go b/plugin/dnstap/msg/wrapper.go index a74c604d8..3396b1342 100644 --- a/plugin/dnstap/msg/wrapper.go +++ b/plugin/dnstap/msg/wrapper.go @@ -7,9 +7,10 @@ import ( "github.com/golang/protobuf/proto" ) -func wrap(m *lib.Message) *lib.Dnstap { +// Wrap a dnstap message in the top-level dnstap type. +func Wrap(m *lib.Message) lib.Dnstap { t := lib.Dnstap_MESSAGE - return &lib.Dnstap{ + return lib.Dnstap{ Type: &t, Message: m, } @@ -17,7 +18,8 @@ func wrap(m *lib.Message) *lib.Dnstap { // Marshal encodes the message to a binary dnstap payload. func Marshal(m *lib.Message) (data []byte, err error) { - data, err = proto.Marshal(wrap(m)) + payload := Wrap(m) + data, err = proto.Marshal(&payload) if err != nil { err = fmt.Errorf("proto: %s", err) return diff --git a/plugin/dnstap/out/tcp.go b/plugin/dnstap/out/tcp.go index 8d2c25270..715c3024a 100644 --- a/plugin/dnstap/out/tcp.go +++ b/plugin/dnstap/out/tcp.go @@ -32,7 +32,7 @@ func (s *TCP) Write(frame []byte) (n int, err error) { // Flush the remaining frames. func (s *TCP) Flush() error { defer func() { - s.frames = s.frames[0:] + s.frames = s.frames[:0] }() c, err := net.DialTimeout("tcp", s.address, time.Second) if err != nil { diff --git a/plugin/dnstap/setup.go b/plugin/dnstap/setup.go index a57873470..c1a8956a1 100644 --- a/plugin/dnstap/setup.go +++ b/plugin/dnstap/setup.go @@ -8,6 +8,7 @@ import ( "github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/dnstap/dnstapio" "github.com/coredns/coredns/plugin/dnstap/out" "github.com/coredns/coredns/plugin/pkg/dnsutil" @@ -79,11 +80,12 @@ func setup(c *caddy.Controller) error { } else { o = out.NewTCP(conf.target) } - dnstap.Out = o + dio := dnstapio.New(o) + dnstap.IO = dio c.OnShutdown(func() error { - if err := o.Close(); err != nil { - return fmt.Errorf("output: %s", err) + if err := dio.Close(); err != nil { + return fmt.Errorf("dnstap io routine: %s", err) } return nil })