package notifications

import (
	"net/http"
	"time"

	"github.com/distribution/distribution/v3/configuration"
	events "github.com/docker/go-events"
)

// EndpointConfig covers the optional configuration parameters for an active
// endpoint.
type EndpointConfig struct {
	// Headers is handed to the HTTP sink built by NewEndpoint.
	Headers http.Header
	// Timeout is handed to the HTTP sink; non-positive values default to
	// one second.
	Timeout time.Duration
	// Threshold is handed to the breaker of the retrying sink; non-positive
	// values default to 10.
	Threshold int
	// Backoff is handed to the breaker of the retrying sink; non-positive
	// values default to one second.
	Backoff time.Duration
	// IgnoredMediaTypes is combined with Ignore.MediaTypes when the ignored
	// sink is configured.
	IgnoredMediaTypes []string
	// Transport is handed to the HTTP sink; when nil it defaults to
	// http.DefaultTransport. It is excluded from JSON encoding.
	Transport *http.Transport `json:"-"`
	// Ignore supplies the media types and actions passed to the ignored sink.
	Ignore configuration.Ignore
}

// defaults sets any unset or non-positive fields to a reasonable default.
func (ec *EndpointConfig) defaults() {
	if ec.Timeout <= 0 {
		ec.Timeout = time.Second
	}

	if ec.Threshold <= 0 {
		ec.Threshold = 10
	}

	if ec.Backoff <= 0 {
		ec.Backoff = time.Second
	}

	if ec.Transport == nil {
		// NOTE: this unchecked assertion panics if http.DefaultTransport has
		// been replaced with a RoundTripper that is not an *http.Transport.
		ec.Transport = http.DefaultTransport.(*http.Transport)
	}
}

// Endpoint is a reliable, queued, thread-safe sink that notifies external
// http services when events are written. Writes are non-blocking and always
// succeed for callers but events may be queued internally.
type Endpoint struct {
	events.Sink
	url  string
	name string

	EndpointConfig

	metrics *safeMetrics
}

// NewEndpoint returns a running endpoint, ready to receive events.
//
// The supplied config is copied into the endpoint and any unset or
// non-positive fields are replaced with defaults. The resulting endpoint is
// passed to register before it is returned.
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
	var endpoint Endpoint
	endpoint.name = name
	endpoint.url = url
	endpoint.EndpointConfig = config
	endpoint.defaults()
	endpoint.metrics = newSafeMetrics(name)

	// Configures the in-memory queue, retry, http pipeline. Each layer wraps
	// the previous one, so the last sink assigned is the outermost.
	endpoint.Sink = newHTTPSink(
		endpoint.url, endpoint.Timeout, endpoint.Headers,
		endpoint.Transport, endpoint.metrics.httpStatusListener())
	endpoint.Sink = events.NewRetryingSink(endpoint.Sink, events.NewBreaker(endpoint.Threshold, endpoint.Backoff))
	endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())

	// Cap the base slice at its length so that append must allocate a fresh
	// backing array instead of writing into spare capacity owned by the
	// caller's configuration.
	base := config.Ignore.MediaTypes
	mediaTypes := append(base[:len(base):len(base)], config.IgnoredMediaTypes...)
	endpoint.Sink = newIgnoredSink(endpoint.Sink, mediaTypes, config.Ignore.Actions)

	register(&endpoint)
	return &endpoint
}

// Name returns the name of the endpoint, generally used for debugging.
func (e *Endpoint) Name() string {
	return e.name
}

// URL returns the url of the endpoint.
func (e *Endpoint) URL() string {
	return e.url
}

// ReadMetrics populates em with metrics from the endpoint.
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
	e.metrics.Lock()
	defer e.metrics.Unlock()

	*em = e.metrics.EndpointMetrics

	// The struct copy above shares the Statuses map with the endpoint, so the
	// map still needs to be copied while the lock is held.
	em.Statuses = make(map[string]int, len(e.metrics.Statuses))
	for k, v := range e.metrics.Statuses {
		em.Statuses[k] = v
	}
}