From a2d4f51aa41cdf43de5cdde1653e704725921e18 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Wed, 27 May 2015 12:23:49 -0700 Subject: [PATCH] Notification should send digest URL in event target Previously, the most accurate reference for a manifest was the tag url. After adding pull by digest, all event notifications should refer directly to the digest url. This ensures that event uniquely identifies the target of the notification. Testing has been added for manifest pull events to check that this doesn't change. In addition, the listener interface has been refactored to only use the repository name, rather than the full repository object. Signed-off-by: Stephen J Day --- notifications/bridge.go | 31 +++--- notifications/bridge_test.go | 166 +++++++++++++++++++++++++++++++++ notifications/listener.go | 28 +++--- notifications/listener_test.go | 12 +-- 4 files changed, 200 insertions(+), 37 deletions(-) create mode 100644 notifications/bridge_test.go diff --git a/notifications/bridge.go b/notifications/bridge.go index 3f7f8c14..fff45a4f 100644 --- a/notifications/bridge.go +++ b/notifications/bridge.go @@ -53,31 +53,31 @@ func NewRequestRecord(id string, r *http.Request) RequestRecord { } } -func (b *bridge) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (b *bridge) ManifestPushed(repo string, sm *manifest.SignedManifest) error { return b.createManifestEventAndWrite(EventActionPush, repo, sm) } -func (b *bridge) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (b *bridge) ManifestPulled(repo string, sm *manifest.SignedManifest) error { return b.createManifestEventAndWrite(EventActionPull, repo, sm) } -func (b *bridge) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (b *bridge) ManifestDeleted(repo string, sm *manifest.SignedManifest) error { return b.createManifestEventAndWrite(EventActionDelete, repo, sm) } -func (b *bridge) BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error { +func (b *bridge) BlobPushed(repo string, desc distribution.Descriptor) error { return b.createBlobEventAndWrite(EventActionPush, repo, desc) } -func (b *bridge) BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error { +func (b *bridge) BlobPulled(repo string, desc distribution.Descriptor) error { return b.createBlobEventAndWrite(EventActionPull, repo, desc) } -func (b *bridge) BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error { +func (b *bridge) BlobDeleted(repo string, desc distribution.Descriptor) error { return b.createBlobEventAndWrite(EventActionDelete, repo, desc) } -func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Repository, sm *manifest.SignedManifest) error { +func (b *bridge) createManifestEventAndWrite(action string, repo string, sm *manifest.SignedManifest) error { manifestEvent, err := b.createManifestEvent(action, repo, sm) if err != nil { return err @@ -86,10 +86,10 @@ func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Re return b.sink.Write(*manifestEvent) } -func (b *bridge) createManifestEvent(action string, repo distribution.Repository, sm *manifest.SignedManifest) (*Event, error) { +func (b *bridge) createManifestEvent(action string, repo string, sm *manifest.SignedManifest) (*Event, error) { event := b.createEvent(action) event.Target.MediaType = manifest.ManifestMediaType - event.Target.Repository = repo.Name() + event.Target.Repository = repo p, err := sm.Payload() if err != nil { @@ -97,15 +97,12 @@ func (b *bridge) createManifestEvent(action string, repo distribution.Repository } event.Target.Length = int64(len(p)) - event.Target.Digest, err = digest.FromBytes(p) if err != nil { return nil, err } - // TODO(stevvooe): Currently, the is the "tag" url: once the digest url is - // implemented, this should be replaced. - event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag) + event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, event.Target.Digest.String()) if err != nil { return nil, err } @@ -113,7 +110,7 @@ func (b *bridge) createManifestEvent(action string, repo distribution.Repository return event, nil } -func (b *bridge) createBlobEventAndWrite(action string, repo distribution.Repository, desc distribution.Descriptor) error { +func (b *bridge) createBlobEventAndWrite(action string, repo string, desc distribution.Descriptor) error { event, err := b.createBlobEvent(action, repo, desc) if err != nil { return err @@ -122,13 +119,13 @@ func (b *bridge) createBlobEventAndWrite(action string, repo distribution.Reposi return b.sink.Write(*event) } -func (b *bridge) createBlobEvent(action string, repo distribution.Repository, desc distribution.Descriptor) (*Event, error) { +func (b *bridge) createBlobEvent(action string, repo string, desc distribution.Descriptor) (*Event, error) { event := b.createEvent(action) event.Target.Descriptor = desc - event.Target.Repository = repo.Name() + event.Target.Repository = repo var err error - event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), desc.Digest) + event.Target.URL, err = b.ub.BuildBlobURL(repo, desc.Digest) if err != nil { return nil, err } diff --git a/notifications/bridge_test.go b/notifications/bridge_test.go new file mode 100644 index 00000000..fbf557d8 --- /dev/null +++ b/notifications/bridge_test.go @@ -0,0 +1,166 @@ +package notifications + +import ( + "testing" + + "github.com/docker/distribution/digest" + + "github.com/docker/libtrust" + + "github.com/docker/distribution/manifest" + + "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/uuid" +) + +var ( + // common environment for expected manifest events. + + repo = "test/repo" + source = SourceRecord{ + Addr: "remote.test", + InstanceID: uuid.Generate().String(), + } + ub = mustUB(v2.NewURLBuilderFromString("http://test.example.com/")) + + actor = ActorRecord{ + Name: "test", + } + request = RequestRecord{} + m = manifest.Manifest{ + Name: repo, + Tag: "latest", + } + + sm *manifest.SignedManifest + payload []byte + dgst digest.Digest +) + +func TestEventBridgeManifestPulled(t *testing.T) { + + l := createTestEnv(t, testSinkFn(func(events ...Event) error { + checkCommonManifest(t, EventActionPull, events...) + + return nil + })) + + if err := l.ManifestPulled(repo, sm); err != nil { + t.Fatalf("unexpected error notifying manifest pull: %v", err) + } +} + +func TestEventBridgeManifestPushed(t *testing.T) { + l := createTestEnv(t, testSinkFn(func(events ...Event) error { + checkCommonManifest(t, EventActionPush, events...) + + return nil + })) + + if err := l.ManifestPushed(repo, sm); err != nil { + t.Fatalf("unexpected error notifying manifest pull: %v", err) + } +} + +func TestEventBridgeManifestDeleted(t *testing.T) { + l := createTestEnv(t, testSinkFn(func(events ...Event) error { + checkCommonManifest(t, EventActionDelete, events...) + + return nil + })) + + if err := l.ManifestDeleted(repo, sm); err != nil { + t.Fatalf("unexpected error notifying manifest pull: %v", err) + } +} + +func createTestEnv(t *testing.T, fn testSinkFn) Listener { + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("error generating private key: %v", err) + } + + sm, err = manifest.Sign(&m, pk) + if err != nil { + t.Fatalf("error signing manifest: %v", err) + } + + payload, err = sm.Payload() + if err != nil { + t.Fatalf("error getting manifest payload: %v", err) + } + + dgst, err = digest.FromBytes(payload) + if err != nil { + t.Fatalf("error digesting manifest payload: %v", err) + } + + return NewBridge(ub, source, actor, request, fn) +} + +func checkCommonManifest(t *testing.T, action string, events ...Event) { + checkCommon(t, events...) + + event := events[0] + if event.Action != action { + t.Fatalf("unexpected event action: %q != %q", event.Action, action) + } + + u, err := ub.BuildManifestURL(repo, dgst.String()) + if err != nil { + t.Fatalf("error building expected url: %v", err) + } + + if event.Target.URL != u { + t.Fatalf("incorrect url passed: %q != %q", event.Target.URL, u) + } +} + +func checkCommon(t *testing.T, events ...Event) { + if len(events) != 1 { + t.Fatalf("unexpected number of events: %v != 1", len(events)) + } + + event := events[0] + + if event.Source != source { + t.Fatalf("source not equal: %#v != %#v", event.Source, source) + } + + if event.Request != request { + t.Fatalf("request not equal: %#v != %#v", event.Request, request) + } + + if event.Actor != actor { + t.Fatalf("request not equal: %#v != %#v", event.Actor, actor) + } + + if event.Target.Digest != dgst { + t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst) + } + + if event.Target.Length != int64(len(payload)) { + t.Fatalf("unexpected target length: %v != %v", event.Target.Length, len(payload)) + } + + if event.Target.Repository != repo { + t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo) + } + +} + +type testSinkFn func(events ...Event) error + +func (tsf testSinkFn) Write(events ...Event) error { + return tsf(events...) +} + +func (tsf testSinkFn) Close() error { return nil } + +func mustUB(ub *v2.URLBuilder, err error) *v2.URLBuilder { + if err != nil { + panic(err) + } + + return ub +} diff --git a/notifications/listener.go b/notifications/listener.go index 5d83af5b..9b2762cd 100644 --- a/notifications/listener.go +++ b/notifications/listener.go @@ -12,24 +12,24 @@ import ( // ManifestListener describes a set of methods for listening to events related to manifests. type ManifestListener interface { - ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error - ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error + ManifestPushed(repo string, sm *manifest.SignedManifest) error + ManifestPulled(repo string, sm *manifest.SignedManifest) error // TODO(stevvooe): Please note that delete support is still a little shaky // and we'll need to propagate these in the future. - ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error + ManifestDeleted(repo string, sm *manifest.SignedManifest) error } // BlobListener describes a listener that can respond to layer related events. type BlobListener interface { - BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error - BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error + BlobPushed(repo string, desc distribution.Descriptor) error + BlobPulled(repo string, desc distribution.Descriptor) error // TODO(stevvooe): Please note that delete support is still a little shaky // and we'll need to propagate these in the future. - BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error + BlobDeleted(repo string, desc distribution.Descriptor) error } // Listener combines all repository events into a single interface. @@ -73,7 +73,7 @@ type manifestServiceListener struct { func (msl *manifestServiceListener) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { sm, err := msl.ManifestService.Get(dgst) if err == nil { - if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { + if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Name(), sm); err != nil { logrus.Errorf("error dispatching manifest pull to listener: %v", err) } } @@ -85,7 +85,7 @@ func (msl *manifestServiceListener) Put(sm *manifest.SignedManifest) error { err := msl.ManifestService.Put(sm) if err == nil { - if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil { + if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Name(), sm); err != nil { logrus.Errorf("error dispatching manifest push to listener: %v", err) } } @@ -96,7 +96,7 @@ func (msl *manifestServiceListener) Put(sm *manifest.SignedManifest) error { func (msl *manifestServiceListener) GetByTag(tag string) (*manifest.SignedManifest, error) { sm, err := msl.ManifestService.GetByTag(tag) if err == nil { - if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { + if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Name(), sm); err != nil { logrus.Errorf("error dispatching manifest pull to listener: %v", err) } } @@ -117,7 +117,7 @@ func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([] if desc, err := bsl.Stat(ctx, dgst); err != nil { context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) } else { - if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { + if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) } } @@ -132,7 +132,7 @@ func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (d if desc, err := bsl.Stat(ctx, dgst); err != nil { context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) } else { - if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { + if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) } } @@ -147,7 +147,7 @@ func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWr if desc, err := bsl.Stat(ctx, dgst); err != nil { context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) } else { - if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { + if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) } } @@ -159,7 +159,7 @@ func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWr func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { desc, err := bsl.BlobStore.Put(ctx, mediaType, p) if err == nil { - if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository, desc); err != nil { + if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Name(), desc); err != nil { context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) } } @@ -192,7 +192,7 @@ type blobWriterListener struct { func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { committed, err := bwl.BlobWriter.Commit(ctx, desc) if err == nil { - if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository, committed); err != nil { + if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Name(), committed); err != nil { context.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err) } } diff --git a/notifications/listener_test.go b/notifications/listener_test.go index 641c4813..5b0250b4 100644 --- a/notifications/listener_test.go +++ b/notifications/listener_test.go @@ -51,33 +51,33 @@ type testListener struct { ops map[string]int } -func (tl *testListener) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (tl *testListener) ManifestPushed(repo string, sm *manifest.SignedManifest) error { tl.ops["manifest:push"]++ return nil } -func (tl *testListener) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (tl *testListener) ManifestPulled(repo string, sm *manifest.SignedManifest) error { tl.ops["manifest:pull"]++ return nil } -func (tl *testListener) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (tl *testListener) ManifestDeleted(repo string, sm *manifest.SignedManifest) error { tl.ops["manifest:delete"]++ return nil } -func (tl *testListener) BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error { +func (tl *testListener) BlobPushed(repo string, desc distribution.Descriptor) error { tl.ops["layer:push"]++ return nil } -func (tl *testListener) BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error { +func (tl *testListener) BlobPulled(repo string, desc distribution.Descriptor) error { tl.ops["layer:pull"]++ return nil } -func (tl *testListener) BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error { +func (tl *testListener) BlobDeleted(repo string, desc distribution.Descriptor) error { tl.ops["layer:delete"]++ return nil }