From 45ef657d3614e5194cc906fba3d28fe00fa18ba4 Mon Sep 17 00:00:00 2001 From: Ruslan Drozhdzh <30860269+rdrozhdzh@users.noreply.github.com> Date: Wed, 6 Dec 2017 13:36:04 +0300 Subject: [PATCH] Increase performance of Dnstap plugin (#1280) - added dnstapEncoder object which incapsulates marshalling of dnstap messages to protobuf and writing data to connection - dnstapEncoder writes data directly to connection object. It doesn't use the framestream's "write" method, because it writes data to intermediate buffer (bufio.Writer) which leads to unnecessary data copying and drops the performance - dnstapEncoder reuses a preallocated buffer for marshalling dnstap messages. Many messages are added to the same buffer. They are separated with a "frame length" 4-byte values, so the buffer content is writen to connection object in the format compatible with framestream library - added test which guarantees that dnstapEncoder output is the same as framestream Encoder output - the performance increase is about 50% in (dio *dnstapIO) serve() method of dnstap plugin. The overall coredns performance increase is about 10% in the following configuration: .:1053 { erratic { drop 0 truncate 0 delay 0 } dnstap tcp://127.0.0.1:6000 full errors stdout } tested with dnsperf tool --- plugin/dnstap/dnstapio/dnstap_encoder.go | 92 ++++++++++++++++++ plugin/dnstap/dnstapio/dnstap_encoder_test.go | 53 +++++++++++ plugin/dnstap/dnstapio/io.go | 93 ++++++++++--------- 3 files changed, 196 insertions(+), 42 deletions(-) create mode 100644 plugin/dnstap/dnstapio/dnstap_encoder.go create mode 100644 plugin/dnstap/dnstapio/dnstap_encoder_test.go diff --git a/plugin/dnstap/dnstapio/dnstap_encoder.go b/plugin/dnstap/dnstapio/dnstap_encoder.go new file mode 100644 index 000000000..07dfc8413 --- /dev/null +++ b/plugin/dnstap/dnstapio/dnstap_encoder.go @@ -0,0 +1,92 @@ +package dnstapio + +import ( + "encoding/binary" + "fmt" + "io" + + tap "github.com/dnstap/golang-dnstap" + fs "github.com/farsightsec/golang-framestream" + "github.com/golang/protobuf/proto" +) + +const ( + frameLenSize = 4 + protobufSize = 1024 * 1024 +) + +type dnstapEncoder struct { + fse *fs.Encoder + opts *fs.EncoderOptions + writer io.Writer + buffer *proto.Buffer +} + +func newDnstapEncoder(o *fs.EncoderOptions) *dnstapEncoder { + return &dnstapEncoder{ + opts: o, + buffer: proto.NewBuffer(make([]byte, 0, protobufSize)), + } +} + +func (enc *dnstapEncoder) resetWriter(w io.Writer) error { + fse, err := fs.NewEncoder(w, enc.opts) + if err != nil { + return err + } + if err = fse.Flush(); err != nil { + return err + } + enc.fse = fse + enc.writer = w + return nil +} + +func (enc *dnstapEncoder) writeMsg(msg *tap.Dnstap) error { + if len(enc.buffer.Bytes()) >= protobufSize { + if err := enc.flushBuffer(); err != nil { + return err + } + } + bufLen := len(enc.buffer.Bytes()) + // add placeholder for frame length + if err := enc.buffer.EncodeFixed32(0); err != nil { + enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen]) + return err + } + if err := enc.buffer.Marshal(msg); err != nil { + enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen]) + return err + } + enc.encodeFrameLen(enc.buffer.Bytes()[bufLen:]) + return nil +} + +func (enc *dnstapEncoder) flushBuffer() error { + if enc.fse == nil || enc.writer == nil { + return fmt.Errorf("no writer") + } + + buf := enc.buffer.Bytes() + written := 0 + for written < len(buf) { + n, err := enc.writer.Write(buf[written:]) + written += n + if err != nil { + return err + } + } + enc.buffer.Reset() + return nil +} + +func (enc *dnstapEncoder) encodeFrameLen(buf []byte) { + binary.BigEndian.PutUint32(buf, uint32(len(buf)-4)) +} + +func (enc *dnstapEncoder) close() error { + if enc.fse != nil { + return enc.fse.Close() + } + return nil +} diff --git a/plugin/dnstap/dnstapio/dnstap_encoder_test.go b/plugin/dnstap/dnstapio/dnstap_encoder_test.go new file mode 100644 index 000000000..7ddb08776 --- /dev/null +++ b/plugin/dnstap/dnstapio/dnstap_encoder_test.go @@ -0,0 +1,53 @@ +package dnstapio + +import ( + "bytes" + "testing" + + tap "github.com/dnstap/golang-dnstap" + fs "github.com/farsightsec/golang-framestream" + "github.com/golang/protobuf/proto" +) + +func dnstapMsg() *tap.Dnstap { + t := tap.Dnstap_MESSAGE + mt := tap.Message_CLIENT_RESPONSE + msg := &tap.Message{Type: &mt} + return &tap.Dnstap{Type: &t, Message: msg} +} + +func TestEncoderCompatibility(t *testing.T) { + opts := &fs.EncoderOptions{ + ContentType: []byte("protobuf:dnstap.DnstapTest"), + Bidirectional: false, + } + msg := dnstapMsg() + + //framestream encoder + fsW := new(bytes.Buffer) + fsEnc, err := fs.NewEncoder(fsW, opts) + if err != nil { + t.Fatal(err) + } + data, err := proto.Marshal(msg) + if err != nil { + t.Fatal(err) + } + fsEnc.Write(data) + fsEnc.Close() + + //dnstap encoder + dnstapW := new(bytes.Buffer) + dnstapEnc := newDnstapEncoder(opts) + if err := dnstapEnc.resetWriter(dnstapW); err != nil { + t.Fatal(err) + } + dnstapEnc.writeMsg(msg) + dnstapEnc.flushBuffer() + dnstapEnc.close() + + //compare results + if !bytes.Equal(fsW.Bytes(), dnstapW.Bytes()) { + t.Fatal("dnstapEncoder is not compatible with framestream Encoder") + } +} diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go index 08d33aa0c..d2f54679b 100644 --- a/plugin/dnstap/dnstapio/io.go +++ b/plugin/dnstap/dnstapio/io.go @@ -3,25 +3,27 @@ package dnstapio import ( "log" "net" + "sync/atomic" "time" tap "github.com/dnstap/golang-dnstap" fs "github.com/farsightsec/golang-framestream" - "github.com/golang/protobuf/proto" ) const ( - tcpTimeout = 4 * time.Second - flushTimeout = 1 * time.Second - queueSize = 10000 + tcpWriteBufSize = 1024 * 1024 + tcpTimeout = 4 * time.Second + flushTimeout = 1 * time.Second + queueSize = 10000 ) type dnstapIO struct { endpoint string socket bool conn net.Conn - enc *fs.Encoder + enc *dnstapEncoder queue chan tap.Dnstap + dropped uint32 } // New returns a new and initialized DnstapIO. @@ -29,7 +31,11 @@ func New(endpoint string, socket bool) DnstapIO { return &dnstapIO{ endpoint: endpoint, socket: socket, - queue: make(chan tap.Dnstap, queueSize), + enc: newDnstapEncoder(&fs.EncoderOptions{ + ContentType: []byte("protobuf:dnstap.Dnstap"), + Bidirectional: true, + }), + queue: make(chan tap.Dnstap, queueSize), } } @@ -43,18 +49,20 @@ type DnstapIO interface { func (dio *dnstapIO) newConnect() error { var err error if dio.socket { - dio.conn, err = net.Dial("unix", dio.endpoint) + if dio.conn, err = net.Dial("unix", dio.endpoint); err != nil { + return err + } } else { - dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout) + if dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout); err != nil { + return err + } + if tcpConn, ok := dio.conn.(*net.TCPConn); ok { + tcpConn.SetWriteBuffer(tcpWriteBufSize) + tcpConn.SetNoDelay(false) + } } - if err != nil { - return err - } - dio.enc, err = fs.NewEncoder(dio.conn, &fs.EncoderOptions{ - ContentType: []byte("protobuf:dnstap.Dnstap"), - Bidirectional: true, - }) - if err != nil { + + if err = dio.enc.resetWriter(dio.conn); err != nil { return err } return nil @@ -73,15 +81,16 @@ func (dio *dnstapIO) Dnstap(payload tap.Dnstap) { select { case dio.queue <- payload: default: - log.Printf("[ERROR] Dnstap payload dropped") + atomic.AddUint32(&dio.dropped, 1) } } func (dio *dnstapIO) closeConnection() { - dio.enc.Close() - dio.conn.Close() - dio.enc = nil - dio.conn = nil + dio.enc.close() + if dio.conn != nil { + dio.conn.Close() + dio.conn = nil + } } // Close waits until the I/O routine is finished to return. @@ -89,32 +98,28 @@ func (dio *dnstapIO) Close() { close(dio.queue) } -func (dio *dnstapIO) write(payload *tap.Dnstap) { - if dio.enc == nil { +func (dio *dnstapIO) flushBuffer() { + if dio.conn == nil { if err := dio.newConnect(); err != nil { return } + log.Printf("[INFO] Reconnected to dnstap") } - var err error - if payload != nil { - frame, e := proto.Marshal(payload) - if err != nil { - log.Printf("[ERROR] Invalid dnstap payload dropped: %s", e) - return + + if err := dio.enc.flushBuffer(); err != nil { + log.Printf("[WARN] Connection lost: %s", err) + dio.closeConnection() + if err := dio.newConnect(); err != nil { + log.Printf("[ERROR] Cannot connect to dnstap: %s", err) + } else { + log.Printf("[INFO] Reconnected to dnstap") } - _, err = dio.enc.Write(frame) - } else { - err = dio.enc.Flush() } - if err == nil { - return - } - log.Printf("[WARN] Connection lost: %s", err) - dio.closeConnection() - if err := dio.newConnect(); err != nil { - log.Printf("[ERROR] Cannot write dnstap payload: %s", err) - } else { - log.Printf("[INFO] Reconnect to dnstap done") +} + +func (dio *dnstapIO) write(payload *tap.Dnstap) { + if err := dio.enc.writeMsg(payload); err != nil { + atomic.AddUint32(&dio.dropped, 1) } } @@ -124,12 +129,16 @@ func (dio *dnstapIO) serve() { select { case payload, ok := <-dio.queue: if !ok { + dio.flushBuffer() dio.closeConnection() return } dio.write(&payload) case <-timeout: - dio.write(nil) + if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 { + log.Printf("[WARN] Dropped dnstap messages: %d", dropped) + } + dio.flushBuffer() timeout = time.After(flushTimeout) } }