diff --git a/plugin/dnstap/dnstapio/dnstap_encoder.go b/plugin/dnstap/dnstapio/dnstap_encoder.go new file mode 100644 index 000000000..07dfc8413 --- /dev/null +++ b/plugin/dnstap/dnstapio/dnstap_encoder.go @@ -0,0 +1,92 @@ +package dnstapio + +import ( + "encoding/binary" + "fmt" + "io" + + tap "github.com/dnstap/golang-dnstap" + fs "github.com/farsightsec/golang-framestream" + "github.com/golang/protobuf/proto" +) + +const ( + frameLenSize = 4 + protobufSize = 1024 * 1024 +) + +type dnstapEncoder struct { + fse *fs.Encoder + opts *fs.EncoderOptions + writer io.Writer + buffer *proto.Buffer +} + +func newDnstapEncoder(o *fs.EncoderOptions) *dnstapEncoder { + return &dnstapEncoder{ + opts: o, + buffer: proto.NewBuffer(make([]byte, 0, protobufSize)), + } +} + +func (enc *dnstapEncoder) resetWriter(w io.Writer) error { + fse, err := fs.NewEncoder(w, enc.opts) + if err != nil { + return err + } + if err = fse.Flush(); err != nil { + return err + } + enc.fse = fse + enc.writer = w + return nil +} + +func (enc *dnstapEncoder) writeMsg(msg *tap.Dnstap) error { + if len(enc.buffer.Bytes()) >= protobufSize { + if err := enc.flushBuffer(); err != nil { + return err + } + } + bufLen := len(enc.buffer.Bytes()) + // add placeholder for frame length + if err := enc.buffer.EncodeFixed32(0); err != nil { + enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen]) + return err + } + if err := enc.buffer.Marshal(msg); err != nil { + enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen]) + return err + } + enc.encodeFrameLen(enc.buffer.Bytes()[bufLen:]) + return nil +} + +func (enc *dnstapEncoder) flushBuffer() error { + if enc.fse == nil || enc.writer == nil { + return fmt.Errorf("no writer") + } + + buf := enc.buffer.Bytes() + written := 0 + for written < len(buf) { + n, err := enc.writer.Write(buf[written:]) + written += n + if err != nil { + return err + } + } + enc.buffer.Reset() + return nil +} + +func (enc *dnstapEncoder) encodeFrameLen(buf []byte) { + binary.BigEndian.PutUint32(buf, uint32(len(buf)-4)) +} + +func (enc *dnstapEncoder) close() error { + if enc.fse != nil { + return enc.fse.Close() + } + return nil +} diff --git a/plugin/dnstap/dnstapio/dnstap_encoder_test.go b/plugin/dnstap/dnstapio/dnstap_encoder_test.go new file mode 100644 index 000000000..7ddb08776 --- /dev/null +++ b/plugin/dnstap/dnstapio/dnstap_encoder_test.go @@ -0,0 +1,53 @@ +package dnstapio + +import ( + "bytes" + "testing" + + tap "github.com/dnstap/golang-dnstap" + fs "github.com/farsightsec/golang-framestream" + "github.com/golang/protobuf/proto" +) + +func dnstapMsg() *tap.Dnstap { + t := tap.Dnstap_MESSAGE + mt := tap.Message_CLIENT_RESPONSE + msg := &tap.Message{Type: &mt} + return &tap.Dnstap{Type: &t, Message: msg} +} + +func TestEncoderCompatibility(t *testing.T) { + opts := &fs.EncoderOptions{ + ContentType: []byte("protobuf:dnstap.DnstapTest"), + Bidirectional: false, + } + msg := dnstapMsg() + + //framestream encoder + fsW := new(bytes.Buffer) + fsEnc, err := fs.NewEncoder(fsW, opts) + if err != nil { + t.Fatal(err) + } + data, err := proto.Marshal(msg) + if err != nil { + t.Fatal(err) + } + fsEnc.Write(data) + fsEnc.Close() + + //dnstap encoder + dnstapW := new(bytes.Buffer) + dnstapEnc := newDnstapEncoder(opts) + if err := dnstapEnc.resetWriter(dnstapW); err != nil { + t.Fatal(err) + } + dnstapEnc.writeMsg(msg) + dnstapEnc.flushBuffer() + dnstapEnc.close() + + //compare results + if !bytes.Equal(fsW.Bytes(), dnstapW.Bytes()) { + t.Fatal("dnstapEncoder is not compatible with framestream Encoder") + } +} diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go index 08d33aa0c..d2f54679b 100644 --- a/plugin/dnstap/dnstapio/io.go +++ b/plugin/dnstap/dnstapio/io.go @@ -3,25 +3,27 @@ package dnstapio import ( "log" "net" + "sync/atomic" "time" tap "github.com/dnstap/golang-dnstap" fs "github.com/farsightsec/golang-framestream" - "github.com/golang/protobuf/proto" ) const ( - tcpTimeout = 4 * time.Second - flushTimeout = 1 * time.Second - queueSize = 10000 + tcpWriteBufSize = 1024 * 1024 + tcpTimeout = 4 * time.Second + flushTimeout = 1 * time.Second + queueSize = 10000 ) type dnstapIO struct { endpoint string socket bool conn net.Conn - enc *fs.Encoder + enc *dnstapEncoder queue chan tap.Dnstap + dropped uint32 } // New returns a new and initialized DnstapIO. @@ -29,7 +31,11 @@ func New(endpoint string, socket bool) DnstapIO { return &dnstapIO{ endpoint: endpoint, socket: socket, - queue: make(chan tap.Dnstap, queueSize), + enc: newDnstapEncoder(&fs.EncoderOptions{ + ContentType: []byte("protobuf:dnstap.Dnstap"), + Bidirectional: true, + }), + queue: make(chan tap.Dnstap, queueSize), } } @@ -43,18 +49,20 @@ type DnstapIO interface { func (dio *dnstapIO) newConnect() error { var err error if dio.socket { - dio.conn, err = net.Dial("unix", dio.endpoint) + if dio.conn, err = net.Dial("unix", dio.endpoint); err != nil { + return err + } } else { - dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout) + if dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout); err != nil { + return err + } + if tcpConn, ok := dio.conn.(*net.TCPConn); ok { + tcpConn.SetWriteBuffer(tcpWriteBufSize) + tcpConn.SetNoDelay(false) + } } - if err != nil { - return err - } - dio.enc, err = fs.NewEncoder(dio.conn, &fs.EncoderOptions{ - ContentType: []byte("protobuf:dnstap.Dnstap"), - Bidirectional: true, - }) - if err != nil { + + if err = dio.enc.resetWriter(dio.conn); err != nil { return err } return nil @@ -73,15 +81,16 @@ func (dio *dnstapIO) Dnstap(payload tap.Dnstap) { select { case dio.queue <- payload: default: - log.Printf("[ERROR] Dnstap payload dropped") + atomic.AddUint32(&dio.dropped, 1) } } func (dio *dnstapIO) closeConnection() { - dio.enc.Close() - dio.conn.Close() - dio.enc = nil - dio.conn = nil + dio.enc.close() + if dio.conn != nil { + dio.conn.Close() + dio.conn = nil + } } // Close waits until the I/O routine is finished to return. @@ -89,32 +98,28 @@ func (dio *dnstapIO) Close() { close(dio.queue) } -func (dio *dnstapIO) write(payload *tap.Dnstap) { - if dio.enc == nil { +func (dio *dnstapIO) flushBuffer() { + if dio.conn == nil { if err := dio.newConnect(); err != nil { return } + log.Printf("[INFO] Reconnected to dnstap") } - var err error - if payload != nil { - frame, e := proto.Marshal(payload) - if err != nil { - log.Printf("[ERROR] Invalid dnstap payload dropped: %s", e) - return + + if err := dio.enc.flushBuffer(); err != nil { + log.Printf("[WARN] Connection lost: %s", err) + dio.closeConnection() + if err := dio.newConnect(); err != nil { + log.Printf("[ERROR] Cannot connect to dnstap: %s", err) + } else { + log.Printf("[INFO] Reconnected to dnstap") } - _, err = dio.enc.Write(frame) - } else { - err = dio.enc.Flush() } - if err == nil { - return - } - log.Printf("[WARN] Connection lost: %s", err) - dio.closeConnection() - if err := dio.newConnect(); err != nil { - log.Printf("[ERROR] Cannot write dnstap payload: %s", err) - } else { - log.Printf("[INFO] Reconnect to dnstap done") +} + +func (dio *dnstapIO) write(payload *tap.Dnstap) { + if err := dio.enc.writeMsg(payload); err != nil { + atomic.AddUint32(&dio.dropped, 1) } } @@ -124,12 +129,16 @@ func (dio *dnstapIO) serve() { select { case payload, ok := <-dio.queue: if !ok { + dio.flushBuffer() dio.closeConnection() return } dio.write(&payload) case <-timeout: - dio.write(nil) + if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 { + log.Printf("[WARN] Dropped dnstap messages: %d", dropped) + } + dio.flushBuffer() timeout = time.After(flushTimeout) } }