feature: log queue and buffer memory size configuration (#6591)

* feature: log queue and buffer memory size configuration

Signed-off-by: chenyuliang5 <chenyuliang@jd.com>
This commit is contained in:
chenylh 2024-10-01 23:49:59 +08:00 committed by GitHub
parent 6efa95ca98
commit ae0b79313b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 86 additions and 56 deletions

View file

@ -15,7 +15,7 @@ Every message is sent to the socket as soon as it comes in, the *dnstap* plugin
## Syntax ## Syntax
~~~ txt ~~~ txt
dnstap SOCKET [full] { dnstap SOCKET [full] [writebuffer] [queue] {
[identity IDENTITY] [identity IDENTITY]
[version VERSION] [version VERSION]
[extra EXTRA] [extra EXTRA]
@ -38,6 +38,12 @@ Log information about client requests and responses to */tmp/dnstap.sock*.
dnstap /tmp/dnstap.sock dnstap /tmp/dnstap.sock
~~~ ~~~
Log information about client requests and responses and tcp write buffer is 1024*Mb and queue is 2048*10000.
~~~ txt
dnstap /tmp/dnstap.sock full 1024 2048
~~~
Log information including the wire-format DNS message about client requests and responses to */tmp/dnstap.sock*. Log information including the wire-format DNS message about client requests and responses to */tmp/dnstap.sock*.
~~~ txt ~~~ txt

View file

@ -20,10 +20,12 @@ type Dnstap struct {
repl replacer.Replacer repl replacer.Replacer
// IncludeRawMessage will include the raw DNS message into the dnstap messages if true. // IncludeRawMessage will include the raw DNS message into the dnstap messages if true.
IncludeRawMessage bool IncludeRawMessage bool
Identity []byte Identity []byte
Version []byte Version []byte
ExtraFormat string ExtraFormat string
MultipleTcpWriteBuf int // *Mb
MultipleQueue int // *10000
} }
// TapMessage sends the message m to the dnstap interface, without populating "Extra" field. // TapMessage sends the message m to the dnstap interface, without populating "Extra" field.

View file

@ -26,27 +26,29 @@ type tapper interface {
// dio implements the Tapper interface. // dio implements the Tapper interface.
type dio struct { type dio struct {
endpoint string endpoint string
proto string proto string
enc *encoder enc *encoder
queue chan *tap.Dnstap queue chan *tap.Dnstap
dropped uint32 dropped uint32
quit chan struct{} quit chan struct{}
flushTimeout time.Duration flushTimeout time.Duration
tcpTimeout time.Duration tcpTimeout time.Duration
skipVerify bool skipVerify bool
tcpWriteBufSize int
} }
// newIO returns a new and initialized pointer to a dio. // newIO returns a new and initialized pointer to a dio.
func newIO(proto, endpoint string) *dio { func newIO(proto, endpoint string, multipleQueue int, multipleTcpWriteBuf int) *dio {
return &dio{ return &dio{
endpoint: endpoint, endpoint: endpoint,
proto: proto, proto: proto,
queue: make(chan *tap.Dnstap, queueSize), queue: make(chan *tap.Dnstap, multipleQueue*queueSize),
quit: make(chan struct{}), quit: make(chan struct{}),
flushTimeout: flushTimeout, flushTimeout: flushTimeout,
tcpTimeout: tcpTimeout, tcpTimeout: tcpTimeout,
skipVerify: skipVerify, skipVerify: skipVerify,
tcpWriteBufSize: multipleTcpWriteBuf * tcpWriteBufSize,
} }
} }
@ -73,7 +75,7 @@ func (d *dio) dial() error {
} }
if tcpConn, ok := conn.(*net.TCPConn); ok { if tcpConn, ok := conn.(*net.TCPConn); ok {
tcpConn.SetWriteBuffer(tcpWriteBufSize) tcpConn.SetWriteBuffer(d.tcpWriteBufSize)
tcpConn.SetNoDelay(false) tcpConn.SetNoDelay(false)
} }

View file

@ -60,7 +60,7 @@ func TestTransport(t *testing.T) {
wg.Done() wg.Done()
}() }()
dio := newIO(param[0], l.Addr().String()) dio := newIO(param[0], l.Addr().String(), 1, 1)
dio.tcpTimeout = 10 * time.Millisecond dio.tcpTimeout = 10 * time.Millisecond
dio.flushTimeout = 30 * time.Millisecond dio.flushTimeout = 30 * time.Millisecond
dio.connect() dio.connect()
@ -89,7 +89,7 @@ func TestRace(t *testing.T) {
wg.Done() wg.Done()
}() }()
dio := newIO("tcp", l.Addr().String()) dio := newIO("tcp", l.Addr().String(), 1, 1)
dio.tcpTimeout = 10 * time.Millisecond dio.tcpTimeout = 10 * time.Millisecond
dio.flushTimeout = 30 * time.Millisecond dio.flushTimeout = 30 * time.Millisecond
dio.connect() dio.connect()
@ -122,7 +122,7 @@ func TestReconnect(t *testing.T) {
}() }()
addr := l.Addr().String() addr := l.Addr().String()
dio := newIO("tcp", addr) dio := newIO("tcp", addr, 1, 1)
dio.tcpTimeout = 10 * time.Millisecond dio.tcpTimeout = 10 * time.Millisecond
dio.flushTimeout = 30 * time.Millisecond dio.flushTimeout = 30 * time.Millisecond
dio.connect() dio.connect()

