diff --git a/notifications/endpoint.go b/notifications/endpoint.go index a8a52d0c..854f1dd6 100644 --- a/notifications/endpoint.go +++ b/notifications/endpoint.go @@ -58,7 +58,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { endpoint.url = url endpoint.EndpointConfig = config endpoint.defaults() - endpoint.metrics = newSafeMetrics() + endpoint.metrics = newSafeMetrics(name) // Configures the inmemory queue, retry, http pipeline. endpoint.Sink = newHTTPSink( diff --git a/notifications/http_test.go b/notifications/http_test.go index de47f789..b7845cf9 100644 --- a/notifications/http_test.go +++ b/notifications/http_test.go @@ -63,7 +63,7 @@ func TestHTTPSink(t *testing.T) { }) server := httptest.NewTLSServer(serverHandler) - metrics := newSafeMetrics() + metrics := newSafeMetrics("") sink := newHTTPSink(server.URL, 0, nil, nil, &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) diff --git a/notifications/metrics.go b/notifications/metrics.go index 69960e9c..4464edd8 100644 --- a/notifications/metrics.go +++ b/notifications/metrics.go @@ -12,11 +12,11 @@ import ( var ( // eventsCounter counts total events of incoming, success, failure, and errors - eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type") + eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "to") // pendingGauge measures the pending queue size - pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total) + pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "to") // statusCounter counts the total notification call per each status code - statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code") + statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "to") ) // EndpointMetrics track various actions taken by the endpoint, typically by @@ -34,14 +34,16 @@ type EndpointMetrics struct { // safeMetrics guards the metrics implementation with a lock and provides a // safe update function. type safeMetrics struct { + EndpointName string EndpointMetrics sync.Mutex // protects statuses map } // newSafeMetrics returns safeMetrics with map allocated. -func newSafeMetrics() *safeMetrics { +func newSafeMetrics(name string) *safeMetrics { var sm safeMetrics sm.Statuses = make(map[string]int) + sm.EndpointName = name return &sm } @@ -74,8 +76,8 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Successes += len(events) - statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1) - eventsCounter.WithValues("Successes").Inc(1) + statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1) } func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { @@ -84,8 +86,8 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) emsl.Failures += len(events) - statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1) - eventsCounter.WithValues("Failures").Inc(1) + statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1) + eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1) } func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { @@ -93,7 +95,7 @@ func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { defer emsl.safeMetrics.Unlock() emsl.Errors += len(events) - eventsCounter.WithValues("Errors").Inc(1) + eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1) } // endpointMetricsEventQueueListener maintains the incoming events counter and @@ -108,8 +110,8 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { eqc.Events += len(events) eqc.Pending += len(events) - eventsCounter.WithValues("Events").Inc() - pendingGauge.Inc(1) + eventsCounter.WithValues("Events", eqc.EndpointName).Inc() + pendingGauge.WithValues(eqc.EndpointName).Inc(1) } func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { @@ -117,7 +119,7 @@ func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { defer eqc.Unlock() eqc.Pending -= len(events) - pendingGauge.Dec(1) + pendingGauge.WithValues(eqc.EndpointName).Dec(1) } // endpoints is global registry of endpoints used to report metrics to expvar diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go index 06f88b2c..4a69486b 100644 --- a/notifications/sinks_test.go +++ b/notifications/sinks_test.go @@ -66,7 +66,7 @@ func TestBroadcaster(t *testing.T) { func TestEventQueue(t *testing.T) { const nevents = 1000 var ts testSink - metrics := newSafeMetrics() + metrics := newSafeMetrics("") eq := newEventQueue( // delayed sync simulates destination slower than channel comms &delayedSink{