forked from TrueCloudLab/distribution
1d33874951
Go 1.13 and up enforce import paths to be versioned if a project contains a go.mod and has released v2 or up. The current v2.x branches (and releases) do not yet have a go.mod, and therefore are still allowed to be imported with a non-versioned import path (go modules add a `+incompatible` annotation in that case). However, now that this project has a `go.mod` file, incompatible import paths will not be accepted by go modules, and attempting to use code from this repository will fail. This patch uses `v3` for the import-paths (not `v2`), because changing import paths itself is a breaking change, which means that the next release should increment the "major" version to comply with SemVer (as go modules dictate). Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
98 lines
2.6 KiB
Go
98 lines
2.6 KiB
Go
package notifications
|
|
|
|
import (
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/distribution/distribution/v3/configuration"
|
|
events "github.com/docker/go-events"
|
|
)
|
|
|
|
// EndpointConfig covers the optional configuration parameters for an active
|
|
// endpoint.
|
|
type EndpointConfig struct {
|
|
Headers http.Header
|
|
Timeout time.Duration
|
|
Threshold int
|
|
Backoff time.Duration
|
|
IgnoredMediaTypes []string
|
|
Transport *http.Transport `json:"-"`
|
|
Ignore configuration.Ignore
|
|
}
|
|
|
|
// defaults set any zero-valued fields to a reasonable default.
|
|
func (ec *EndpointConfig) defaults() {
|
|
if ec.Timeout <= 0 {
|
|
ec.Timeout = time.Second
|
|
}
|
|
|
|
if ec.Threshold <= 0 {
|
|
ec.Threshold = 10
|
|
}
|
|
|
|
if ec.Backoff <= 0 {
|
|
ec.Backoff = time.Second
|
|
}
|
|
|
|
if ec.Transport == nil {
|
|
ec.Transport = http.DefaultTransport.(*http.Transport)
|
|
}
|
|
}
|
|
|
|
// Endpoint is a reliable, queued, thread-safe sink that notify external http
|
|
// services when events are written. Writes are non-blocking and always
|
|
// succeed for callers but events may be queued internally.
|
|
type Endpoint struct {
|
|
events.Sink
|
|
url string
|
|
name string
|
|
|
|
EndpointConfig
|
|
|
|
metrics *safeMetrics
|
|
}
|
|
|
|
// NewEndpoint returns a running endpoint, ready to receive events.
|
|
func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
|
|
var endpoint Endpoint
|
|
endpoint.name = name
|
|
endpoint.url = url
|
|
endpoint.EndpointConfig = config
|
|
endpoint.defaults()
|
|
endpoint.metrics = newSafeMetrics(name)
|
|
|
|
// Configures the inmemory queue, retry, http pipeline.
|
|
endpoint.Sink = newHTTPSink(
|
|
endpoint.url, endpoint.Timeout, endpoint.Headers,
|
|
endpoint.Transport, endpoint.metrics.httpStatusListener())
|
|
endpoint.Sink = events.NewRetryingSink(endpoint.Sink, events.NewBreaker(endpoint.Threshold, endpoint.Backoff))
|
|
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
|
|
mediaTypes := append(config.Ignore.MediaTypes, config.IgnoredMediaTypes...)
|
|
endpoint.Sink = newIgnoredSink(endpoint.Sink, mediaTypes, config.Ignore.Actions)
|
|
|
|
register(&endpoint)
|
|
return &endpoint
|
|
}
|
|
|
|
// Name returns the name of the endpoint, generally used for debugging.
|
|
func (e *Endpoint) Name() string {
|
|
return e.name
|
|
}
|
|
|
|
// URL returns the url of the endpoint.
|
|
func (e *Endpoint) URL() string {
|
|
return e.url
|
|
}
|
|
|
|
// ReadMetrics populates em with metrics from the endpoint.
|
|
func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
|
|
e.metrics.Lock()
|
|
defer e.metrics.Unlock()
|
|
|
|
*em = e.metrics.EndpointMetrics
|
|
// Map still need to copied in a threadsafe manner.
|
|
em.Statuses = make(map[string]int)
|
|
for k, v := range e.metrics.Statuses {
|
|
em.Statuses[k] = v
|
|
}
|
|
}
|