distribution/storage/notifications/metrics.go
Stephen J Day 9f0c8d6616 Implement notification endpoint webhook dispatch
This changeset implements webhook notification endpoints for dispatching
registry events. Repository instances can be decorated by a listener that
converts calls into context-aware events, using a bridge. Events generated in
the bridge are written to a sink. Implementations of sink include a broadcast
and endpoint sink which can be used to configure event dispatch. Endpoints
represent a webhook notification target, with queueing and retries built in.
They can be added to a Broadcaster, which is a simple sink that writes a block
of events to several sinks, to provide a complete dispatch mechanism.

The main caveat to the current approach is that all unsent notifications are
inmemory. Best effort is made to ensure that notifications are not dropped, to
the point where queues may back up on faulty endpoints. If the endpoint is
fixed, the events will be retried and all messages will go through.

Internally, this functionality is all made up of Sink objects. The queuing
functionality is implemented with an eventQueue sink and retries are
implemented with retryingSink. Replacing the inmemory queuing with something
persistent should be as simple as replacing broadcaster with a remote queue and
that sets up the sinks to be local workers listening to that remote queue.

Metrics are kept for each endpoint and exported via expvar. This may not be a
permanent appraoch but should provide enough information for troubleshooting
notification problems.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2015-02-03 13:30:20 -08:00

152 lines
4 KiB
Go

package notifications
import (
"expvar"
"fmt"
"net/http"
"sync"
)
// EndpointMetrics track various actions taken by the endpoint, typically by
// number of events. The goal of this to export it via expvar but we may find
// some other future solution to be better.
type EndpointMetrics struct {
Pending int // events pending in queue
Events int // total events incoming
Successes int // total events written successfully
Failures int // total events failed
Errors int // total events errored
Statuses map[string]int // status code histogram, per call event
}
// safeMetrics guards the metrics implementation with a lock and provides a
// safe update function.
type safeMetrics struct {
EndpointMetrics
sync.Mutex // protects statuses map
}
// newSafeMetrics returns safeMetrics with map allocated.
func newSafeMetrics() *safeMetrics {
var sm safeMetrics
sm.Statuses = make(map[string]int)
return &sm
}
// httpStatusListener returns the listener for the http sink that updates the
// relevent counters.
func (sm *safeMetrics) httpStatusListener() httpStatusListener {
return &endpointMetricsHTTPStatusListener{
safeMetrics: sm,
}
}
// eventQueueListener returns a listener that maintains queue related counters.
func (sm *safeMetrics) eventQueueListener() eventQueueListener {
return &endpointMetricsEventQueueListener{
safeMetrics: sm,
}
}
// endpointMetricsHTTPStatusListener increments counters related to http sinks
// for the relevent events.
type endpointMetricsHTTPStatusListener struct {
*safeMetrics
}
var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Successes += len(events)
}
func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
emsl.Failures += len(events)
}
func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
emsl.safeMetrics.Lock()
defer emsl.safeMetrics.Unlock()
emsl.Errors += len(events)
}
// endpointMetricsEventQueueListener maintains the incoming events counter and
// the queues pending count.
type endpointMetricsEventQueueListener struct {
*safeMetrics
}
func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
eqc.Lock()
defer eqc.Unlock()
eqc.Events += len(events)
eqc.Pending += len(events)
}
func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
eqc.Lock()
defer eqc.Unlock()
eqc.Pending -= len(events)
}
// endpoints is global registry of endpoints used to report metrics to expvar
var endpoints struct {
registered []*Endpoint
mu sync.Mutex
}
// register places the endpoint into expvar so that stats are tracked.
func register(e *Endpoint) {
endpoints.mu.Lock()
defer endpoints.mu.Unlock()
endpoints.registered = append(endpoints.registered, e)
}
func init() {
// NOTE(stevvooe): Setup registry metrics structure to report to expvar.
// Ideally, we do more metrics through logging but we need some nice
// realtime metrics for queue state for now.
registry := expvar.Get("registry")
if registry == nil {
registry = expvar.NewMap("registry")
}
var notifications expvar.Map
notifications.Init()
notifications.Set("endpoints", expvar.Func(func() interface{} {
endpoints.mu.Lock()
defer endpoints.mu.Unlock()
var names []interface{}
for _, v := range endpoints.registered {
var epjson struct {
Name string `json:"name"`
URL string `json:"url"`
EndpointConfig
Metrics EndpointMetrics
}
epjson.Name = v.Name()
epjson.URL = v.URL()
epjson.EndpointConfig = v.EndpointConfig
v.ReadMetrics(&epjson.Metrics)
names = append(names, epjson)
}
return names
}))
registry.(*expvar.Map).Set("notifications", &notifications)
}