View file

@ -3,6 +3,7 @@ package dnstap
import ( import (
"net/url" "net/url"
"os" "os"
"strconv"
"strings" "strings"
"github.com/coredns/caddy" "github.com/coredns/caddy"
@ -20,7 +21,10 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) {
dnstaps := []*Dnstap{} dnstaps := []*Dnstap{}
for c.Next() { // directive name for c.Next() { // directive name
d := Dnstap{} d := Dnstap{
MultipleTcpWriteBuf: 1,
MultipleQueue: 1,
}
endpoint := "" endpoint := ""
d.repl = replacer.New() d.repl = replacer.New()
@ -32,6 +36,14 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) {
endpoint = args[0] endpoint = args[0]
if len(args) >= 3 {
d.MultipleTcpWriteBuf, _ = strconv.Atoi(args[2])
}
if len(args) >= 4 {
d.MultipleQueue, _ = strconv.Atoi(args[3])
}
var dio *dio var dio *dio
if strings.HasPrefix(endpoint, "tls://") { if strings.HasPrefix(endpoint, "tls://") {
// remote network endpoint // remote network endpoint
@ -39,23 +51,23 @@ func parseConfig(c *caddy.Controller) ([]*Dnstap, error) {
if err != nil { if err != nil {
return nil, c.ArgErr() return nil, c.ArgErr()
} }
dio = newIO("tls", endpointURL.Host) dio = newIO("tls", endpointURL.Host, d.MultipleQueue, d.MultipleTcpWriteBuf)
d = Dnstap{io: dio} d.io = dio
} else if strings.HasPrefix(endpoint, "tcp://") { } else if strings.HasPrefix(endpoint, "tcp://") {
// remote network endpoint // remote network endpoint
endpointURL, err := url.Parse(endpoint) endpointURL, err := url.Parse(endpoint)
if err != nil { if err != nil {
return nil, c.ArgErr() return nil, c.ArgErr()
} }
dio = newIO("tcp", endpointURL.Host) dio = newIO("tcp", endpointURL.Host, d.MultipleQueue, d.MultipleTcpWriteBuf)
d = Dnstap{io: dio} d.io = dio
} else { } else {
endpoint = strings.TrimPrefix(endpoint, "unix://") endpoint = strings.TrimPrefix(endpoint, "unix://")
dio = newIO("unix", endpoint) dio = newIO("unix", endpoint, d.MultipleQueue, d.MultipleTcpWriteBuf)
d = Dnstap{io: dio} d.io = dio
} }
d.IncludeRawMessage = len(args) == 2 && args[1] == "full" d.IncludeRawMessage = len(args) >= 2 && args[1] == "full"
hostname, _ := os.Hostname() hostname, _ := os.Hostname()
d.Identity = []byte(hostname) d.Identity = []byte(hostname)

View file

@ -10,12 +10,14 @@ import (
) )
type results struct { type results struct {
endpoint string endpoint string
full bool full bool
proto string proto string
identity []byte identity []byte
version []byte version []byte
extraFormat string extraFormat string
multipleTcpWriteBuf int
multipleQueue int
} }
func TestConfig(t *testing.T) { func TestConfig(t *testing.T) {
@ -25,16 +27,16 @@ func TestConfig(t *testing.T) {
fail bool fail bool
expect []results expect []results
}{ }{
{"dnstap dnstap.sock full", false, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), ""}}}, {"dnstap dnstap.sock full", false, []results{{"dnstap.sock", true, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap unix://dnstap.sock", false, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), ""}}}, {"dnstap unix://dnstap.sock", false, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap tcp://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tcp", []byte(hostname), []byte("-"), ""}}}, {"dnstap tcp://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap tcp://[::1]:6000", false, []results{{"[::1]:6000", false, "tcp", []byte(hostname), []byte("-"), ""}}}, {"dnstap tcp://[::1]:6000", false, []results{{"[::1]:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap tcp://example.com:6000", false, []results{{"example.com:6000", false, "tcp", []byte(hostname), []byte("-"), ""}}}, {"dnstap tcp://example.com:6000", false, []results{{"example.com:6000", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap", true, []results{{"fail", false, "tcp", []byte(hostname), []byte("-"), ""}}}, {"dnstap", true, []results{{"fail", false, "tcp", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap dnstap.sock full {\nidentity NAME\nversion VER\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), ""}}}, {"dnstap dnstap.sock full {\nidentity NAME\nversion VER\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "", 1, 1}}},
{"dnstap dnstap.sock full {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA"}}}, {"dnstap dnstap.sock full {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}},
{"dnstap dnstap.sock {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", false, "unix", []byte("NAME"), []byte("VER"), "EXTRA"}}}, {"dnstap dnstap.sock {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", false, []results{{"dnstap.sock", false, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}},
{"dnstap {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", true, []results{{"fail", false, "tcp", []byte("NAME"), []byte("VER"), "EXTRA"}}}, {"dnstap {\nidentity NAME\nversion VER\nextra EXTRA\n}\n", true, []results{{"fail", false, "tcp", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1}}},
{`dnstap dnstap.sock full { {`dnstap dnstap.sock full {
identity NAME identity NAME
version VER version VER
@ -45,13 +47,13 @@ func TestConfig(t *testing.T) {
version VER2 version VER2
extra EXTRA2 extra EXTRA2
}`, false, []results{ }`, false, []results{
{"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA"}, {"dnstap.sock", true, "unix", []byte("NAME"), []byte("VER"), "EXTRA", 1, 1},
{"127.0.0.1:6000", false, "tcp", []byte("NAME2"), []byte("VER2"), "EXTRA2"}, {"127.0.0.1:6000", false, "tcp", []byte("NAME2"), []byte("VER2"), "EXTRA2", 1, 1},
}}, }},
{"dnstap tls://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tls", []byte(hostname), []byte("-"), ""}}}, {"dnstap tls://127.0.0.1:6000", false, []results{{"127.0.0.1:6000", false, "tls", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap dnstap.sock {\nidentity\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), ""}}}, {"dnstap dnstap.sock {\nidentity\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap dnstap.sock {\nversion\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), ""}}}, {"dnstap dnstap.sock {\nversion\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
{"dnstap dnstap.sock {\nextra\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), ""}}}, {"dnstap dnstap.sock {\nextra\n}\n", true, []results{{"dnstap.sock", false, "unix", []byte(hostname), []byte("-"), "", 1, 1}}},
} }
for i, tc := range tests { for i, tc := range tests {
c := caddy.NewTestController("dns", tc.in) c := caddy.NewTestController("dns", tc.in)
@ -82,6 +84,12 @@ func TestConfig(t *testing.T) {
if x := string(tap.Version); x != string(tc.expect[i].version) { if x := string(tap.Version); x != string(tc.expect[i].version) {
t.Errorf("Test %d: expected version %s, got %s", i, tc.expect[i].version, x) t.Errorf("Test %d: expected version %s, got %s", i, tc.expect[i].version, x)
} }
if x := tap.MultipleTcpWriteBuf; x != tc.expect[i].multipleTcpWriteBuf {
t.Errorf("Test %d: expected MultipleTcpWriteBuf %d, got %d", i, tc.expect[i].multipleTcpWriteBuf, x)
}
if x := tap.MultipleQueue; x != tc.expect[i].multipleQueue {
t.Errorf("Test %d: expected MultipleQueue %d, got %d", i, tc.expect[i].multipleQueue, x)
}
if x := tap.ExtraFormat; x != tc.expect[i].extraFormat { if x := tap.ExtraFormat; x != tc.expect[i].extraFormat {
t.Errorf("Test %d: expected extra format %s, got %s", i, tc.expect[i].extraFormat, x) t.Errorf("Test %d: expected extra format %s, got %s", i, tc.expect[i].extraFormat, x)
} }