coredns/plugin/dnstap/dnstapio/io.go
Miek Gieben 12b2ff9740
Use logging (#1718)
* update docs

* plugins: use plugin specific logging

Hooking up pkg/log also changed NewWithPlugin to just take a string
instead of a plugin.Handler as that is more flexible and for instance
the Root "plugin" doesn't implement it fully.

Same logging from the reload plugin:

.:1043
2018/04/22 08:56:37 [INFO] CoreDNS-1.1.1
2018/04/22 08:56:37 [INFO] linux/amd64, go1.10.1,
CoreDNS-1.1.1
linux/amd64, go1.10.1,
2018/04/22 08:56:37 [INFO] plugin/reload: Running configuration MD5 = ec4c9c55cd19759ea1c46b8c45742b06
2018/04/22 08:56:54 [INFO] Reloading
2018/04/22 08:56:54 [INFO] plugin/reload: Running configuration MD5 = 9e2bfdd85bdc9cceb740ba9c80f34c1a
2018/04/22 08:56:54 [INFO] Reloading complete

* update docs

* better doc
2018-04-22 21:40:33 +01:00

146 lines
3 KiB
Go

package dnstapio
import (
"net"
"sync/atomic"
"time"
clog "github.com/coredns/coredns/plugin/pkg/log"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
)
var log = clog.NewWithPlugin("dnstap")
const (
tcpWriteBufSize = 1024 * 1024
tcpTimeout = 4 * time.Second
flushTimeout = 1 * time.Second
queueSize = 10000
)
type dnstapIO struct {
endpoint string
socket bool
conn net.Conn
enc *dnstapEncoder
queue chan tap.Dnstap
dropped uint32
quit chan struct{}
}
// New returns a new and initialized DnstapIO.
func New(endpoint string, socket bool) DnstapIO {
return &dnstapIO{
endpoint: endpoint,
socket: socket,
enc: newDnstapEncoder(&fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true,
}),
queue: make(chan tap.Dnstap, queueSize),
quit: make(chan struct{}),
}
}
// DnstapIO interface
type DnstapIO interface {
Connect()
Dnstap(payload tap.Dnstap)
Close()
}
func (dio *dnstapIO) newConnect() error {
var err error
if dio.socket {
if dio.conn, err = net.Dial("unix", dio.endpoint); err != nil {
return err
}
} else {
if dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout); err != nil {
return err
}
if tcpConn, ok := dio.conn.(*net.TCPConn); ok {
tcpConn.SetWriteBuffer(tcpWriteBufSize)
tcpConn.SetNoDelay(false)
}
}
return dio.enc.resetWriter(dio.conn)
}
// Connect connects to the dnstop endpoint.
func (dio *dnstapIO) Connect() {
if err := dio.newConnect(); err != nil {
log.Error("No connection to dnstap endpoint")
}
go dio.serve()
}
// Dnstap enqueues the payload for log.
func (dio *dnstapIO) Dnstap(payload tap.Dnstap) {
select {
case dio.queue <- payload:
default:
atomic.AddUint32(&dio.dropped, 1)
}
}
func (dio *dnstapIO) closeConnection() {
dio.enc.close()
if dio.conn != nil {
dio.conn.Close()
dio.conn = nil
}
}
// Close waits until the I/O routine is finished to return.
func (dio *dnstapIO) Close() {
close(dio.quit)
}
func (dio *dnstapIO) flushBuffer() {
if dio.conn == nil {
if err := dio.newConnect(); err != nil {
return
}
log.Info("Reconnected to dnstap")
}
if err := dio.enc.flushBuffer(); err != nil {
log.Warningf("Connection lost: %s", err)
dio.closeConnection()
if err := dio.newConnect(); err != nil {
log.Errorf("Cannot connect to dnstap: %s", err)
} else {
log.Info("Reconnected to dnstap")
}
}
}
func (dio *dnstapIO) write(payload *tap.Dnstap) {
if err := dio.enc.writeMsg(payload); err != nil {
atomic.AddUint32(&dio.dropped, 1)
}
}
func (dio *dnstapIO) serve() {
timeout := time.After(flushTimeout)
for {
select {
case <-dio.quit:
dio.flushBuffer()
dio.closeConnection()
return
case payload := <-dio.queue:
dio.write(&payload)
case <-timeout:
if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 {
log.Warningf("Dropped dnstap messages: %d", dropped)
}
dio.flushBuffer()
timeout = time.After(flushTimeout)
}
}
}