Merge pull request #2522 from tifayuki/notification_metrics

Add notification metrics
This commit is contained in:
Ryan Abrams 2019-06-28 11:10:51 -07:00 committed by GitHub
commit be07be9904
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 37 additions and 4 deletions

View file

@ -10,4 +10,7 @@ const (
var ( var (
// StorageNamespace is the prometheus namespace of blob/cache related operations // StorageNamespace is the prometheus namespace of blob/cache related operations
StorageNamespace = metrics.NewNamespace(NamespacePrefix, "storage", nil) StorageNamespace = metrics.NewNamespace(NamespacePrefix, "storage", nil)
// NotificationsNamespace is the prometheus namespace of notification related metrics
NotificationsNamespace = metrics.NewNamespace(NamespacePrefix, "notifications", nil)
) )

View file

@ -58,7 +58,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
endpoint.url = url endpoint.url = url
endpoint.EndpointConfig = config endpoint.EndpointConfig = config
endpoint.defaults() endpoint.defaults()
endpoint.metrics = newSafeMetrics() endpoint.metrics = newSafeMetrics(name)
// Configures the inmemory queue, retry, http pipeline. // Configures the inmemory queue, retry, http pipeline.
endpoint.Sink = newHTTPSink( endpoint.Sink = newHTTPSink(

View file

@ -63,7 +63,7 @@ func TestHTTPSink(t *testing.T) {
}) })
server := httptest.NewTLSServer(serverHandler) server := httptest.NewTLSServer(serverHandler)
metrics := newSafeMetrics() metrics := newSafeMetrics("")
sink := newHTTPSink(server.URL, 0, nil, nil, sink := newHTTPSink(server.URL, 0, nil, nil,
&endpointMetricsHTTPStatusListener{safeMetrics: metrics}) &endpointMetricsHTTPStatusListener{safeMetrics: metrics})

View file

@ -5,6 +5,18 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"sync" "sync"
prometheus "github.com/docker/distribution/metrics"
"github.com/docker/go-metrics"
)
var (
// eventsCounter counts total events of incoming, success, failure, and errors
eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "endpoint")
// pendingGauge measures the pending queue size
pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "endpoint")
// statusCounter counts the total notification call per each status code
statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "endpoint")
) )
// EndpointMetrics track various actions taken by the endpoint, typically by // EndpointMetrics track various actions taken by the endpoint, typically by
@ -22,14 +34,16 @@ type EndpointMetrics struct {
// safeMetrics guards the metrics implementation with a lock and provides a // safeMetrics guards the metrics implementation with a lock and provides a
// safe update function. // safe update function.
type safeMetrics struct { type safeMetrics struct {
EndpointName string
EndpointMetrics EndpointMetrics
sync.Mutex // protects statuses map sync.Mutex // protects statuses map
} }
// newSafeMetrics returns safeMetrics with map allocated. // newSafeMetrics returns safeMetrics with map allocated.
func newSafeMetrics() *safeMetrics { func newSafeMetrics(name string) *safeMetrics {
var sm safeMetrics var sm safeMetrics
sm.Statuses = make(map[string]int) sm.Statuses = make(map[string]int)
sm.EndpointName = name
return &sm return &sm
} }
@ -61,6 +75,9 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve
defer emsl.safeMetrics.Unlock() defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Successes += len(events) emsl.Successes += len(events)
statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(float64(len(events)))
} }
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
@ -68,12 +85,17 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve
defer emsl.safeMetrics.Unlock() defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Failures += len(events) emsl.Failures += len(events)
statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(float64(len(events)))
} }
func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
emsl.safeMetrics.Lock() emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock() defer emsl.safeMetrics.Unlock()
emsl.Errors += len(events) emsl.Errors += len(events)
eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(float64(len(events)))
} }
// endpointMetricsEventQueueListener maintains the incoming events counter and // endpointMetricsEventQueueListener maintains the incoming events counter and
@ -87,12 +109,17 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
defer eqc.Unlock() defer eqc.Unlock()
eqc.Events += len(events) eqc.Events += len(events)
eqc.Pending += len(events) eqc.Pending += len(events)
eventsCounter.WithValues("Events", eqc.EndpointName).Inc()
pendingGauge.WithValues(eqc.EndpointName).Inc(float64(len(events)))
} }
func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
eqc.Lock() eqc.Lock()
defer eqc.Unlock() defer eqc.Unlock()
eqc.Pending -= len(events) eqc.Pending -= len(events)
pendingGauge.WithValues(eqc.EndpointName).Dec(1)
} }
// endpoints is global registry of endpoints used to report metrics to expvar // endpoints is global registry of endpoints used to report metrics to expvar
@ -149,4 +176,7 @@ func init() {
})) }))
registry.(*expvar.Map).Set("notifications", &notifications) registry.(*expvar.Map).Set("notifications", &notifications)
// register prometheus metrics
metrics.Register(prometheus.NotificationsNamespace)
} }

View file

@ -66,7 +66,7 @@ func TestBroadcaster(t *testing.T) {
func TestEventQueue(t *testing.T) { func TestEventQueue(t *testing.T) {
const nevents = 1000 const nevents = 1000
var ts testSink var ts testSink
metrics := newSafeMetrics() metrics := newSafeMetrics("")
eq := newEventQueue( eq := newEventQueue(
// delayed sync simulates destination slower than channel comms // delayed sync simulates destination slower than channel comms
&delayedSink{ &delayedSink{