diff --git a/plugin/dnstap/README.md b/plugin/dnstap/README.md index b2bbd3c0f..8dc9b5674 100644 --- a/plugin/dnstap/README.md +++ b/plugin/dnstap/README.md @@ -9,8 +9,8 @@ dnstap is a flexible, structured binary log format for DNS software; see https://dnstap.info. With this plugin you make CoreDNS output dnstap logging. -Note that there is an internal buffer, so expect at least 13 requests before the server sends its -dnstap messages to the socket. +Every message is sent to the socket as soon as it comes in, the *dnstap* plugin has a buffer of +10000 messages, above that number dnstap messages will be dropped (this is logged). ## Syntax @@ -100,5 +100,5 @@ func (x RandomPlugin) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns ## See Also -The website [dnstap.info](https://dnstap.info) has info on the dnstap protocol. -The *forward* plugin's `dnstap.go` uses dnstap to tap messages sent to an upstream. +The website [dnstap.info](https://dnstap.info) has info on the dnstap protocol. The *forward* +plugin's `dnstap.go` uses dnstap to tap messages sent to an upstream. diff --git a/plugin/dnstap/dnstapio/dnstap_encoder.go b/plugin/dnstap/dnstapio/dnstap_encoder.go deleted file mode 100644 index 65b15f587..000000000 --- a/plugin/dnstap/dnstapio/dnstap_encoder.go +++ /dev/null @@ -1,91 +0,0 @@ -package dnstapio - -import ( - "encoding/binary" - "fmt" - "io" - - tap "github.com/dnstap/golang-dnstap" - fs "github.com/farsightsec/golang-framestream" - "github.com/golang/protobuf/proto" -) - -const ( - protobufSize = 1024 * 1024 -) - -type dnstapEncoder struct { - fse *fs.Encoder - opts *fs.EncoderOptions - writer io.Writer - buffer *proto.Buffer -} - -func newDnstapEncoder(o *fs.EncoderOptions) *dnstapEncoder { - return &dnstapEncoder{ - opts: o, - buffer: proto.NewBuffer(make([]byte, 0, protobufSize)), - } -} - -func (enc *dnstapEncoder) resetWriter(w io.Writer) error { - fse, err := fs.NewEncoder(w, enc.opts) - if err != nil { - return err - } - if err = fse.Flush(); err != nil { - return err - } - enc.fse = fse - enc.writer = w - return nil -} - -func (enc *dnstapEncoder) writeMsg(msg *tap.Dnstap) error { - if len(enc.buffer.Bytes()) >= protobufSize { - if err := enc.flushBuffer(); err != nil { - return err - } - } - bufLen := len(enc.buffer.Bytes()) - // add placeholder for frame length - if err := enc.buffer.EncodeFixed32(0); err != nil { - enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen]) - return err - } - if err := enc.buffer.Marshal(msg); err != nil { - enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen]) - return err - } - enc.encodeFrameLen(enc.buffer.Bytes()[bufLen:]) - return nil -} - -func (enc *dnstapEncoder) flushBuffer() error { - if enc.fse == nil || enc.writer == nil { - return fmt.Errorf("no writer") - } - - buf := enc.buffer.Bytes() - written := 0 - for written < len(buf) { - n, err := enc.writer.Write(buf[written:]) - written += n - if err != nil { - return err - } - } - enc.buffer.Reset() - return nil -} - -func (enc *dnstapEncoder) encodeFrameLen(buf []byte) { - binary.BigEndian.PutUint32(buf, uint32(len(buf)-4)) -} - -func (enc *dnstapEncoder) close() error { - if enc.fse != nil { - return enc.fse.Close() - } - return nil -} diff --git a/plugin/dnstap/dnstapio/dnstap_encoder_test.go b/plugin/dnstap/dnstapio/dnstap_encoder_test.go deleted file mode 100644 index a7fe23d2b..000000000 --- a/plugin/dnstap/dnstapio/dnstap_encoder_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package dnstapio - -import ( - "bytes" - "testing" - - tap "github.com/dnstap/golang-dnstap" - fs "github.com/farsightsec/golang-framestream" - "github.com/golang/protobuf/proto" -) - -func dnstapMsg() *tap.Dnstap { - t := tap.Dnstap_MESSAGE - mt := tap.Message_CLIENT_RESPONSE - msg := &tap.Message{Type: &mt} - return &tap.Dnstap{Type: &t, Message: msg} -} - -func TestEncoderCompatibility(t *testing.T) { - opts := &fs.EncoderOptions{ - ContentType: []byte("protobuf:dnstap.DnstapTest"), - Bidirectional: false, - } - msg := dnstapMsg() - - //framestream encoder - fsW := new(bytes.Buffer) - fsEnc, err := fs.NewEncoder(fsW, opts) - if err != nil { - t.Fatal(err) - } - data, err := proto.Marshal(msg) - if err != nil { - t.Fatal(err) - } - fsEnc.Write(data) - fsEnc.Close() - - //dnstap encoder - dnstapW := new(bytes.Buffer) - dnstapEnc := newDnstapEncoder(opts) - if err := dnstapEnc.resetWriter(dnstapW); err != nil { - t.Fatal(err) - } - dnstapEnc.writeMsg(msg) - dnstapEnc.flushBuffer() - dnstapEnc.close() - - //compare results - if !bytes.Equal(fsW.Bytes(), dnstapW.Bytes()) { - t.Fatal("DnstapEncoder is not compatible with framestream Encoder") - } -} diff --git a/plugin/dnstap/dnstapio/encoder.go b/plugin/dnstap/dnstapio/encoder.go new file mode 100644 index 000000000..2b4a76cd5 --- /dev/null +++ b/plugin/dnstap/dnstapio/encoder.go @@ -0,0 +1,41 @@ +// Package dnstapio is a small wrapper around golang-framestream +package dnstapio + +import ( + "io" + "time" + + tap "github.com/dnstap/golang-dnstap" + fs "github.com/farsightsec/golang-framestream" + "github.com/golang/protobuf/proto" +) + +// Encoder wraps a fs.Encoder. +type Encoder struct { + fs *fs.Encoder +} + +func newEncoder(w io.Writer, timeout time.Duration) (*Encoder, error) { + fs, err := fs.NewEncoder(w, &fs.EncoderOptions{ + ContentType: []byte("protobuf:dnstap.Dnstap"), + Bidirectional: true, + Timeout: timeout, + }) + if err != nil { + return nil, err + } + return &Encoder{fs}, nil +} + +func (e *Encoder) writeMsg(msg *tap.Dnstap) error { + buf, err := proto.Marshal(msg) + if err != nil { + return err + } + + _, err = e.fs.Write(buf) // n < len(buf) should return an error + return err +} + +func (e *Encoder) flush() error { return e.fs.Flush() } +func (e *Encoder) close() error { return e.fs.Close() } diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go index c88fc14ab..d85196cc8 100644 --- a/plugin/dnstap/dnstapio/io.go +++ b/plugin/dnstap/dnstapio/io.go @@ -8,16 +8,15 @@ import ( clog "github.com/coredns/coredns/plugin/pkg/log" tap "github.com/dnstap/golang-dnstap" - fs "github.com/farsightsec/golang-framestream" ) var log = clog.NewWithPlugin("dnstap") const ( - tcpWriteBufSize = 1024 * 1024 + tcpWriteBufSize = 1024 * 1024 // there is no good explanation for why this number (see #xxx) + queueSize = 10000 // see #xxxx tcpTimeout = 4 * time.Second flushTimeout = 1 * time.Second - queueSize = 10000 ) // Tapper interface is used in testing to mock the Dnstap method. @@ -27,52 +26,47 @@ type Tapper interface { // dio implements the Tapper interface. type dio struct { - endpoint string - socket bool - conn net.Conn - enc *dnstapEncoder - queue chan tap.Dnstap - dropped uint32 - quit chan struct{} + endpoint string + proto string + conn net.Conn + enc *Encoder + queue chan tap.Dnstap + dropped uint32 + quit chan struct{} + flushTimeout time.Duration + tcpTimeout time.Duration } // New returns a new and initialized pointer to a dio. -func New(endpoint string, socket bool) *dio { +func New(proto, endpoint string) *dio { return &dio{ - endpoint: endpoint, - socket: socket, - enc: newDnstapEncoder(&fs.EncoderOptions{ - ContentType: []byte("protobuf:dnstap.Dnstap"), - Bidirectional: true, - }), - queue: make(chan tap.Dnstap, queueSize), - quit: make(chan struct{}), + endpoint: endpoint, + proto: proto, + queue: make(chan tap.Dnstap, queueSize), + quit: make(chan struct{}), + flushTimeout: flushTimeout, + tcpTimeout: tcpTimeout, } } -func (d *dio) newConnect() error { - var err error - if d.socket { - if d.conn, err = net.Dial("unix", d.endpoint); err != nil { - return err - } - } else { - if d.conn, err = net.DialTimeout("tcp", d.endpoint, tcpTimeout); err != nil { - return err - } - if tcpConn, ok := d.conn.(*net.TCPConn); ok { - tcpConn.SetWriteBuffer(tcpWriteBufSize) - tcpConn.SetNoDelay(false) - } +func (d *dio) dial() error { + conn, err := net.DialTimeout(d.proto, d.endpoint, d.tcpTimeout) + if err != nil { + return err + } + if tcpConn, ok := conn.(*net.TCPConn); ok { + tcpConn.SetWriteBuffer(tcpWriteBufSize) + tcpConn.SetNoDelay(false) } - return d.enc.resetWriter(d.conn) + d.enc, err = newEncoder(conn, d.tcpTimeout) + return err } // Connect connects to the dnstap endpoint. func (d *dio) Connect() { - if err := d.newConnect(); err != nil { - log.Error("No connection to dnstap endpoint") + if err := d.dial(); err != nil { + log.Errorf("No connection to dnstap endpoint: %s", err) } go d.serve() } @@ -86,58 +80,46 @@ func (d *dio) Dnstap(payload tap.Dnstap) { } } -func (d *dio) closeConnection() { - d.enc.close() - if d.conn != nil { - d.conn.Close() - d.conn = nil - } -} - // Close waits until the I/O routine is finished to return. func (d *dio) Close() { close(d.quit) } -func (d *dio) flushBuffer() { - if d.conn == nil { - if err := d.newConnect(); err != nil { - return - } - log.Info("Reconnected to dnstap") +func (d *dio) write(payload *tap.Dnstap) error { + if d.enc == nil { + atomic.AddUint32(&d.dropped, 1) + return nil } - - if err := d.enc.flushBuffer(); err != nil { - log.Warningf("Connection lost: %s", err) - d.closeConnection() - if err := d.newConnect(); err != nil { - log.Errorf("Cannot connect to dnstap: %s", err) - } else { - log.Info("Reconnected to dnstap") - } - } -} - -func (d *dio) write(payload *tap.Dnstap) { if err := d.enc.writeMsg(payload); err != nil { atomic.AddUint32(&d.dropped, 1) + return err } + return nil } func (d *dio) serve() { - timeout := time.After(flushTimeout) + timeout := time.After(d.flushTimeout) for { select { case <-d.quit: - d.flushBuffer() - d.closeConnection() + if d.enc == nil { + return + } + d.enc.flush() + d.enc.close() return case payload := <-d.queue: - d.write(&payload) + if err := d.write(&payload); err != nil { + d.dial() + } case <-timeout: if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 { log.Warningf("Dropped dnstap messages: %d", dropped) } - d.flushBuffer() - timeout = time.After(flushTimeout) + if d.enc == nil { + d.dial() + } else { + d.enc.flush() + } + timeout = time.After(d.flushTimeout) } } } diff --git a/plugin/dnstap/dnstapio/io_test.go b/plugin/dnstap/dnstapio/io_test.go index f26f50095..6870734e4 100644 --- a/plugin/dnstap/dnstapio/io_test.go +++ b/plugin/dnstap/dnstapio/io_test.go @@ -12,11 +12,6 @@ import ( fs "github.com/farsightsec/golang-framestream" ) -const ( - endpointTCP = "localhost:0" - endpointSocket = "dnstap.sock" -) - var ( msgType = tap.Dnstap_MESSAGE msg = tap.Dnstap{Type: &msgType} @@ -27,7 +22,6 @@ func accept(t *testing.T, l net.Listener, count int) { if err != nil { t.Fatalf("Server accepted: %s", err) } - dec, err := fs.NewDecoder(server, &fs.DecoderOptions{ ContentType: []byte("protobuf:dnstap.Dnstap"), Bidirectional: true, @@ -48,9 +42,10 @@ func accept(t *testing.T, l net.Listener, count int) { } func TestTransport(t *testing.T) { - transport := [2][3]string{ - {"tcp", endpointTCP, "false"}, - {"unix", endpointSocket, "true"}, + + transport := [2][2]string{ + {"tcp", ":0"}, + {"unix", "dnstap.sock"}, } for _, param := range transport { @@ -67,7 +62,9 @@ func TestTransport(t *testing.T) { wg.Done() }() - dio := New(l.Addr().String(), param[2] == "true") + dio := New(param[0], l.Addr().String()) + dio.tcpTimeout = 10 * time.Millisecond + dio.flushTimeout = 30 * time.Millisecond dio.Connect() dio.Dnstap(msg) @@ -81,8 +78,7 @@ func TestTransport(t *testing.T) { func TestRace(t *testing.T) { count := 10 - // Start TCP listener - l, err := reuseport.Listen("tcp", endpointTCP) + l, err := reuseport.Listen("tcp", ":0") if err != nil { t.Fatalf("Cannot start listener: %s", err) } @@ -95,27 +91,27 @@ func TestRace(t *testing.T) { wg.Done() }() - dio := New(l.Addr().String(), false) + dio := New("tcp", l.Addr().String()) + dio.tcpTimeout = 10 * time.Millisecond + dio.flushTimeout = 30 * time.Millisecond dio.Connect() defer dio.Close() wg.Add(count) for i := 0; i < count; i++ { go func() { - time.Sleep(50 * time.Millisecond) - dio.Dnstap(msg) + msg := tap.Dnstap_MESSAGE + dio.Dnstap(tap.Dnstap{Type: &msg}) wg.Done() }() } - wg.Wait() } func TestReconnect(t *testing.T) { count := 5 - // Start TCP listener - l, err := reuseport.Listen("tcp", endpointTCP) + l, err := reuseport.Listen("tcp", ":0") if err != nil { t.Fatalf("Cannot start listener: %s", err) } @@ -128,18 +124,18 @@ func TestReconnect(t *testing.T) { }() addr := l.Addr().String() - dio := New(addr, false) + dio := New("tcp", addr) + dio.tcpTimeout = 10 * time.Millisecond + dio.flushTimeout = 30 * time.Millisecond dio.Connect() defer dio.Close() - msg := tap.Dnstap_MESSAGE - dio.Dnstap(tap.Dnstap{Type: &msg}) + dio.Dnstap(msg) wg.Wait() // Close listener l.Close() - // And start TCP listener again on the same port l, err = reuseport.Listen("tcp", addr) if err != nil { @@ -154,9 +150,8 @@ func TestReconnect(t *testing.T) { }() for i := 0; i < count; i++ { - time.Sleep(time.Second) - dio.Dnstap(tap.Dnstap{Type: &msg}) + time.Sleep(100 * time.Millisecond) + dio.Dnstap(msg) } - wg.Wait() } diff --git a/plugin/dnstap/setup.go b/plugin/dnstap/setup.go index ab5488686..a863639ad 100644 --- a/plugin/dnstap/setup.go +++ b/plugin/dnstap/setup.go @@ -13,8 +13,8 @@ import ( func init() { plugin.Register("dnstap", setup) } type config struct { + proto string target string - socket bool full bool } @@ -32,10 +32,10 @@ func parseConfig(d *caddy.Controller) (c config, err error) { return c, d.ArgErr() } c.target = servers[0] + c.proto = "tcp" } else { - // default to UNIX socket c.target = strings.TrimPrefix(c.target, "unix://") - c.socket = true + c.proto = "unix" } c.full = d.NextArg() && d.Val() == "full" @@ -49,7 +49,7 @@ func setup(c *caddy.Controller) error { return plugin.Error("dnstap", err) } - dio := dnstapio.New(conf.target, conf.socket) + dio := dnstapio.New(conf.proto, conf.target) dnstap := Dnstap{io: dio, IncludeRawMessage: conf.full} c.OnStartup(func() error { diff --git a/plugin/dnstap/setup_test.go b/plugin/dnstap/setup_test.go index 8fad9cd39..129107efd 100644 --- a/plugin/dnstap/setup_test.go +++ b/plugin/dnstap/setup_test.go @@ -8,16 +8,16 @@ import ( func TestConfig(t *testing.T) { tests := []struct { - file string - path string - full bool - socket bool - fail bool + file string + path string + full bool + proto string + fail bool }{ - {"dnstap dnstap.sock full", "dnstap.sock", true, true, false}, - {"dnstap unix://dnstap.sock", "dnstap.sock", false, true, false}, - {"dnstap tcp://127.0.0.1:6000", "127.0.0.1:6000", false, false, false}, - {"dnstap", "fail", false, true, true}, + {"dnstap dnstap.sock full", "dnstap.sock", true, "unix", false}, + {"dnstap unix://dnstap.sock", "dnstap.sock", false, "unix", false}, + {"dnstap tcp://127.0.0.1:6000", "127.0.0.1:6000", false, "tcp", false}, + {"dnstap", "fail", false, "tcp", true}, } for _, c := range tests { cad := caddy.NewTestController("dns", c.file) @@ -26,7 +26,7 @@ func TestConfig(t *testing.T) { if err == nil { t.Errorf("%s: %s", c.file, err) } - } else if err != nil || conf.target != c.path || conf.full != c.full || conf.socket != c.socket { + } else if err != nil || conf.target != c.path || conf.full != c.full || conf.proto != c.proto { t.Errorf("Expected: %+v\nhave: %+v\nerror: %s", c, conf, err) } }