From 9f0c8d6616a99a13bdb106c743835ea314227151 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 27 Jan 2015 23:27:46 -0800 Subject: [PATCH] Implement notification endpoint webhook dispatch This changeset implements webhook notification endpoints for dispatching registry events. Repository instances can be decorated by a listener that converts calls into context-aware events, using a bridge. Events generated in the bridge are written to a sink. Implementations of sink include a broadcast and endpoint sink which can be used to configure event dispatch. Endpoints represent a webhook notification target, with queueing and retries built in. They can be added to a Broadcaster, which is a simple sink that writes a block of events to several sinks, to provide a complete dispatch mechanism. The main caveat to the current approach is that all unsent notifications are inmemory. Best effort is made to ensure that notifications are not dropped, to the point where queues may back up on faulty endpoints. If the endpoint is fixed, the events will be retried and all messages will go through. Internally, this functionality is all made up of Sink objects. The queuing functionality is implemented with an eventQueue sink and retries are implemented with retryingSink. Replacing the inmemory queuing with something persistent should be as simple as replacing broadcaster with a remote queue and that sets up the sinks to be local workers listening to that remote queue. Metrics are kept for each endpoint and exported via expvar. This may not be a permanent appraoch but should provide enough information for troubleshooting notification problems. Signed-off-by: Stephen J Day --- storage/notifications/bridge.go | 139 ++++++++++ storage/notifications/endpoint.go | 86 +++++++ storage/notifications/event.go | 29 ++- storage/notifications/event_test.go | 38 +-- storage/notifications/http.go | 145 +++++++++++ storage/notifications/http_test.go | 155 ++++++++++++ storage/notifications/listener.go | 140 ++++++++++ storage/notifications/listener_test.go | 151 +++++++++++ storage/notifications/metrics.go | 152 +++++++++++ storage/notifications/sinks.go | 337 +++++++++++++++++++++++++ storage/notifications/sinks_test.go | 223 ++++++++++++++++ 11 files changed, 1569 insertions(+), 26 deletions(-) create mode 100644 storage/notifications/bridge.go create mode 100644 storage/notifications/endpoint.go create mode 100644 storage/notifications/http.go create mode 100644 storage/notifications/http_test.go create mode 100644 storage/notifications/listener.go create mode 100644 storage/notifications/listener_test.go create mode 100644 storage/notifications/metrics.go create mode 100644 storage/notifications/sinks.go create mode 100644 storage/notifications/sinks_test.go diff --git a/storage/notifications/bridge.go b/storage/notifications/bridge.go new file mode 100644 index 000000000..2ff0dff6e --- /dev/null +++ b/storage/notifications/bridge.go @@ -0,0 +1,139 @@ +package notifications + +import ( + "time" + + "github.com/docker/distribution/manifest" + + "code.google.com/p/go-uuid/uuid" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/storage" +) + +type bridge struct { + ub URLBuilder + actor ActorRecord + source SourceRecord + sink Sink +} + +var _ Listener = &bridge{} + +// URLBuilder defines a subset of url builder to be used by the event listener. +type URLBuilder interface { + BuildManifestURL(name, tag string) (string, error) + BuildBlobURL(name string, dgst digest.Digest) (string, error) +} + +// NewBridge returns a notification listener that writes records to sink, +// using the actor and source. Any urls populated in the events created by +// this bridge will be created using the URLBuilder. +func NewBridge(ub URLBuilder, source SourceRecord, actor ActorRecord, sink Sink) Listener { + return &bridge{ + ub: ub, + actor: actor, + source: source, + sink: sink, + } +} + +func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionPush, repo, sm) +} + +func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionPull, repo, sm) +} + +func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { + return b.createManifestEventAndWrite(EventActionDelete, repo, sm) +} + +func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest()) +} + +func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest()) +} + +func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error { + return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest()) +} + +func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error { + event, err := b.createManifestEvent(action, repo, sm) + if err != nil { + return err + } + + return b.sink.Write(*event) +} + +func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) { + event := b.createEvent(action) + event.Target.Type = "manifest" + event.Target.Name = repo.Name() + event.Target.Tag = sm.Tag + + p, err := sm.Payload() + if err != nil { + return nil, err + } + + event.Target.Digest, err = digest.FromBytes(p) + if err != nil { + return nil, err + } + + // TODO(stevvooe): Currently, the is the "tag" url: once the digest url is + // implemented, this should be replaced. + event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag) + if err != nil { + return nil, err + } + + return event, nil +} + +func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error { + event, err := b.createLayerEvent(action, repo, dgst) + if err != nil { + return err + } + + return b.sink.Write(*event) +} + +func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) { + event := b.createEvent(action) + event.Target.Type = "layer" + event.Target.Name = repo.Name() + event.Target.Digest = dgst + + var err error + event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), dgst) + if err != nil { + return nil, err + } + + return event, nil +} + +// createEvent creates an event with actor and source populated. +func (b *bridge) createEvent(action string) *Event { + event := createEvent(action) + event.Source = b.source + event.Actor = b.actor + + return event +} + +// createEvent returns a new event, timestamped, with the specified action. +func createEvent(action string) *Event { + return &Event{ + ID: uuid.New(), + Timestamp: time.Now(), + Action: action, + } +} diff --git a/storage/notifications/endpoint.go b/storage/notifications/endpoint.go new file mode 100644 index 000000000..dfdb111c5 --- /dev/null +++ b/storage/notifications/endpoint.go @@ -0,0 +1,86 @@ +package notifications + +import ( + "net/http" + "time" +) + +// EndpointConfig covers the optional configuration parameters for an active +// endpoint. +type EndpointConfig struct { + Headers http.Header + Timeout time.Duration + Threshold int + Backoff time.Duration +} + +// defaults set any zero-valued fields to a reasonable default. +func (ec *EndpointConfig) defaults() { + if ec.Timeout <= 0 { + ec.Timeout = time.Second + } + + if ec.Threshold <= 0 { + ec.Threshold = 10 + } + + if ec.Backoff <= 0 { + ec.Backoff = time.Second + } +} + +// Endpoint is a reliable, queued, thread-safe sink that notify external http +// services when events are written. Writes are non-blocking and always +// succeed for callers but events may be queued internally. +type Endpoint struct { + Sink + url string + name string + + EndpointConfig + + metrics *safeMetrics +} + +// NewEndpoint returns a running endpoint, ready to receive events. +func NewEndpoint(name, url string, config EndpointConfig) *Endpoint { + var endpoint Endpoint + endpoint.name = name + endpoint.url = url + endpoint.EndpointConfig = config + endpoint.defaults() + endpoint.metrics = newSafeMetrics() + + // Configures the inmemory queue, retry, http pipeline. + endpoint.Sink = newHTTPSink( + endpoint.url, endpoint.Timeout, endpoint.Headers, + endpoint.metrics.httpStatusListener()) + endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff) + endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener()) + + register(&endpoint) + return &endpoint +} + +// Name returns the name of the endpoint, generally used for debugging. +func (e *Endpoint) Name() string { + return e.name +} + +// URL returns the url of the endpoint. +func (e *Endpoint) URL() string { + return e.url +} + +// ReadMetrics populates em with metrics from the endpoint. +func (e *Endpoint) ReadMetrics(em *EndpointMetrics) { + e.metrics.Lock() + defer e.metrics.Unlock() + + *em = e.metrics.EndpointMetrics + // Map still need to copied in a threadsafe manner. + em.Statuses = make(map[string]int) + for k, v := range e.metrics.Statuses { + em.Statuses[k] = v + } +} diff --git a/storage/notifications/event.go b/storage/notifications/event.go index 3c000dc20..f03920cac 100644 --- a/storage/notifications/event.go +++ b/storage/notifications/event.go @@ -36,7 +36,7 @@ type Envelope struct { // Event provides the fields required to describe a registry event. type Event struct { // ID provides a unique identifier for the event. - ID string `json:"uuid,omitempty"` + ID string `json:"id,omitempty"` // Timestamp is the time at which the event occurred. Timestamp time.Time `json:"timestamp,omitempty"` @@ -74,6 +74,8 @@ type Event struct { // ActorRecord specifies the agent that initiated the event. For most // situations, this could be from the authorizaton context of the request. +// Data in this record can refer to both the initiating client and the +// generating request. type ActorRecord struct { // Name corresponds to the subject or username associated with the // request context that generated the event. @@ -82,6 +84,22 @@ type ActorRecord struct { // Addr contains the ip or hostname and possibly port of the client // connection that initiated the event. Addr string `json:"addr,omitempty"` + + // Host is the externally accessible host name of the registry instance, + // as specified by the http host header on incoming requests. + Host string `json:"host,omitempty"` + + // RequestID uniquely identifies the registry request that generated the + // event. + RequestID string `json:"requestID,omitempty"` + + // TODO(stevvooe): Look into setting a session cookie to get this + // without docker daemon. + // SessionID + + // TODO(stevvooe): Push the "Docker-Command" header to replace cookie and + // get the actual command. + // Command } // SourceRecord identifies the registry node that generated the event. Put @@ -93,12 +111,9 @@ type SourceRecord struct { // os.Hostname() along with the running port. Addr string `json:"addr,omitempty"` - // Host is the dns name of the registry cluster, as configured. - Host string `json:"host,omitempty"` - - // RequestID uniquely identifies the registry request that generated the - // event. - RequestID string `json:"request_id,omitempty"` + // InstanceID identifies a running instance of an application. Changes + // after each restart. + InstanceID string `json:"instanceID,omitempty"` } var ( diff --git a/storage/notifications/event_test.go b/storage/notifications/event_test.go index 37e43c66b..77bd19f96 100644 --- a/storage/notifications/event_test.go +++ b/storage/notifications/event_test.go @@ -15,7 +15,7 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { { "events": [ { - "uuid": "asdf-asdf-asdf-asdf-0", + "id": "asdf-asdf-asdf-asdf-0", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -27,16 +27,16 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } }, { - "uuid": "asdf-asdf-asdf-asdf-1", + "id": "asdf-asdf-asdf-asdf-1", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -47,16 +47,16 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } }, { - "uuid": "asdf-asdf-asdf-asdf-2", + "id": "asdf-asdf-asdf-asdf-2", "timestamp": "2006-01-02T15:04:05Z", "action": "push", "target": { @@ -67,12 +67,12 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { }, "actor": { "name": "test-actor", - "addr": "hostname.local" + "addr": "client.local", + "host": "registrycluster.local", + "requestID": "asdfasdf" }, "source": { - "addr": "hostname.local", - "host": "registrycluster.local", - "request_id": "asdfasdf" + "addr": "hostname.local:port" } } ] @@ -87,11 +87,11 @@ func TestEventEnvelopeJSONFormat(t *testing.T) { var prototype Event prototype.Action = "push" prototype.Timestamp = tm - prototype.Actor.Addr = "hostname.local" + prototype.Actor.Addr = "client.local" prototype.Actor.Name = "test-actor" - prototype.Source.Addr = "hostname.local" - prototype.Source.Host = "registrycluster.local" - prototype.Source.RequestID = "asdfasdf" + prototype.Actor.RequestID = "asdfasdf" + prototype.Actor.Host = "registrycluster.local" + prototype.Source.Addr = "hostname.local:port" var manifestPush Event manifestPush = prototype diff --git a/storage/notifications/http.go b/storage/notifications/http.go new file mode 100644 index 000000000..15b3574cf --- /dev/null +++ b/storage/notifications/http.go @@ -0,0 +1,145 @@ +package notifications + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" +) + +// httpSink implements a single-flight, http notification endpoint. This is +// very lightweight in that it only makes an attempt at an http request. +// Reliability should be provided by the caller. +type httpSink struct { + url string + + mu sync.Mutex + closed bool + client *http.Client + listeners []httpStatusListener + + // TODO(stevvooe): Allow one to configure the media type accepted by this + // sink and choose the serialization based on that. +} + +// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other +// sinks for increased reliability. +func newHTTPSink(u string, timeout time.Duration, headers http.Header, listeners ...httpStatusListener) *httpSink { + return &httpSink{ + url: u, + listeners: listeners, + client: &http.Client{ + Transport: &headerRoundTripper{ + Transport: http.DefaultTransport.(*http.Transport), + headers: headers, + }, + Timeout: timeout, + }, + } +} + +// httpStatusListener is called on various outcomes of sending notifications. +type httpStatusListener interface { + success(status int, events ...Event) + failure(status int, events ...Event) + err(err error, events ...Event) +} + +// Accept makes an attempt to notify the endpoint, returning an error if it +// fails. It is the caller's responsibility to retry on error. The events are +// accepted or rejected as a group. +func (hs *httpSink) Write(events ...Event) error { + hs.mu.Lock() + defer hs.mu.Unlock() + + if hs.closed { + return ErrSinkClosed + } + + envelope := Envelope{ + Events: events, + } + + // TODO(stevvooe): It is not ideal to keep re-encoding the request body on + // retry but we are going to do it to keep the code simple. It is likely + // we could change the event struct to manage its own buffer. + + p, err := json.MarshalIndent(envelope, "", " ") + if err != nil { + for _, listener := range hs.listeners { + listener.err(err, events...) + } + return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err) + } + + body := bytes.NewReader(p) + resp, err := hs.client.Post(hs.url, EventsMediaType, body) + if err != nil { + for _, listener := range hs.listeners { + listener.err(err, events...) + } + + return fmt.Errorf("%v: error posting: %v", hs, err) + } + + // The notifier will treat any 2xx or 3xx response as accepted by the + // endpoint. + switch { + case resp.StatusCode >= 200 && resp.StatusCode < 400: + for _, listener := range hs.listeners { + listener.success(resp.StatusCode, events...) + } + + // TODO(stevvooe): This is a little accepting: we may want to support + // unsupported media type responses with retries using the correct + // media type. There may also be cases that will never work. + + return nil + default: + for _, listener := range hs.listeners { + listener.failure(resp.StatusCode, events...) + } + return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status) + } +} + +// Close the endpoint +func (hs *httpSink) Close() error { + hs.mu.Lock() + defer hs.mu.Unlock() + + if hs.closed { + return fmt.Errorf("httpsink: already closed") + } + + hs.closed = true + return nil +} + +func (hs *httpSink) String() string { + return fmt.Sprintf("httpSink{%s}", hs.url) +} + +type headerRoundTripper struct { + *http.Transport // must be transport to support CancelRequest + headers http.Header +} + +func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + var nreq http.Request + nreq = *req + nreq.Header = make(http.Header) + + merge := func(headers http.Header) { + for k, v := range headers { + nreq.Header[k] = append(nreq.Header[k], v...) + } + } + + merge(req.Header) + merge(hrt.headers) + + return hrt.Transport.RoundTrip(&nreq) +} diff --git a/storage/notifications/http_test.go b/storage/notifications/http_test.go new file mode 100644 index 000000000..c2cfbc02c --- /dev/null +++ b/storage/notifications/http_test.go @@ -0,0 +1,155 @@ +package notifications + +import ( + "encoding/json" + "fmt" + "mime" + "net/http" + "net/http/httptest" + "reflect" + "strconv" + "testing" +) + +// TestHTTPSink mocks out an http endpoint and notifies it under a couple of +// conditions, ensuring correct behavior. +func TestHTTPSink(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + if r.Method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + t.Fatalf("unexpected request method: %v", r.Method) + return + } + + // Extract the content type and make sure it matches + contentType := r.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(contentType) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + t.Fatalf("error parsing media type: %v, contenttype=%q", err, contentType) + return + } + + if mediaType != EventsMediaType { + w.WriteHeader(http.StatusUnsupportedMediaType) + t.Fatalf("incorrect media type: %q != %q", mediaType, EventsMediaType) + return + } + + var envelope Envelope + dec := json.NewDecoder(r.Body) + if err := dec.Decode(&envelope); err != nil { + w.WriteHeader(http.StatusBadRequest) + t.Fatalf("error decoding request body: %v", err) + return + } + + // Let caller choose the status + status, err := strconv.Atoi(r.FormValue("status")) + if err != nil { + t.Logf("error parsing status: %v", err) + + // May just be empty, set status to 200 + status = http.StatusOK + } + + w.WriteHeader(status) + })) + + metrics := newSafeMetrics() + sink := newHTTPSink(server.URL, 0, nil, + &endpointMetricsHTTPStatusListener{safeMetrics: metrics}) + + var expectedMetrics EndpointMetrics + expectedMetrics.Statuses = make(map[string]int) + + for _, tc := range []struct { + events []Event // events to send + url string + failure bool // true if there should be a failure. + statusCode int // if not set, no status code should be incremented. + }{ + { + statusCode: http.StatusOK, + events: []Event{ + createTestEvent("push", "library/test", "manifest")}, + }, + { + statusCode: http.StatusOK, + events: []Event{ + createTestEvent("push", "library/test", "manifest"), + createTestEvent("push", "library/test", "layer"), + createTestEvent("push", "library/test", "layer"), + }, + }, + { + statusCode: http.StatusTemporaryRedirect, + }, + { + statusCode: http.StatusBadRequest, + failure: true, + }, + { + // Case where connection never goes through. + url: "http://shoudlntresolve/", + failure: true, + }, + } { + + if tc.failure { + expectedMetrics.Failures += len(tc.events) + } else { + expectedMetrics.Successes += len(tc.events) + } + + if tc.statusCode > 0 { + expectedMetrics.Statuses[fmt.Sprintf("%d %s", tc.statusCode, http.StatusText(tc.statusCode))] += len(tc.events) + } + + url := tc.url + if url == "" { + url = server.URL + "/" + } + // setup endpoint to respond with expected status code. + url += fmt.Sprintf("?status=%v", tc.statusCode) + sink.url = url + + t.Logf("testcase: %v, fail=%v", url, tc.failure) + // Try a simple event emission. + err := sink.Write(tc.events...) + + if !tc.failure { + if err != nil { + t.Fatalf("unexpected error send event: %v", err) + } + } else { + if err == nil { + t.Fatalf("the endpoint should have rejected the request") + } + } + + if !reflect.DeepEqual(metrics.EndpointMetrics, expectedMetrics) { + t.Fatalf("metrics not as expected: %#v != %#v", metrics.EndpointMetrics, expectedMetrics) + } + } + + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing http sink: %v", err) + } + + // double close returns error + if err := sink.Close(); err == nil { + t.Fatalf("second close should have returned error: %v", err) + } + +} + +func createTestEvent(action, repo, typ string) Event { + event := createEvent(action) + + event.Target.Type = typ + event.Target.Name = repo + + return *event +} diff --git a/storage/notifications/listener.go b/storage/notifications/listener.go new file mode 100644 index 000000000..2d7bb112b --- /dev/null +++ b/storage/notifications/listener.go @@ -0,0 +1,140 @@ +package notifications + +import ( + "github.com/Sirupsen/logrus" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/storage" +) + +// ManifestListener describes a set of methods for listening to events related to manifests. +type ManifestListener interface { + ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error + ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error + + // TODO(stevvooe): Please note that delete support is still a little shaky + // and we'll need to propagate these in the future. + + ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error +} + +// LayerListener describes a listener that can respond to layer related events. +type LayerListener interface { + LayerPushed(repo storage.Repository, layer storage.Layer) error + LayerPulled(repo storage.Repository, layer storage.Layer) error + + // TODO(stevvooe): Please note that delete support is still a little shaky + // and we'll need to propagate these in the future. + + LayerDeleted(repo storage.Repository, layer storage.Layer) error +} + +// Listener combines all repository events into a single interface. +type Listener interface { + ManifestListener + LayerListener +} + +type repositoryListener struct { + storage.Repository + listener Listener +} + +// Listen dispatches events on the repository to the listener. +func Listen(repo storage.Repository, listener Listener) storage.Repository { + return &repositoryListener{ + Repository: repo, + listener: listener, + } +} + +func (rl *repositoryListener) Manifests() storage.ManifestService { + return &manifestServiceListener{ + ManifestService: rl.Repository.Manifests(), + parent: rl, + } +} + +func (rl *repositoryListener) Layers() storage.LayerService { + return &layerServiceListener{ + LayerService: rl.Repository.Layers(), + parent: rl, + } +} + +type manifestServiceListener struct { + storage.ManifestService + parent *repositoryListener +} + +func (msl *manifestServiceListener) Get(tag string) (*manifest.SignedManifest, error) { + sm, err := msl.ManifestService.Get(tag) + if err == nil { + if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { + logrus.Errorf("error dispatching manifest pull to listener: %v", err) + } + } + + return sm, err +} + +func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest) error { + err := msl.ManifestService.Put(tag, sm) + + if err == nil { + if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil { + logrus.Errorf("error dispatching manifest push to listener: %v", err) + } + } + + return err +} + +type layerServiceListener struct { + storage.LayerService + parent *repositoryListener +} + +func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error) { + layer, err := lsl.LayerService.Fetch(dgst) + if err == nil { + if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil { + logrus.Errorf("error dispatching layer pull to listener: %v", err) + } + } + + return layer, err +} + +func (lsl *layerServiceListener) Upload() (storage.LayerUpload, error) { + lu, err := lsl.LayerService.Upload() + return lsl.decorateUpload(lu), err +} + +func (lsl *layerServiceListener) Resume(uuid string) (storage.LayerUpload, error) { + lu, err := lsl.LayerService.Resume(uuid) + return lsl.decorateUpload(lu), err +} + +func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.LayerUpload { + return &layerUploadListener{ + LayerUpload: lu, + parent: lsl, + } +} + +type layerUploadListener struct { + storage.LayerUpload + parent *layerServiceListener +} + +func (lul *layerUploadListener) Finish(dgst digest.Digest) (storage.Layer, error) { + layer, err := lul.LayerUpload.Finish(dgst) + if err == nil { + if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil { + logrus.Errorf("error dispatching layer push to listener: %v", err) + } + } + + return layer, err +} diff --git a/storage/notifications/listener_test.go b/storage/notifications/listener_test.go new file mode 100644 index 000000000..17af8b158 --- /dev/null +++ b/storage/notifications/listener_test.go @@ -0,0 +1,151 @@ +package notifications + +import ( + "io" + "reflect" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest" + "github.com/docker/distribution/storage" + "github.com/docker/distribution/storagedriver/inmemory" + "github.com/docker/distribution/testutil" + "github.com/docker/libtrust" +) + +func TestListener(t *testing.T) { + registry := storage.NewRegistryWithDriver(inmemory.New()) + tl := &testListener{ + ops: make(map[string]int), + } + repository := Listen(registry.Repository("foo/bar"), tl) + + // Now take the registry through a number of operations + checkExerciseRepository(t, repository) + + expectedOps := map[string]int{ + "manifest:push": 1, + "manifest:pull": 1, + // "manifest:delete": 0, // deletes not supported for now + "layer:push": 2, + "layer:pull": 2, + // "layer:delete": 0, // deletes not supported for now + } + + if !reflect.DeepEqual(tl.ops, expectedOps) { + t.Fatalf("counts do not match:\n%v\n !=\n%v", tl.ops, expectedOps) + } + +} + +type testListener struct { + ops map[string]int +} + +func (tl *testListener) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:push"]++ + + return nil +} + +func (tl *testListener) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:pull"]++ + return nil +} + +func (tl *testListener) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { + tl.ops["manifest:delete"]++ + return nil +} + +func (tl *testListener) LayerPushed(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:push"]++ + return nil +} + +func (tl *testListener) LayerPulled(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:pull"]++ + return nil +} + +func (tl *testListener) LayerDeleted(repo storage.Repository, layer storage.Layer) error { + tl.ops["layer:delete"]++ + return nil +} + +// checkExerciseRegistry takes the registry through all of its operations, +// carrying out generic checks. +func checkExerciseRepository(t *testing.T, repository storage.Repository) { + // TODO(stevvooe): This would be a nice testutil function. Basically, it + // takes the registry through a common set of operations. This could be + // used to make cross-cutting updates by changing internals that affect + // update counts. Basically, it would make writing tests a lot easier. + + tag := "thetag" + m := manifest.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 1, + }, + Name: repository.Name(), + Tag: tag, + } + + layers := repository.Layers() + for i := 0; i < 2; i++ { + rs, ds, err := testutil.CreateRandomTarFile() + if err != nil { + t.Fatalf("error creating test layer: %v", err) + } + dgst := digest.Digest(ds) + upload, err := layers.Upload() + if err != nil { + t.Fatalf("error creating layer upload: %v", err) + } + + // Use the resumes, as well! + upload, err = layers.Resume(upload.UUID()) + if err != nil { + t.Fatalf("error resuming layer upload: %v", err) + } + + io.Copy(upload, rs) + + if _, err := upload.Finish(dgst); err != nil { + t.Fatalf("unexpected error finishing upload: %v", err) + } + + m.FSLayers = append(m.FSLayers, manifest.FSLayer{ + BlobSum: dgst, + }) + + // Then fetch the layers + if _, err := layers.Fetch(dgst); err != nil { + t.Fatalf("error fetching layer: %v", err) + } + } + + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("unexpected error generating key: %v", err) + } + + sm, err := manifest.Sign(&m, pk) + if err != nil { + t.Fatalf("unexpected error signing manifest: %v", err) + } + + manifests := repository.Manifests() + + if err := manifests.Put(tag, sm); err != nil { + t.Fatalf("unexpected error putting the manifest: %v", err) + } + + fetched, err := manifests.Get(tag) + if err != nil { + t.Fatalf("unexpected error fetching manifest: %v", err) + } + + if fetched.Tag != fetched.Tag { + t.Fatalf("retrieved unexpected manifest: %v", err) + } +} diff --git a/storage/notifications/metrics.go b/storage/notifications/metrics.go new file mode 100644 index 000000000..2a8ffcbd2 --- /dev/null +++ b/storage/notifications/metrics.go @@ -0,0 +1,152 @@ +package notifications + +import ( + "expvar" + "fmt" + "net/http" + "sync" +) + +// EndpointMetrics track various actions taken by the endpoint, typically by +// number of events. The goal of this to export it via expvar but we may find +// some other future solution to be better. +type EndpointMetrics struct { + Pending int // events pending in queue + Events int // total events incoming + Successes int // total events written successfully + Failures int // total events failed + Errors int // total events errored + Statuses map[string]int // status code histogram, per call event +} + +// safeMetrics guards the metrics implementation with a lock and provides a +// safe update function. +type safeMetrics struct { + EndpointMetrics + sync.Mutex // protects statuses map +} + +// newSafeMetrics returns safeMetrics with map allocated. +func newSafeMetrics() *safeMetrics { + var sm safeMetrics + sm.Statuses = make(map[string]int) + return &sm +} + +// httpStatusListener returns the listener for the http sink that updates the +// relevent counters. +func (sm *safeMetrics) httpStatusListener() httpStatusListener { + return &endpointMetricsHTTPStatusListener{ + safeMetrics: sm, + } +} + +// eventQueueListener returns a listener that maintains queue related counters. +func (sm *safeMetrics) eventQueueListener() eventQueueListener { + return &endpointMetricsEventQueueListener{ + safeMetrics: sm, + } +} + +// endpointMetricsHTTPStatusListener increments counters related to http sinks +// for the relevent events. +type endpointMetricsHTTPStatusListener struct { + *safeMetrics +} + +var _ httpStatusListener = &endpointMetricsHTTPStatusListener{} + +func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) + emsl.Successes += len(events) +} + +func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events) + emsl.Failures += len(events) +} + +func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) { + emsl.safeMetrics.Lock() + defer emsl.safeMetrics.Unlock() + emsl.Errors += len(events) +} + +// endpointMetricsEventQueueListener maintains the incoming events counter and +// the queues pending count. +type endpointMetricsEventQueueListener struct { + *safeMetrics +} + +func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) { + eqc.Lock() + defer eqc.Unlock() + eqc.Events += len(events) + eqc.Pending += len(events) +} + +func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) { + eqc.Lock() + defer eqc.Unlock() + eqc.Pending -= len(events) +} + +// endpoints is global registry of endpoints used to report metrics to expvar +var endpoints struct { + registered []*Endpoint + mu sync.Mutex +} + +// register places the endpoint into expvar so that stats are tracked. +func register(e *Endpoint) { + endpoints.mu.Lock() + defer endpoints.mu.Unlock() + + endpoints.registered = append(endpoints.registered, e) +} + +func init() { + // NOTE(stevvooe): Setup registry metrics structure to report to expvar. + // Ideally, we do more metrics through logging but we need some nice + // realtime metrics for queue state for now. + + registry := expvar.Get("registry") + + if registry == nil { + registry = expvar.NewMap("registry") + } + + var notifications expvar.Map + notifications.Init() + notifications.Set("endpoints", expvar.Func(func() interface{} { + endpoints.mu.Lock() + defer endpoints.mu.Unlock() + + var names []interface{} + for _, v := range endpoints.registered { + var epjson struct { + Name string `json:"name"` + URL string `json:"url"` + EndpointConfig + + Metrics EndpointMetrics + } + + epjson.Name = v.Name() + epjson.URL = v.URL() + epjson.EndpointConfig = v.EndpointConfig + + v.ReadMetrics(&epjson.Metrics) + + names = append(names, epjson) + } + + return names + })) + + registry.(*expvar.Map).Set("notifications", ¬ifications) +} diff --git a/storage/notifications/sinks.go b/storage/notifications/sinks.go new file mode 100644 index 000000000..2bf63e2d3 --- /dev/null +++ b/storage/notifications/sinks.go @@ -0,0 +1,337 @@ +package notifications + +import ( + "container/list" + "fmt" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +// NOTE(stevvooe): This file contains definitions for several utility sinks. +// Typically, the broadcaster is the only sink that should be required +// externally, but others are suitable for export if the need arises. Albeit, +// the tight integration with endpoint metrics should be removed. + +// Broadcaster sends events to multiple, reliable Sinks. The goal of this +// component is to dispatch events to configured endpoints. Reliability can be +// provided by wrapping incoming sinks. +type Broadcaster struct { + sinks []Sink + events chan []Event + closed chan chan struct{} +} + +// NewBroadcaster ... +// Add appends one or more sinks to the list of sinks. The broadcaster +// behavior will be affected by the properties of the sink. Generally, the +// sink should accept all messages and deal with reliability on its own. Use +// of EventQueue and RetryingSink should be used here. +func NewBroadcaster(sinks ...Sink) *Broadcaster { + b := Broadcaster{ + sinks: sinks, + events: make(chan []Event), + closed: make(chan chan struct{}), + } + + // Start the broadcaster + go b.run() + + return &b +} + +// Write accepts a block of events to be dispatched to all sinks. This method +// will never fail and should never block (hopefully!). The caller cedes the +// slice memory to the broadcaster and should not modify it after calling +// write. +func (b *Broadcaster) Write(events ...Event) error { + select { + case b.events <- events: + case <-b.closed: + return ErrSinkClosed + } + return nil +} + +// Close the broadcaster, ensuring that all messages are flushed to the +// underlying sink before returning. +func (b *Broadcaster) Close() error { + logrus.Infof("broadcaster: closing") + select { + case <-b.closed: + // already closed + return fmt.Errorf("broadcaster: already closed") + default: + // do a little chan handoff dance to synchronize closing + closed := make(chan struct{}) + b.closed <- closed + close(b.closed) + <-closed + return nil + } +} + +// run is the main broadcast loop, started when the broadcaster is created. +// Under normal conditions, it waits for events on the event channel. After +// Close is called, this goroutine will exit. +func (b *Broadcaster) run() { + for { + select { + case block := <-b.events: + for _, sink := range b.sinks { + if err := sink.Write(block...); err != nil { + logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err) + } + } + case closing := <-b.closed: + + // close all the underlying sinks + for _, sink := range b.sinks { + if err := sink.Close(); err != nil { + logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err) + } + } + closing <- struct{}{} + + logrus.Debugf("broadcaster: closed") + return + } + } +} + +// eventQueue accepts all messages into a queue for asynchronous consumption +// by a sink. It is unbounded and thread safe but the sink must be reliable or +// events will be dropped. +type eventQueue struct { + sink Sink + events *list.List + listeners []eventQueueListener + cond *sync.Cond + mu sync.Mutex + closed bool +} + +// eventQueueListener is called when various events happen on the queue. +type eventQueueListener interface { + ingress(events ...Event) + egress(events ...Event) +} + +// newEventQueue returns a queue to the provided sink. If the updater is non- +// nil, it will be called to update pending metrics on ingress and egress. +func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue { + eq := eventQueue{ + sink: sink, + events: list.New(), + listeners: listeners, + } + + eq.cond = sync.NewCond(&eq.mu) + go eq.run() + return &eq +} + +// Write accepts the events into the queue, only failing if the queue has +// beend closed. +func (eq *eventQueue) Write(events ...Event) error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return ErrSinkClosed + } + + for _, listener := range eq.listeners { + listener.ingress(events...) + } + eq.events.PushBack(events) + eq.cond.Signal() // signal waiters + + return nil +} + +// Close shutsdown the event queue, flushing +func (eq *eventQueue) Close() error { + eq.mu.Lock() + defer eq.mu.Unlock() + + if eq.closed { + return fmt.Errorf("eventqueue: already closed") + } + + // set closed flag + eq.closed = true + eq.cond.Signal() // signal flushes queue + eq.cond.Wait() // wait for signal from last flush + + return eq.sink.Close() +} + +// run is the main goroutine to flush events to the target sink. +func (eq *eventQueue) run() { + for { + block := eq.next() + + if block == nil { + return // nil block means event queue is closed. + } + + if err := eq.sink.Write(block...); err != nil { + logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err) + } + + for _, listener := range eq.listeners { + listener.egress(block...) + } + } +} + +// next encompasses the critical section of the run loop. When the queue is +// empty, it will block on the condition. If new data arrives, it will wake +// and return a block. When closed, a nil slice will be returned. +func (eq *eventQueue) next() []Event { + eq.mu.Lock() + defer eq.mu.Unlock() + + for eq.events.Len() < 1 { + if eq.closed { + eq.cond.Broadcast() + return nil + } + + eq.cond.Wait() + } + + front := eq.events.Front() + block := front.Value.([]Event) + eq.events.Remove(front) + + return block +} + +// retryingSink retries the write until success or an ErrSinkClosed is +// returned. Underlying sink must have p > 0 of succeeding or the sink will +// block. Internally, it is a circuit breaker retries to manage reset. +// Concurrent calls to a retrying sink are serialized through the sink, +// meaning that if one is in-flight, another will not proceed. +type retryingSink struct { + mu sync.Mutex + sink Sink + closed bool + + // circuit breaker hueristics + failures struct { + threshold int + recent int + last time.Time + backoff time.Duration // time after which we retry after failure. + } +} + +type retryingSinkListener interface { + active(events ...Event) + retry(events ...Event) +} + +// TODO(stevvooe): We are using circuit break here, which actually doesn't +// make a whole lot of sense for this use case, since we always retry. Move +// this to use bounded exponential backoff. + +// newRetryingSink returns a sink that will retry writes to a sink, backing +// off on failure. Parameters threshold and backoff adjust the behavior of the +// circuit breaker. +func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink { + rs := &retryingSink{ + sink: sink, + } + rs.failures.threshold = threshold + rs.failures.backoff = backoff + + return rs +} + +// Write attempts to flush the events to the downstream sink until it succeeds +// or the sink is closed. +func (rs *retryingSink) Write(events ...Event) error { + rs.mu.Lock() + defer rs.mu.Unlock() + +retry: + + if rs.closed { + return ErrSinkClosed + } + + if !rs.proceed() { + logrus.Warnf("%v encountered too many errors, backing off", rs.sink) + rs.wait(rs.failures.backoff) + goto retry + } + + if err := rs.write(events...); err != nil { + if err == ErrSinkClosed { + // terminal! + return err + } + + logrus.Errorf("retryingsink: error writing events: %v, retrying", err) + goto retry + } + + return nil +} + +// Close closes the sink and the underlying sink. +func (rs *retryingSink) Close() error { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.closed { + return fmt.Errorf("retryingsink: already closed") + } + + rs.closed = true + return rs.sink.Close() +} + +// write provides a helper that dispatches failure and success properly. Used +// by write as the single-flight write call. +func (rs *retryingSink) write(events ...Event) error { + if err := rs.sink.Write(events...); err != nil { + rs.failure() + return err + } + + rs.reset() + return nil +} + +// wait backoff time against the sink, unlocking so others can proceed. Should +// only be called by methods that currently have the mutex. +func (rs *retryingSink) wait(backoff time.Duration) { + rs.mu.Unlock() + defer rs.mu.Lock() + + // backoff here + time.Sleep(backoff) +} + +// reset marks a succesful call. +func (rs *retryingSink) reset() { + rs.failures.recent = 0 + rs.failures.last = time.Time{} +} + +// failure records a failure. +func (rs *retryingSink) failure() { + rs.failures.recent++ + rs.failures.last = time.Now().UTC() +} + +// proceed returns true if the call should proceed based on circuit breaker +// hueristics. +func (rs *retryingSink) proceed() bool { + return rs.failures.recent < rs.failures.threshold || + time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff)) +} diff --git a/storage/notifications/sinks_test.go b/storage/notifications/sinks_test.go new file mode 100644 index 000000000..89756a999 --- /dev/null +++ b/storage/notifications/sinks_test.go @@ -0,0 +1,223 @@ +package notifications + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/Sirupsen/logrus" + + "testing" +) + +func TestBroadcaster(t *testing.T) { + const nEvents = 1000 + var sinks []Sink + + for i := 0; i < 10; i++ { + sinks = append(sinks, &testSink{}) + } + + b := NewBroadcaster(sinks...) + + var block []Event + var wg sync.WaitGroup + for i := 1; i <= nEvents; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + if err := b.Write(block...); err != nil { + t.Fatalf("error writing block of length %d: %v", len(block), err) + } + wg.Done() + }(block...) + + block = nil + } + } + + wg.Wait() // Wait until writes complete + checkClose(t, b) + + // Iterate through the sinks and check that they all have the expected length. + for _, sink := range sinks { + ts := sink.(*testSink) + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != nEvents { + t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + } + +} + +func TestEventQueue(t *testing.T) { + const nevents = 1000 + var ts testSink + metrics := newSafeMetrics() + eq := newEventQueue( + // delayed sync simulates destination slower than channel comms + &delayedSink{ + Sink: &ts, + delay: time.Millisecond * 1, + }, metrics.eventQueueListener()) + + var wg sync.WaitGroup + var block []Event + for i := 1; i <= nevents; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + if err := eq.Write(block...); err != nil { + t.Fatalf("error writing event block: %v", err) + } + wg.Done() + }(block...) + + block = nil + } + } + + wg.Wait() + checkClose(t, eq) + + ts.mu.Lock() + defer ts.mu.Unlock() + metrics.Lock() + defer metrics.Unlock() + + if len(ts.events) != nevents { + t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000) + } + + if !ts.closed { + t.Fatalf("sink should have been closed") + } + + if metrics.Events != nevents { + t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents) + } + + if metrics.Pending != 0 { + t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0) + } +} + +func TestRetryingSink(t *testing.T) { + + // Make a sync that fails most of the time, ensuring that all the events + // make it through. + var ts testSink + flaky := &flakySink{ + rate: 1.0, // start out always failing. + Sink: &ts, + } + s := newRetryingSink(flaky, 3, 10*time.Millisecond) + + var wg sync.WaitGroup + var block []Event + for i := 1; i <= 100; i++ { + block = append(block, createTestEvent("push", "library/test", "blob")) + + // Above 50, set the failure rate lower + if i > 50 { + s.mu.Lock() + flaky.rate = 0.90 + s.mu.Unlock() + } + + if i%10 == 0 && i > 0 { + wg.Add(1) + go func(block ...Event) { + defer wg.Done() + if err := s.Write(block...); err != nil { + t.Fatalf("error writing event block: %v", err) + } + }(block...) + + block = nil + } + } + + wg.Wait() + checkClose(t, s) + + ts.mu.Lock() + defer ts.mu.Unlock() + + if len(ts.events) != 100 { + t.Fatalf("events not propagated: %d != %d", len(ts.events), 100) + } +} + +type testSink struct { + events []Event + mu sync.Mutex + closed bool +} + +func (ts *testSink) Write(events ...Event) error { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.events = append(ts.events, events...) + return nil +} + +func (ts *testSink) Close() error { + ts.mu.Lock() + defer ts.mu.Unlock() + ts.closed = true + + logrus.Infof("closing testSink") + return nil +} + +type delayedSink struct { + Sink + delay time.Duration +} + +func (ds *delayedSink) Write(events ...Event) error { + time.Sleep(ds.delay) + return ds.Sink.Write(events...) +} + +type flakySink struct { + Sink + rate float64 +} + +func (fs *flakySink) Write(events ...Event) error { + if rand.Float64() < fs.rate { + return fmt.Errorf("error writing %d events", len(events)) + } + + return fs.Sink.Write(events...) +} + +func checkClose(t *testing.T, sink Sink) { + if err := sink.Close(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + // second close should not crash but should return an error. + if err := sink.Close(); err == nil { + t.Fatalf("no error on double close") + } + + // Write after closed should be an error + if err := sink.Write([]Event{}...); err == nil { + t.Fatalf("write after closed did not have an error") + } else if err != ErrSinkClosed { + t.Fatalf("error should be ErrSinkClosed") + } +}