diff --git a/plugin/dnstap/README.md b/plugin/dnstap/README.md index b90c45fc7..0d0919c11 100644 --- a/plugin/dnstap/README.md +++ b/plugin/dnstap/README.md @@ -15,7 +15,7 @@ Every message is sent to the socket as soon as it comes in, the *dnstap* plugin ## Syntax ~~~ txt -dnstap SOCKET [full] { +dnstap SOCKET [full] [writebuffer] [queue] { [identity IDENTITY] [version VERSION] [extra EXTRA] @@ -38,6 +38,12 @@ Log information about client requests and responses to */tmp/dnstap.sock*. dnstap /tmp/dnstap.sock ~~~ +Log information about client requests and responses and tcp write buffer is 1024*Mb and queue is 2048*10000. + +~~~ txt +dnstap /tmp/dnstap.sock full 1024 2048 +~~~ + Log information including the wire-format DNS message about client requests and responses to */tmp/dnstap.sock*. ~~~ txt diff --git a/plugin/dnstap/handler.go b/plugin/dnstap/handler.go index 1d5bd98e5..d322aab90 100644 --- a/plugin/dnstap/handler.go +++ b/plugin/dnstap/handler.go @@ -20,10 +20,12 @@ type Dnstap struct { repl replacer.Replacer // IncludeRawMessage will include the raw DNS message into the dnstap messages if true. - IncludeRawMessage bool - Identity []byte - Version []byte - ExtraFormat string + IncludeRawMessage bool + Identity []byte + Version []byte + ExtraFormat string + MultipleTcpWriteBuf int // *Mb + MultipleQueue int // *10000 } // TapMessage sends the message m to the dnstap interface, without populating "Extra" field. diff --git a/plugin/dnstap/io.go b/plugin/dnstap/io.go index f95e4b5e8..4a6af6cb5 100644 --- a/plugin/dnstap/io.go +++ b/plugin/dnstap/io.go @@ -26,27 +26,29 @@ type tapper interface { // dio implements the Tapper interface. type dio struct { - endpoint string - proto string - enc *encoder - queue chan *tap.Dnstap - dropped uint32 - quit chan struct{} - flushTimeout time.Duration - tcpTimeout time.Duration - skipVerify bool + endpoint string + proto string + enc *encoder + queue chan *tap.Dnstap + dropped uint32 + quit chan struct{} + flushTimeout time.Duration + tcpTimeout time.Duration + skipVerify bool + tcpWriteBufSize int } // newIO returns a new and initialized pointer to a dio. -func newIO(proto, endpoint string) *dio { +func newIO(proto, endpoint string, multipleQueue int, multipleTcpWriteBuf int) *dio { return &dio{ - endpoint: endpoint, - proto: proto, - queue: make(chan *tap.Dnstap, queueSize), - quit: make(chan struct{}), - flushTimeout: flushTimeout, - tcpTimeout: tcpTimeout, - skipVerify: skipVerify, + endpoint: endpoint, + proto: proto, + queue: make(chan *tap.Dnstap, multipleQueue*queueSize), + quit: make(chan struct{}), + flushTimeout: flushTimeout, + tcpTimeout: tcpTimeout, + skipVerify: skipVerify, + tcpWriteBufSize: multipleTcpWriteBuf * tcpWriteBufSize, } } @@ -73,7 +75,7 @@ func (d *dio) dial() error { } if tcpConn, ok := conn.(*net.TCPConn); ok { - tcpConn.SetWriteBuffer(tcpWriteBufSize) + tcpConn.SetWriteBuffer(d.tcpWriteBufSize) tcpConn.SetNoDelay(false) } diff --git a/plugin/dnstap/io_test.go b/plugin/dnstap/io_test.go index 3e94f0556..c9847dba5 100644 --- a/plugin/dnstap/io_test.go +++ b/plugin/dnstap/io_test.go @@ -60,7 +60,7 @@ func TestTransport(t *testing.T) { wg.Done() }() - dio := newIO(param[0], l.Addr().String()) + dio := newIO(param[0], l.Addr().String(), 1, 1) dio.tcpTimeout = 10 * time.Millisecond dio.flushTimeout = 30 * time.Millisecond dio.connect() @@ -89,7 +89,7 @@ func TestRace(t *testing.T) { wg.Done() }() - dio := newIO("tcp", l.Addr().String()) + dio := newIO("tcp", l.Addr().String(), 1, 1) dio.tcpTimeout = 10 * time.Millisecond dio.flushTimeout = 30 * time.Millisecond dio.connect() @@ -122,7 +122,7 @@ func TestReconnect(t *testing.T) { }() addr := l.Addr().String() - dio := newIO("tcp", addr) + dio := newIO("tcp", addr, 1, 1) dio.tcpTimeout = 10 * time.Millisecond dio.flushTimeout = 30 * time.Millisecond dio.connect() diff --git a/plugin/dnstap/setup.go b/plugin/dnstap/setup.go index 0186f4d51..5e6b31d3a 100644 --- a/plugin/dnstap/setup.go +++ b/plugin/dnstap/setup.go @@ -3,6 +3,7 @@ package dnstap import ( "net/url" "os" + "strconv" "strings" "github.com/coredns/caddy" @@ -20,7 +21,10 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) { dnstaps := []*Dnstap{} for c.Next() { // directive name - d := Dnstap{} + d := Dnstap{ + MultipleTcpWriteBuf: 1, + MultipleQueue: 1, + } endpoint := "" d.repl = replacer.New() @@ -32,6 +36,14 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) { endpoint = args[0] + if len(args) >= 3 { + d.MultipleTcpWriteBuf, _ = strconv.Atoi(args[2]) + } + + if len(args) >= 4 { + d.MultipleQueue, _ = strconv.Atoi(args[3]) + } + var dio *dio if strings.HasPrefix(endpoint, "tls://") { // remote network endpoint @@ -39,23 +51,23 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) { if err != nil { return nil, c.ArgErr() } - dio = newIO("tls", endpointURL.Host) - d = Dnstap{io: dio} + dio = newIO("tls", endpointURL.Host, d.MultipleQueue, d.MultipleTcpWriteBuf) + d.io = dio } else if strings.HasPrefix(endpoint, "tcp://") { // remote network endpoint endpointURL, err := url.Parse(endpoint) if err != nil { return nil, c.ArgErr() } - dio = newIO("tcp", endpointURL.Host) - d = Dnstap{io: dio} + dio = newIO("tcp", endpointURL.Host, d.MultipleQueue, d.MultipleTcpWriteBuf) + d.io = dio } else { endpoint = strings.TrimPrefix(endpoint, "unix://") - dio = newIO("unix", endpoint) - d = Dnstap{io: dio} + dio = newIO("unix", endpoint, d.MultipleQueue, d.MultipleTcpWriteBuf) + d.io = dio } - d.IncludeRawMessage = len(args) == 2 && args[1] == "full" + d.IncludeRawMessage = len(args) >= 2 && args[1] == "full" hostname, _ := os.Hostname() d.Identity = []byte(hostname) diff --git a/plugin/dnstap/setup_test.go b/plugin/dnstap/setup_test.go index 83659638d..ee0c24340 100644 --- a/plugin/dnstap/setup_test.go +++ b/plugin/dnstap/setup_test.go @@ -10,12 +10,14 @@ import ( ) type results struct { - endpoint string - full bool - proto string - identity []byte - version []byte - extraFormat string + endpoint string + full bool + proto string + identity []byte + version []byte + extraFormat string + multipleTcpWriteBuf int + multipleQueue int } func TestConfig(t *testing.T) { @@ -25,16 +27,16 @@ func TestConfig(t *testing.T) { fail bool expect []results }{ - {"dnstap dnstap.sock full", false, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), ""}}}, - {"dnstap unix://dnstap.sock", false, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), ""}}}, - {"dnstap tcp://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tcp", []byte(hostname), []byte("-"), ""}}}, - {"dnstap tcp://[::1]:6000", false, []results{{"[::1]:6000", false, "tcp", []byte(hostname), []byte("-"), ""}}}, - {"dnstap tcp://example.com:6000", false, []results{{"example.com:6000", false, "tcp", []byte(hostname), []byte("-"), ""}}}, - {"dnstap", true, []results{{"fail", false, "tcp", []byte(hostname), []byte("-"), ""}}}, - {"dnstap dnstap.sock full {\nidentity NAME\nversion VER\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), ""}}}, - {"dnstap dnstap.sock full {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA"}}}, - {"dnstap dnstap.sock {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", false, "unix", []byte("NAME"), []byte("VER"), "EXTRA"}}}, - {"dnstap {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", true, []results{{"fail", false, "tcp", []byte("NAME"), []byte("VER"), "EXTRA"}}}, + {"dnstap dnstap.sock full", false, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap unix://dnstap.sock", false, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap tcp://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap tcp://[::1]:6000", false, []results{{"[::1]:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap tcp://example.com:6000", false, []results{{"example.com:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap", true, []results{{"fail", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap dnstap.sock full {\nidentity NAME\nversion VER\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "", 1, 1}}}, + {"dnstap dnstap.sock full {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}}, + {"dnstap dnstap.sock {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", false, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}}, + {"dnstap {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", true, []results{{"fail", false, "tcp", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}}, {`dnstap dnstap.sock full { identity NAME version VER @@ -45,13 +47,13 @@ func TestConfig(t *testing.T) { version VER2 extra EXTRA2 }`, false, []results{ - {"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA"}, - {"127.0.0.1:6000", false, "tcp", []byte("NAME2"), []byte("VER2"), "EXTRA2"}, + {"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}, + {"127.0.0.1:6000", false, "tcp", []byte("NAME2"), []byte("VER2"), "EXTRA2", 1, 1}, }}, - {"dnstap tls://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tls", []byte(hostname), []byte("-"), ""}}}, - {"dnstap dnstap.sock {\nidentity\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), ""}}}, - {"dnstap dnstap.sock {\nversion\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), ""}}}, - {"dnstap dnstap.sock {\nextra\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), ""}}}, + {"dnstap tls://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tls", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap dnstap.sock {\nidentity\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap dnstap.sock {\nversion\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}}, + {"dnstap dnstap.sock {\nextra\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}}, } for i, tc := range tests { c := caddy.NewTestController("dns", tc.in) @@ -82,6 +84,12 @@ func TestConfig(t *testing.T) { if x := string(tap.Version); x != string(tc.expect[i].version) { t.Errorf("Test %d: expected version %s, got %s", i, tc.expect[i].version, x) } + if x := tap.MultipleTcpWriteBuf; x != tc.expect[i].multipleTcpWriteBuf { + t.Errorf("Test %d: expected MultipleTcpWriteBuf %d, got %d", i, tc.expect[i].multipleTcpWriteBuf, x) + } + if x := tap.MultipleQueue; x != tc.expect[i].multipleQueue { + t.Errorf("Test %d: expected MultipleQueue %d, got %d", i, tc.expect[i].multipleQueue, x) + } if x := tap.ExtraFormat; x != tc.expect[i].extraFormat { t.Errorf("Test %d: expected extra format %s, got %s", i, tc.expect[i].extraFormat, x) }