From ad6bb66faf2267b36030d09514d5b6cf52ccc824 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Mon, 12 Sep 2016 15:07:49 -0700 Subject: [PATCH] Add notification filtering by target media type The Hub registry generates a large volume of notifications, many of which are uninteresting based on target media type. Discarding them within the notification endpoint consumes considerable resources that could be saved by discarding them within the registry. To that end, this change adds registry configuration options to restrict the notifications sent to an endpoint based on target media type. Signed-off-by: Noah Treuhaft --- configuration/configuration.go | 15 ++++++------ configuration/configuration_test.go | 5 ++++ docs/configuration.md | 16 ++++++++++++ notifications/endpoint.go | 12 +++++---- notifications/sinks.go | 38 +++++++++++++++++++++++++++++ notifications/sinks_test.go | 33 +++++++++++++++++++++++++ registry/handlers/app.go | 9 ++++--- 7 files changed, 112 insertions(+), 16 deletions(-) diff --git a/configuration/configuration.go b/configuration/configuration.go index 95fad34e4..0605b8bc1 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -521,13 +521,14 @@ type Notifications struct { // Endpoint describes the configuration of an http webhook notification // endpoint. type Endpoint struct { - Name string `yaml:"name"` // identifies the endpoint in the registry instance. - Disabled bool `yaml:"disabled"` // disables the endpoint - URL string `yaml:"url"` // post url for the endpoint. - Headers http.Header `yaml:"headers"` // static headers that should be added to all requests - Timeout time.Duration `yaml:"timeout"` // HTTP timeout - Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure - Backoff time.Duration `yaml:"backoff"` // backoff duration + Name string `yaml:"name"` // identifies the endpoint in the registry instance. + Disabled bool `yaml:"disabled"` // disables the endpoint + URL string `yaml:"url"` // post url for the endpoint. + Headers http.Header `yaml:"headers"` // static headers that should be added to all requests + Timeout time.Duration `yaml:"timeout"` // HTTP timeout + Threshold int `yaml:"threshold"` // circuit breaker threshold before backing off on failure + Backoff time.Duration `yaml:"backoff"` // backoff duration + IgnoredMediaTypes []string `yaml:"ignoredmediatypes"` // target media types to ignore } // Reporting defines error reporting methods. diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index 1768894e6..9285c0e72 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -59,6 +59,7 @@ var configStruct = Configuration{ Headers: http.Header{ "Authorization": []string{"Bearer "}, }, + IgnoredMediaTypes: []string{"application/octet-stream"}, }, }, }, @@ -136,6 +137,8 @@ notifications: url: http://example.com headers: Authorization: [Bearer ] + ignoredmediatypes: + - application/octet-stream reporting: bugsnag: apikey: BugsnagApiKey @@ -162,6 +165,8 @@ notifications: url: http://example.com headers: Authorization: [Bearer ] + ignoredmediatypes: + - application/octet-stream http: headers: X-Content-Type-Options: [nosniff] diff --git a/docs/configuration.md b/docs/configuration.md index 0689d8d23..4c5a2e6b2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -212,6 +212,8 @@ information about each option that appears later in this page. timeout: 500 threshold: 5 backoff: 1000 + ignoredmediatypes: + - application/octet-stream redis: addr: localhost:6379 password: asecret @@ -1162,6 +1164,8 @@ settings for the registry. timeout: 500 threshold: 5 backoff: 1000 + ignoredmediatypes: + - application/octet-stream The notifications option is **optional** and currently may contain a single option, `endpoints`. @@ -1276,6 +1280,18 @@ The URL to which events should be published. If you omit the suffix, the system interprets the value as nanoseconds. + + + ignoredmediatypes + + + no + + + List of target media types to ignore. An event whose target media type + is present in this list will not be published to the endpoint. + + diff --git a/notifications/endpoint.go b/notifications/endpoint.go index b5ed955d1..29a9e27b5 100644 --- a/notifications/endpoint.go +++ b/notifications/endpoint.go @@ -8,11 +8,12 @@ import ( // EndpointConfig covers the optional configuration parameters for an active // endpoint. type EndpointConfig struct { - Headers http.Header - Timeout time.Duration - Threshold int - Backoff time.Duration - Transport *http.Transport + Headers http.Header + Timeout time.Duration + Threshold int + Backoff time.Duration + IgnoredMediaTypes []string + Transport *http.Transport } // defaults set any zero-valued fields to a reasonable default. @@ -62,6 +63,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { endpoint.Transport, endpoint.metrics.httpStatusListener()) endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) + endpoint.Sink = newIgnoredMediaTypesSink(endpoint.Sink, config.IgnoredMediaTypes) register(&endpoint) return &endpoint diff --git a/notifications/sinks.go b/notifications/sinks.go index dda4a5653..549ba97e2 100644 --- a/notifications/sinks.go +++ b/notifications/sinks.go @@ -210,6 +210,44 @@ func (eq *eventQueue) next() []Event { return block } +// ignoredMediaTypesSink discards events with ignored target media types and +// passes the rest along. +type ignoredMediaTypesSink struct { + Sink + ignored map[string]bool +} + +func newIgnoredMediaTypesSink(sink Sink, ignored []string) Sink { + if len(ignored) == 0 { + return sink + } + + ignoredMap := make(map[string]bool) + for _, mediaType := range ignored { + ignoredMap[mediaType] = true + } + + return &ignoredMediaTypesSink{ + Sink: sink, + ignored: ignoredMap, + } +} + +// Write discards events with ignored target media types and passes the rest +// along. +func (imts *ignoredMediaTypesSink) Write(events ...Event) error { + var kept []Event + for _, e := range events { + if !imts.ignored[e.Target.MediaType] { + kept = append(kept, e) + } + } + if len(kept) == 0 { + return nil + } + return imts.Sink.Write(kept...) +} + // retryingSink retries the write until success or an ErrSinkClosed is // returned. Underlying sink must have p > 0 of succeeding or the sink will // block. Internally, it is a circuit breaker retries to manage reset. diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go index 89756a999..1bfa12c6b 100644 --- a/notifications/sinks_test.go +++ b/notifications/sinks_test.go @@ -3,6 +3,7 @@ package notifications import ( "fmt" "math/rand" + "reflect" "sync" "time" @@ -112,6 +113,38 @@ func TestEventQueue(t *testing.T) { } } +func TestIgnoredMediaTypesSink(t *testing.T) { + blob := createTestEvent("push", "library/test", "blob") + manifest := createTestEvent("push", "library/test", "manifest") + + type testcase struct { + ignored []string + expected []Event + } + + cases := []testcase{ + {nil, []Event{blob, manifest}}, + {[]string{"other"}, []Event{blob, manifest}}, + {[]string{"blob"}, []Event{manifest}}, + {[]string{"blob", "manifest"}, nil}, + } + + for _, c := range cases { + ts := &testSink{} + s := newIgnoredMediaTypesSink(ts, c.ignored) + + if err := s.Write(blob, manifest); err != nil { + t.Fatalf("error writing event: %v", err) + } + + ts.mu.Lock() + if !reflect.DeepEqual(ts.events, c.expected) { + t.Fatalf("unexpected events: %#v != %#v", ts.events, c.expected) + } + ts.mu.Unlock() + } +} + func TestRetryingSink(t *testing.T) { // Make a sync that fails most of the time, ensuring that all the events diff --git a/registry/handlers/app.go b/registry/handlers/app.go index cdd88bf12..4df15ae6e 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -427,10 +427,11 @@ func (app *App) configureEvents(configuration *configuration.Configuration) { ctxu.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers) endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{ - Timeout: endpoint.Timeout, - Threshold: endpoint.Threshold, - Backoff: endpoint.Backoff, - Headers: endpoint.Headers, + Timeout: endpoint.Timeout, + Threshold: endpoint.Threshold, + Backoff: endpoint.Backoff, + Headers: endpoint.Headers, + IgnoredMediaTypes: endpoint.IgnoredMediaTypes, }) sinks = append(sinks, endpoint)