From ea293da1d6bb0c4f598c8790c6941d56c79c0aa3 Mon Sep 17 00:00:00 2001 From: Pat Downey Date: Tue, 4 Jul 2023 15:35:55 +0100 Subject: [PATCH] Fix forward metrics for backwards compatibility (#6178) --- plugin/forward/README.md | 26 +++++++++++++++--------- plugin/forward/forward.go | 5 +++-- plugin/forward/forward_test.go | 12 +++++------ plugin/forward/health_test.go | 14 ++++++------- plugin/forward/metrics.go | 5 +++-- plugin/forward/setup.go | 6 ++++-- plugin/pkg/proxy/connect.go | 8 +++----- plugin/pkg/proxy/health.go | 7 +++++-- plugin/pkg/proxy/health_test.go | 20 +++++++++---------- plugin/pkg/proxy/metrics.go | 31 ++++++++++------------------- plugin/pkg/proxy/persistent.go | 4 +++- plugin/pkg/proxy/persistent_test.go | 6 +++--- plugin/pkg/proxy/proxy.go | 13 +++++++----- plugin/pkg/proxy/proxy_test.go | 8 ++++---- 14 files changed, 87 insertions(+), 78 deletions(-) diff --git a/plugin/forward/README.md b/plugin/forward/README.md index ab2ef2b4b..7dd66f768 100644 --- a/plugin/forward/README.md +++ b/plugin/forward/README.md @@ -115,20 +115,28 @@ plugin is also enabled: If monitoring is enabled (via the *prometheus* plugin) then the following metric are exported: -* `coredns_forward_requests_total{to}` - query count per upstream. -* `coredns_forward_responses_total{to}` - Counter of responses received per upstream. -* `coredns_forward_request_duration_seconds{to, rcode, type}` - duration per upstream, RCODE, type -* `coredns_forward_responses_total{to, rcode}` - count of RCODEs per upstream. -* `coredns_forward_healthcheck_failures_total{to}` - number of failed health checks per upstream. -* `coredns_forward_healthcheck_broken_total{}` - counter of when all upstreams are unhealthy, +* `coredns_forward_healthcheck_broken_total{}` - count of when all upstreams are unhealthy, and we are randomly (this always uses the `random` policy) spraying to an upstream. -* `coredns_forward_max_concurrent_rejects_total{}` - counter of the number of queries rejected because the +* `coredns_forward_max_concurrent_rejects_total{}` - count of queries rejected because the number of concurrent queries were at maximum. -* `coredns_forward_conn_cache_hits_total{to, proto}` - counter of connection cache hits per upstream and protocol. -* `coredns_forward_conn_cache_misses_total{to, proto}` - counter of connection cache misses per upstream and protocol. +* `coredns_proxy_request_duration_seconds{proxy_name="forward", to, rcode}` - histogram per upstream, RCODE +* `coredns_proxy_healthcheck_failures_total{proxy_name="forward", to, rcode}`- count of failed health checks per upstream. +* `coredns_proxy_conn_cache_hits_total{proxy_name="forward", to, proto}`- count of connection cache hits per upstream and protocol. +* `coredns_proxy_conn_cache_misses_total{proxy_name="forward", to, proto}` - count of connection cache misses per upstream and protocol. + Where `to` is one of the upstream servers (**TO** from the config), `rcode` is the returned RCODE from the upstream, `proto` is the transport protocol like `udp`, `tcp`, `tcp-tls`. +The following metrics have recently been deprecated: +* `coredns_forward_healthcheck_failures_total{to, rcode}` + * Can be replaced with `coredns_proxy_healthcheck_failures_total{proxy_name="forward", to, rcode}` +* `coredns_forward_requests_total{to}` + * Can be replaced with `sum(coredns_proxy_request_duration_seconds_count{proxy_name="forward", to})` +* `coredns_forward_responses_total{to, rcode}` + * Can be replaced with `coredns_proxy_request_duration_seconds_count{proxy_name="forward", to, rcode}` +* `coredns_forward_request_duration_seconds{to, rcode}` + * Can be replaced with `coredns_proxy_request_duration_seconds{proxy_name="forward", to, rcode}` + ## Examples Proxy all requests within `example.org.` to a nameserver running on a different port: diff --git a/plugin/forward/forward.go b/plugin/forward/forward.go index 927a6e21f..b3df8330f 100644 --- a/plugin/forward/forward.go +++ b/plugin/forward/forward.go @@ -97,7 +97,7 @@ func (f *Forward) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg count := atomic.AddInt64(&(f.concurrent), 1) defer atomic.AddInt64(&(f.concurrent), -1) if count > f.maxConcurrent { - MaxConcurrentRejectCount.Add(1) + maxConcurrentRejectCount.Add(1) return dns.RcodeRefused, f.ErrLimitExceeded } } @@ -129,7 +129,7 @@ func (f *Forward) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg r := new(random) proxy = r.List(f.proxies)[0] - HealthcheckBrokenCount.Add(1) + healthcheckBrokenCount.Add(1) } if span != nil { @@ -150,6 +150,7 @@ func (f *Forward) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg for { ret, err = proxy.Connect(ctx, state, opts) + if err == ErrCachedClosed { // Remote side closed conn, can only happen with TCP. continue } diff --git a/plugin/forward/forward_test.go b/plugin/forward/forward_test.go index 9ea859826..aca58cbf9 100644 --- a/plugin/forward/forward_test.go +++ b/plugin/forward/forward_test.go @@ -15,17 +15,17 @@ import ( func TestList(t *testing.T) { f := Forward{ proxies: []*proxy.Proxy{ - proxy.NewProxy("1.1.1.1:53", transport.DNS), - proxy.NewProxy("2.2.2.2:53", transport.DNS), - proxy.NewProxy("3.3.3.3:53", transport.DNS), + proxy.NewProxy("TestList", "1.1.1.1:53", transport.DNS), + proxy.NewProxy("TestList", "2.2.2.2:53", transport.DNS), + proxy.NewProxy("TestList", "3.3.3.3:53", transport.DNS), }, p: &roundRobin{}, } expect := []*proxy.Proxy{ - proxy.NewProxy("2.2.2.2:53", transport.DNS), - proxy.NewProxy("1.1.1.1:53", transport.DNS), - proxy.NewProxy("3.3.3.3:53", transport.DNS), + proxy.NewProxy("TestList", "2.2.2.2:53", transport.DNS), + proxy.NewProxy("TestList", "1.1.1.1:53", transport.DNS), + proxy.NewProxy("TestList", "3.3.3.3:53", transport.DNS), } got := f.List() diff --git a/plugin/forward/health_test.go b/plugin/forward/health_test.go index 7cb928d22..211a620c4 100644 --- a/plugin/forward/health_test.go +++ b/plugin/forward/health_test.go @@ -33,7 +33,7 @@ func TestHealth(t *testing.T) { }) defer s.Close() - p := proxy.NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy("TestHealth", s.Addr, transport.DNS) p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) f := New() @@ -71,7 +71,7 @@ func TestHealthTCP(t *testing.T) { }) defer s.Close() - p := proxy.NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy("TestHealthTCP", s.Addr, transport.DNS) p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) p.GetHealthchecker().SetTCPTransport() @@ -110,7 +110,7 @@ func TestHealthNoRecursion(t *testing.T) { }) defer s.Close() - p := proxy.NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy("TestHealthNoRecursion", s.Addr, transport.DNS) p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) p.GetHealthchecker().SetRecursionDesired(false) @@ -154,7 +154,7 @@ func TestHealthTimeout(t *testing.T) { }) defer s.Close() - p := proxy.NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy("TestHealthTimeout", s.Addr, transport.DNS) p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) f := New() @@ -182,7 +182,7 @@ func TestHealthMaxFails(t *testing.T) { }) defer s.Close() - p := proxy.NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy("TestHealthMaxFails", s.Addr, transport.DNS) p.SetReadTimeout(10 * time.Millisecond) p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) @@ -219,7 +219,7 @@ func TestHealthNoMaxFails(t *testing.T) { }) defer s.Close() - p := proxy.NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy("TestHealthNoMaxFails", s.Addr, transport.DNS) p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) f := New() @@ -258,7 +258,7 @@ func TestHealthDomain(t *testing.T) { w.WriteMsg(ret) }) defer s.Close() - p := proxy.NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy("TestHealthDomain", s.Addr, transport.DNS) p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) p.GetHealthchecker().SetDomain(hcDomain) diff --git a/plugin/forward/metrics.go b/plugin/forward/metrics.go index da0905525..246dc6500 100644 --- a/plugin/forward/metrics.go +++ b/plugin/forward/metrics.go @@ -9,13 +9,14 @@ import ( // Variables declared for monitoring. var ( - HealthcheckBrokenCount = promauto.NewCounter(prometheus.CounterOpts{ + healthcheckBrokenCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: plugin.Namespace, Subsystem: "forward", Name: "healthcheck_broken_total", Help: "Counter of the number of complete failures of the healthchecks.", }) - MaxConcurrentRejectCount = promauto.NewCounter(prometheus.CounterOpts{ + + maxConcurrentRejectCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: plugin.Namespace, Subsystem: "forward", Name: "max_concurrent_rejects_total", diff --git a/plugin/forward/setup.go b/plugin/forward/setup.go index 6de0c870f..916d7a7a5 100644 --- a/plugin/forward/setup.go +++ b/plugin/forward/setup.go @@ -19,7 +19,9 @@ import ( "github.com/miekg/dns" ) -func init() { plugin.Register("forward", setup) } +func init() { + plugin.Register("forward", setup) +} func setup(c *caddy.Controller) error { fs, err := parseForward(c) @@ -128,7 +130,7 @@ func parseStanza(c *caddy.Controller) (*Forward, error) { if !allowedTrans[trans] { return f, fmt.Errorf("'%s' is not supported as a destination protocol in forward: %s", trans, host) } - p := proxy.NewProxy(h, trans) + p := proxy.NewProxy("forward", h, trans) f.proxies = append(f.proxies, p) transports[i] = trans } diff --git a/plugin/pkg/proxy/connect.go b/plugin/pkg/proxy/connect.go index b60a1a237..d799df498 100644 --- a/plugin/pkg/proxy/connect.go +++ b/plugin/pkg/proxy/connect.go @@ -55,10 +55,10 @@ func (t *Transport) Dial(proto string) (*persistConn, bool, error) { pc := <-t.ret if pc != nil { - ConnCacheHitsCount.WithLabelValues(t.addr, proto).Add(1) + connCacheHitsCount.WithLabelValues(t.proxyName, t.addr, proto).Add(1) return pc, true, nil } - ConnCacheMissesCount.WithLabelValues(t.addr, proto).Add(1) + connCacheMissesCount.WithLabelValues(t.proxyName, t.addr, proto).Add(1) reqTime := time.Now() timeout := t.dialTimeout() @@ -152,9 +152,7 @@ func (p *Proxy) Connect(ctx context.Context, state request.Request, opts Options rc = strconv.Itoa(ret.Rcode) } - RequestCount.WithLabelValues(p.addr).Add(1) - RcodeCount.WithLabelValues(rc, p.addr).Add(1) - RequestDuration.WithLabelValues(p.addr, rc).Observe(time.Since(start).Seconds()) + requestDuration.WithLabelValues(p.proxyName, p.addr, rc).Observe(time.Since(start).Seconds()) return ret, nil } diff --git a/plugin/pkg/proxy/health.go b/plugin/pkg/proxy/health.go index a7e99560d..4b4b4cc01 100644 --- a/plugin/pkg/proxy/health.go +++ b/plugin/pkg/proxy/health.go @@ -32,10 +32,12 @@ type dnsHc struct { c *dns.Client recursionDesired bool domain string + + proxyName string } // NewHealthChecker returns a new HealthChecker based on transport. -func NewHealthChecker(trans string, recursionDesired bool, domain string) HealthChecker { +func NewHealthChecker(proxyName, trans string, recursionDesired bool, domain string) HealthChecker { switch trans { case transport.DNS, transport.TLS: c := new(dns.Client) @@ -47,6 +49,7 @@ func NewHealthChecker(trans string, recursionDesired bool, domain string) Health c: c, recursionDesired: recursionDesired, domain: domain, + proxyName: proxyName, } } @@ -104,7 +107,7 @@ func (h *dnsHc) SetWriteTimeout(t time.Duration) { func (h *dnsHc) Check(p *Proxy) error { err := h.send(p.addr) if err != nil { - HealthcheckFailureCount.WithLabelValues(p.addr).Add(1) + healthcheckFailureCount.WithLabelValues(p.proxyName, p.addr).Add(1) p.incrementFails() return err } diff --git a/plugin/pkg/proxy/health_test.go b/plugin/pkg/proxy/health_test.go index c1b5270ad..8d9acfb9c 100644 --- a/plugin/pkg/proxy/health_test.go +++ b/plugin/pkg/proxy/health_test.go @@ -23,11 +23,11 @@ func TestHealth(t *testing.T) { }) defer s.Close() - hc := NewHealthChecker(transport.DNS, true, "") + hc := NewHealthChecker("TestHealth", transport.DNS, true, "") hc.SetReadTimeout(10 * time.Millisecond) hc.SetWriteTimeout(10 * time.Millisecond) - p := NewProxy(s.Addr, transport.DNS) + p := NewProxy("TestHealth", s.Addr, transport.DNS) p.readTimeout = 10 * time.Millisecond err := hc.Check(p) if err != nil { @@ -53,12 +53,12 @@ func TestHealthTCP(t *testing.T) { }) defer s.Close() - hc := NewHealthChecker(transport.DNS, true, "") + hc := NewHealthChecker("TestHealthTCP", transport.DNS, true, "") hc.SetTCPTransport() hc.SetReadTimeout(10 * time.Millisecond) hc.SetWriteTimeout(10 * time.Millisecond) - p := NewProxy(s.Addr, transport.DNS) + p := NewProxy("TestHealthTCP", s.Addr, transport.DNS) p.readTimeout = 10 * time.Millisecond err := hc.Check(p) if err != nil { @@ -84,11 +84,11 @@ func TestHealthNoRecursion(t *testing.T) { }) defer s.Close() - hc := NewHealthChecker(transport.DNS, false, "") + hc := NewHealthChecker("TestHealthNoRecursion", transport.DNS, false, "") hc.SetReadTimeout(10 * time.Millisecond) hc.SetWriteTimeout(10 * time.Millisecond) - p := NewProxy(s.Addr, transport.DNS) + p := NewProxy("TestHealthNoRecursion", s.Addr, transport.DNS) p.readTimeout = 10 * time.Millisecond err := hc.Check(p) if err != nil { @@ -108,11 +108,11 @@ func TestHealthTimeout(t *testing.T) { }) defer s.Close() - hc := NewHealthChecker(transport.DNS, false, "") + hc := NewHealthChecker("TestHealthTimeout", transport.DNS, false, "") hc.SetReadTimeout(10 * time.Millisecond) hc.SetWriteTimeout(10 * time.Millisecond) - p := NewProxy(s.Addr, transport.DNS) + p := NewProxy("TestHealthTimeout", s.Addr, transport.DNS) p.readTimeout = 10 * time.Millisecond err := hc.Check(p) if err == nil { @@ -134,11 +134,11 @@ func TestHealthDomain(t *testing.T) { }) defer s.Close() - hc := NewHealthChecker(transport.DNS, true, hcDomain) + hc := NewHealthChecker("TestHealthDomain", transport.DNS, true, hcDomain) hc.SetReadTimeout(10 * time.Millisecond) hc.SetWriteTimeout(10 * time.Millisecond) - p := NewProxy(s.Addr, transport.DNS) + p := NewProxy("TestHealthDomain", s.Addr, transport.DNS) p.readTimeout = 10 * time.Millisecond err := hc.Check(p) if err != nil { diff --git a/plugin/pkg/proxy/metrics.go b/plugin/pkg/proxy/metrics.go index 148bc6edd..e4cae97c3 100644 --- a/plugin/pkg/proxy/metrics.go +++ b/plugin/pkg/proxy/metrics.go @@ -9,41 +9,32 @@ import ( // Variables declared for monitoring. var ( - RequestCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: plugin.Namespace, - Subsystem: "proxy", - Name: "requests_total", - Help: "Counter of requests made per upstream.", - }, []string{"to"}) - RcodeCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: plugin.Namespace, - Subsystem: "proxy", - Name: "responses_total", - Help: "Counter of responses received per upstream.", - }, []string{"rcode", "to"}) - RequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + requestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: plugin.Namespace, Subsystem: "proxy", Name: "request_duration_seconds", Buckets: plugin.TimeBuckets, Help: "Histogram of the time each request took.", - }, []string{"to", "rcode"}) - HealthcheckFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ + }, []string{"proxy_name", "to", "rcode"}) + + healthcheckFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: plugin.Namespace, Subsystem: "proxy", Name: "healthcheck_failures_total", Help: "Counter of the number of failed healthchecks.", - }, []string{"to"}) - ConnCacheHitsCount = promauto.NewCounterVec(prometheus.CounterOpts{ + }, []string{"proxy_name", "to"}) + + connCacheHitsCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: plugin.Namespace, Subsystem: "proxy", Name: "conn_cache_hits_total", Help: "Counter of connection cache hits per upstream and protocol.", - }, []string{"to", "proto"}) - ConnCacheMissesCount = promauto.NewCounterVec(prometheus.CounterOpts{ + }, []string{"proxy_name", "to", "proto"}) + + connCacheMissesCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: plugin.Namespace, Subsystem: "proxy", Name: "conn_cache_misses_total", Help: "Counter of connection cache misses per upstream and protocol.", - }, []string{"to", "proto"}) + }, []string{"proxy_name", "to", "proto"}) ) diff --git a/plugin/pkg/proxy/persistent.go b/plugin/pkg/proxy/persistent.go index 2dc8bde71..49c9dd385 100644 --- a/plugin/pkg/proxy/persistent.go +++ b/plugin/pkg/proxy/persistent.go @@ -21,6 +21,7 @@ type Transport struct { expire time.Duration // After this duration a connection is expired. addr string tlsConfig *tls.Config + proxyName string dial chan string yield chan *persistConn @@ -28,7 +29,7 @@ type Transport struct { stop chan bool } -func newTransport(addr string) *Transport { +func newTransport(proxyName, addr string) *Transport { t := &Transport{ avgDialTime: int64(maxDialTimeout / 2), conns: [typeTotalCount][]*persistConn{}, @@ -38,6 +39,7 @@ func newTransport(addr string) *Transport { yield: make(chan *persistConn), ret: make(chan *persistConn), stop: make(chan bool), + proxyName: proxyName, } return t } diff --git a/plugin/pkg/proxy/persistent_test.go b/plugin/pkg/proxy/persistent_test.go index c78bd7f1f..56d837113 100644 --- a/plugin/pkg/proxy/persistent_test.go +++ b/plugin/pkg/proxy/persistent_test.go @@ -17,7 +17,7 @@ func TestCached(t *testing.T) { }) defer s.Close() - tr := newTransport(s.Addr) + tr := newTransport("TestCached", s.Addr) tr.Start() defer tr.Stop() @@ -56,7 +56,7 @@ func TestCleanupByTimer(t *testing.T) { }) defer s.Close() - tr := newTransport(s.Addr) + tr := newTransport("TestCleanupByTimer", s.Addr) tr.SetExpire(100 * time.Millisecond) tr.Start() defer tr.Stop() @@ -90,7 +90,7 @@ func TestCleanupAll(t *testing.T) { }) defer s.Close() - tr := newTransport(s.Addr) + tr := newTransport("TestCleanupAll", s.Addr) c1, _ := dns.DialTimeout("udp", tr.addr, maxDialTimeout) c2, _ := dns.DialTimeout("udp", tr.addr, maxDialTimeout) diff --git a/plugin/pkg/proxy/proxy.go b/plugin/pkg/proxy/proxy.go index 414c34240..99fb5df78 100644 --- a/plugin/pkg/proxy/proxy.go +++ b/plugin/pkg/proxy/proxy.go @@ -12,8 +12,9 @@ import ( // Proxy defines an upstream host. type Proxy struct { - fails uint32 - addr string + fails uint32 + addr string + proxyName string transport *Transport @@ -25,15 +26,17 @@ type Proxy struct { } // NewProxy returns a new proxy. -func NewProxy(addr, trans string) *Proxy { +func NewProxy(proxyName, addr, trans string) *Proxy { p := &Proxy{ addr: addr, fails: 0, probe: up.New(), readTimeout: 2 * time.Second, - transport: newTransport(addr), + transport: newTransport(proxyName, addr), + health: NewHealthChecker(proxyName, trans, true, "."), + proxyName: proxyName, } - p.health = NewHealthChecker(trans, true, ".") + runtime.SetFinalizer(p, (*Proxy).finalizer) return p } diff --git a/plugin/pkg/proxy/proxy_test.go b/plugin/pkg/proxy/proxy_test.go index 17125ea68..33a7170c0 100644 --- a/plugin/pkg/proxy/proxy_test.go +++ b/plugin/pkg/proxy/proxy_test.go @@ -24,7 +24,7 @@ func TestProxy(t *testing.T) { }) defer s.Close() - p := NewProxy(s.Addr, transport.DNS) + p := NewProxy("TestProxy", s.Addr, transport.DNS) p.readTimeout = 10 * time.Millisecond p.Start(5 * time.Second) m := new(dns.Msg) @@ -54,7 +54,7 @@ func TestProxyTLSFail(t *testing.T) { }) defer s.Close() - p := NewProxy(s.Addr, transport.TLS) + p := NewProxy("TestProxyTLSFail", s.Addr, transport.TLS) p.readTimeout = 10 * time.Millisecond p.SetTLSConfig(&tls.Config{}) p.Start(5 * time.Second) @@ -72,7 +72,7 @@ func TestProxyTLSFail(t *testing.T) { } func TestProtocolSelection(t *testing.T) { - p := NewProxy("bad_address", transport.DNS) + p := NewProxy("TestProtocolSelection", "bad_address", transport.DNS) p.readTimeout = 10 * time.Millisecond stateUDP := request.Request{W: &test.ResponseWriter{}, Req: new(dns.Msg)} @@ -119,7 +119,7 @@ func TestProxyIncrementFails(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - p := NewProxy("bad_address", transport.DNS) + p := NewProxy("TestProxyIncrementFails", "bad_address", transport.DNS) p.fails = tc.fails p.incrementFails() if p.fails != tc.expectFails {