forked from TrueCloudLab/distribution
Add notification filtering by target media type
The Hub registry generates a large volume of notifications, many of which are uninteresting based on target media type. Discarding them within the notification endpoint consumes considerable resources that could be saved by discarding them within the registry. To that end, this change adds registry configuration options to restrict the notifications sent to an endpoint based on target media type. Signed-off-by: Noah Treuhaft <noah.treuhaft@docker.com>
This commit is contained in:
parent
cb744efe8b
commit
ad6bb66faf
7 changed files with 112 additions and 16 deletions
|
@ -521,13 +521,14 @@ type Notifications struct {
|
||||||
// Endpoint describes the configuration of an http webhook notification
|
// Endpoint describes the configuration of an http webhook notification
|
||||||
// endpoint.
|
// endpoint.
|
||||||
type Endpoint struct {
|
type Endpoint struct {
|
||||||
Name string `yaml:"name"` // identifies the endpoint in the registry instance.
|
Name string `yaml:"name"` // identifies the endpoint in the registry instance.
|
||||||
Disabled bool `yaml:"disabled"` // disables the endpoint
|
Disabled bool `yaml:"disabled"` // disables the endpoint
|
||||||
URL string `yaml:"url"` // post url for the endpoint.
|
URL string `yaml:"url"` // post url for the endpoint.
|
||||||
Headers http.Header `yaml:"headers"` // static headers that should be added to all requests
|
Headers http.Header `yaml:"headers"` // static headers that should be added to all requests
|
||||||
Timeout time.Duration `yaml:"timeout"` // HTTP timeout
|
Timeout time.Duration `yaml:"timeout"` // HTTP timeout
|
||||||
Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure
|
Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure
|
||||||
Backoff time.Duration `yaml:"backoff"` // backoff duration
|
Backoff time.Duration `yaml:"backoff"` // backoff duration
|
||||||
|
IgnoredMediaTypes []string `yaml:"ignoredmediatypes"` // target media types to ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reporting defines error reporting methods.
|
// Reporting defines error reporting methods.
|
||||||
|
|
|
@ -59,6 +59,7 @@ var configStruct = Configuration{
|
||||||
Headers: http.Header{
|
Headers: http.Header{
|
||||||
"Authorization": []string{"Bearer <example>"},
|
"Authorization": []string{"Bearer <example>"},
|
||||||
},
|
},
|
||||||
|
IgnoredMediaTypes: []string{"application/octet-stream"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -136,6 +137,8 @@ notifications:
|
||||||
url: http://example.com
|
url: http://example.com
|
||||||
headers:
|
headers:
|
||||||
Authorization: [Bearer <example>]
|
Authorization: [Bearer <example>]
|
||||||
|
ignoredmediatypes:
|
||||||
|
- application/octet-stream
|
||||||
reporting:
|
reporting:
|
||||||
bugsnag:
|
bugsnag:
|
||||||
apikey: BugsnagApiKey
|
apikey: BugsnagApiKey
|
||||||
|
@ -162,6 +165,8 @@ notifications:
|
||||||
url: http://example.com
|
url: http://example.com
|
||||||
headers:
|
headers:
|
||||||
Authorization: [Bearer <example>]
|
Authorization: [Bearer <example>]
|
||||||
|
ignoredmediatypes:
|
||||||
|
- application/octet-stream
|
||||||
http:
|
http:
|
||||||
headers:
|
headers:
|
||||||
X-Content-Type-Options: [nosniff]
|
X-Content-Type-Options: [nosniff]
|
||||||
|
|
|
@ -212,6 +212,8 @@ information about each option that appears later in this page.
|
||||||
timeout: 500
|
timeout: 500
|
||||||
threshold: 5
|
threshold: 5
|
||||||
backoff: 1000
|
backoff: 1000
|
||||||
|
ignoredmediatypes:
|
||||||
|
- application/octet-stream
|
||||||
redis:
|
redis:
|
||||||
addr: localhost:6379
|
addr: localhost:6379
|
||||||
password: asecret
|
password: asecret
|
||||||
|
@ -1162,6 +1164,8 @@ settings for the registry.
|
||||||
timeout: 500
|
timeout: 500
|
||||||
threshold: 5
|
threshold: 5
|
||||||
backoff: 1000
|
backoff: 1000
|
||||||
|
ignoredmediatypes:
|
||||||
|
- application/octet-stream
|
||||||
|
|
||||||
The notifications option is **optional** and currently may contain a single
|
The notifications option is **optional** and currently may contain a single
|
||||||
option, `endpoints`.
|
option, `endpoints`.
|
||||||
|
@ -1276,6 +1280,18 @@ The URL to which events should be published.
|
||||||
If you omit the suffix, the system interprets the value as nanoseconds.
|
If you omit the suffix, the system interprets the value as nanoseconds.
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>
|
||||||
|
<code>ignoredmediatypes</code>
|
||||||
|
</td>
|
||||||
|
<td>
|
||||||
|
no
|
||||||
|
</td>
|
||||||
|
<td>
|
||||||
|
List of target media types to ignore. An event whose target media type
|
||||||
|
is present in this list will not be published to the endpoint.
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
</table>
|
</table>
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -8,11 +8,12 @@ import (
|
||||||
// EndpointConfig covers the optional configuration parameters for an active
|
// EndpointConfig covers the optional configuration parameters for an active
|
||||||
// endpoint.
|
// endpoint.
|
||||||
type EndpointConfig struct {
|
type EndpointConfig struct {
|
||||||
Headers http.Header
|
Headers http.Header
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
Threshold int
|
Threshold int
|
||||||
Backoff time.Duration
|
Backoff time.Duration
|
||||||
Transport *http.Transport
|
IgnoredMediaTypes []string
|
||||||
|
Transport *http.Transport
|
||||||
}
|
}
|
||||||
|
|
||||||
// defaults set any zero-valued fields to a reasonable default.
|
// defaults set any zero-valued fields to a reasonable default.
|
||||||
|
@ -62,6 +63,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
|
||||||
endpoint.Transport, endpoint.metrics.httpStatusListener())
|
endpoint.Transport, endpoint.metrics.httpStatusListener())
|
||||||
endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
|
endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
|
||||||
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
|
endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
|
||||||
|
endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes)
|
||||||
|
|
||||||
register(&endpoint)
|
register(&endpoint)
|
||||||
return &endpoint
|
return &endpoint
|
||||||
|
|
|
@ -210,6 +210,44 @@ func (eq *eventQueue) next() []Event {
|
||||||
return block
|
return block
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ignoredMediaTypesSink discards events with ignored target media types and
|
||||||
|
// passes the rest along.
|
||||||
|
type ignoredMediaTypesSink struct {
|
||||||
|
Sink
|
||||||
|
ignored map[string]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newIgnoredMediaTypesSink(sink Sink, ignored []string) Sink {
|
||||||
|
if len(ignored) == 0 {
|
||||||
|
return sink
|
||||||
|
}
|
||||||
|
|
||||||
|
ignoredMap := make(map[string]bool)
|
||||||
|
for _, mediaType := range ignored {
|
||||||
|
ignoredMap[mediaType] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ignoredMediaTypesSink{
|
||||||
|
Sink: sink,
|
||||||
|
ignored: ignoredMap,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write discards events with ignored target media types and passes the rest
|
||||||
|
// along.
|
||||||
|
func (imts *ignoredMediaTypesSink) Write(events ...Event) error {
|
||||||
|
var kept []Event
|
||||||
|
for _, e := range events {
|
||||||
|
if !imts.ignored[e.Target.MediaType] {
|
||||||
|
kept = append(kept, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(kept) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return imts.Sink.Write(kept...)
|
||||||
|
}
|
||||||
|
|
||||||
// retryingSink retries the write until success or an ErrSinkClosed is
|
// retryingSink retries the write until success or an ErrSinkClosed is
|
||||||
// returned. Underlying sink must have p > 0 of succeeding or the sink will
|
// returned. Underlying sink must have p > 0 of succeeding or the sink will
|
||||||
// block. Internally, it is a circuit breaker retries to manage reset.
|
// block. Internally, it is a circuit breaker retries to manage reset.
|
||||||
|
|
|
@ -3,6 +3,7 @@ package notifications
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -112,6 +113,38 @@ func TestEventQueue(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIgnoredMediaTypesSink(t *testing.T) {
|
||||||
|
blob := createTestEvent("push", "library/test", "blob")
|
||||||
|
manifest := createTestEvent("push", "library/test", "manifest")
|
||||||
|
|
||||||
|
type testcase struct {
|
||||||
|
ignored []string
|
||||||
|
expected []Event
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := []testcase{
|
||||||
|
{nil, []Event{blob, manifest}},
|
||||||
|
{[]string{"other"}, []Event{blob, manifest}},
|
||||||
|
{[]string{"blob"}, []Event{manifest}},
|
||||||
|
{[]string{"blob", "manifest"}, nil},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
ts := &testSink{}
|
||||||
|
s := newIgnoredMediaTypesSink(ts, c.ignored)
|
||||||
|
|
||||||
|
if err := s.Write(blob, manifest); err != nil {
|
||||||
|
t.Fatalf("error writing event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ts.mu.Lock()
|
||||||
|
if !reflect.DeepEqual(ts.events, c.expected) {
|
||||||
|
t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected)
|
||||||
|
}
|
||||||
|
ts.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRetryingSink(t *testing.T) {
|
func TestRetryingSink(t *testing.T) {
|
||||||
|
|
||||||
// Make a sync that fails most of the time, ensuring that all the events
|
// Make a sync that fails most of the time, ensuring that all the events
|
||||||
|
|
|
@ -427,10 +427,11 @@ func (app *App) configureEvents(configuration *configuration.Configuration) {
|
||||||
|
|
||||||
ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
|
ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
|
||||||
endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
|
endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
|
||||||
Timeout: endpoint.Timeout,
|
Timeout: endpoint.Timeout,
|
||||||
Threshold: endpoint.Threshold,
|
Threshold: endpoint.Threshold,
|
||||||
Backoff: endpoint.Backoff,
|
Backoff: endpoint.Backoff,
|
||||||
Headers: endpoint.Headers,
|
Headers: endpoint.Headers,
|
||||||
|
IgnoredMediaTypes: endpoint.IgnoredMediaTypes,
|
||||||
})
|
})
|
||||||
|
|
||||||
sinks = append(sinks, endpoint)
|
sinks = append(sinks, endpoint)
|
||||||
|
|
Loading…
Reference in a new issue