diff --git a/notifications/bridge.go b/notifications/bridge.go index 3f7f8c14b..fff45a4fc 100644 --- a/notifications/bridge.go +++ b/notifications/bridge.go @@ -53,31 +53,31 @@ func NewRequestRecord(id string, r *http.Request) RequestRecord { } } -func (b *bridge) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (b *bridge) ManifestPushed(repo string, sm *manifest.SignedManifest) error { return b.createManifestEventAndWrite(EventActionPush, repo, sm) } -func (b *bridge) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (b *bridge) ManifestPulled(repo string, sm *manifest.SignedManifest) error { return b.createManifestEventAndWrite(EventActionPull, repo, sm) } -func (b *bridge) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (b *bridge) ManifestDeleted(repo string, sm *manifest.SignedManifest) error { return b.createManifestEventAndWrite(EventActionDelete, repo, sm) } -func (b *bridge) BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error { +func (b *bridge) BlobPushed(repo string, desc distribution.Descriptor) error { return b.createBlobEventAndWrite(EventActionPush, repo, desc) } -func (b *bridge) BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error { +func (b *bridge) BlobPulled(repo string, desc distribution.Descriptor) error { return b.createBlobEventAndWrite(EventActionPull, repo, desc) } -func (b *bridge) BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error { +func (b *bridge) BlobDeleted(repo string, desc distribution.Descriptor) error { return b.createBlobEventAndWrite(EventActionDelete, repo, desc) } -func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Repository, sm *manifest.SignedManifest) error { +func (b *bridge) createManifestEventAndWrite(action string, repo string, sm *manifest.SignedManifest) error { manifestEvent, err := b.createManifestEvent(action, repo, sm) if err != nil { return err @@ -86,10 +86,10 @@ func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Re return b.sink.Write(*manifestEvent) } -func (b *bridge) createManifestEvent(action string, repo distribution.Repository, sm *manifest.SignedManifest) (*Event, error) { +func (b *bridge) createManifestEvent(action string, repo string, sm *manifest.SignedManifest) (*Event, error) { event := b.createEvent(action) event.Target.MediaType = manifest.ManifestMediaType - event.Target.Repository = repo.Name() + event.Target.Repository = repo p, err := sm.Payload() if err != nil { @@ -97,15 +97,12 @@ func (b *bridge) createManifestEvent(action string, repo distribution.Repository } event.Target.Length = int64(len(p)) - event.Target.Digest, err = digest.FromBytes(p) if err != nil { return nil, err } - // TODO(stevvooe): Currently, the is the "tag" url: once the digest url is - // implemented, this should be replaced. - event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag) + event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, event.Target.Digest.String()) if err != nil { return nil, err } @@ -113,7 +110,7 @@ func (b *bridge) createManifestEvent(action string, repo distribution.Repository return event, nil } -func (b *bridge) createBlobEventAndWrite(action string, repo distribution.Repository, desc distribution.Descriptor) error { +func (b *bridge) createBlobEventAndWrite(action string, repo string, desc distribution.Descriptor) error { event, err := b.createBlobEvent(action, repo, desc) if err != nil { return err @@ -122,13 +119,13 @@ func (b *bridge) createBlobEventAndWrite(action string, repo distribution.Reposi return b.sink.Write(*event) } -func (b *bridge) createBlobEvent(action string, repo distribution.Repository, desc distribution.Descriptor) (*Event, error) { +func (b *bridge) createBlobEvent(action string, repo string, desc distribution.Descriptor) (*Event, error) { event := b.createEvent(action) event.Target.Descriptor = desc - event.Target.Repository = repo.Name() + event.Target.Repository = repo var err error - event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), desc.Digest) + event.Target.URL, err = b.ub.BuildBlobURL(repo, desc.Digest) if err != nil { return nil, err } diff --git a/notifications/bridge_test.go b/notifications/bridge_test.go new file mode 100644 index 000000000..fbf557d8a --- /dev/null +++ b/notifications/bridge_test.go @@ -0,0 +1,166 @@ +package notifications + +import ( + "testing" + + "github.com/docker/distribution/digest" + + "github.com/docker/libtrust" + + "github.com/docker/distribution/manifest" + + "github.com/docker/distribution/registry/api/v2" + "github.com/docker/distribution/uuid" +) + +var ( + // common environment for expected manifest events. + + repo = "test/repo" + source = SourceRecord{ + Addr: "remote.test", + InstanceID: uuid.Generate().String(), + } + ub = mustUB(v2.NewURLBuilderFromString("http://test.example.com/")) + + actor = ActorRecord{ + Name: "test", + } + request = RequestRecord{} + m = manifest.Manifest{ + Name: repo, + Tag: "latest", + } + + sm *manifest.SignedManifest + payload []byte + dgst digest.Digest +) + +func TestEventBridgeManifestPulled(t *testing.T) { + + l := createTestEnv(t, testSinkFn(func(events ...Event) error { + checkCommonManifest(t, EventActionPull, events...) + + return nil + })) + + if err := l.ManifestPulled(repo, sm); err != nil { + t.Fatalf("unexpected error notifying manifest pull: %v", err) + } +} + +func TestEventBridgeManifestPushed(t *testing.T) { + l := createTestEnv(t, testSinkFn(func(events ...Event) error { + checkCommonManifest(t, EventActionPush, events...) + + return nil + })) + + if err := l.ManifestPushed(repo, sm); err != nil { + t.Fatalf("unexpected error notifying manifest pull: %v", err) + } +} + +func TestEventBridgeManifestDeleted(t *testing.T) { + l := createTestEnv(t, testSinkFn(func(events ...Event) error { + checkCommonManifest(t, EventActionDelete, events...) + + return nil + })) + + if err := l.ManifestDeleted(repo, sm); err != nil { + t.Fatalf("unexpected error notifying manifest pull: %v", err) + } +} + +func createTestEnv(t *testing.T, fn testSinkFn) Listener { + pk, err := libtrust.GenerateECP256PrivateKey() + if err != nil { + t.Fatalf("error generating private key: %v", err) + } + + sm, err = manifest.Sign(&m, pk) + if err != nil { + t.Fatalf("error signing manifest: %v", err) + } + + payload, err = sm.Payload() + if err != nil { + t.Fatalf("error getting manifest payload: %v", err) + } + + dgst, err = digest.FromBytes(payload) + if err != nil { + t.Fatalf("error digesting manifest payload: %v", err) + } + + return NewBridge(ub, source, actor, request, fn) +} + +func checkCommonManifest(t *testing.T, action string, events ...Event) { + checkCommon(t, events...) + + event := events[0] + if event.Action != action { + t.Fatalf("unexpected event action: %q != %q", event.Action, action) + } + + u, err := ub.BuildManifestURL(repo, dgst.String()) + if err != nil { + t.Fatalf("error building expected url: %v", err) + } + + if event.Target.URL != u { + t.Fatalf("incorrect url passed: %q != %q", event.Target.URL, u) + } +} + +func checkCommon(t *testing.T, events ...Event) { + if len(events) != 1 { + t.Fatalf("unexpected number of events: %v != 1", len(events)) + } + + event := events[0] + + if event.Source != source { + t.Fatalf("source not equal: %#v != %#v", event.Source, source) + } + + if event.Request != request { + t.Fatalf("request not equal: %#v != %#v", event.Request, request) + } + + if event.Actor != actor { + t.Fatalf("request not equal: %#v != %#v", event.Actor, actor) + } + + if event.Target.Digest != dgst { + t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst) + } + + if event.Target.Length != int64(len(payload)) { + t.Fatalf("unexpected target length: %v != %v", event.Target.Length, len(payload)) + } + + if event.Target.Repository != repo { + t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo) + } + +} + +type testSinkFn func(events ...Event) error + +func (tsf testSinkFn) Write(events ...Event) error { + return tsf(events...) +} + +func (tsf testSinkFn) Close() error { return nil } + +func mustUB(ub *v2.URLBuilder, err error) *v2.URLBuilder { + if err != nil { + panic(err) + } + + return ub +} diff --git a/notifications/listener.go b/notifications/listener.go index 5d83af5b5..9b2762cdd 100644 --- a/notifications/listener.go +++ b/notifications/listener.go @@ -12,24 +12,24 @@ import ( // ManifestListener describes a set of methods for listening to events related to manifests. type ManifestListener interface { - ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error - ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error + ManifestPushed(repo string, sm *manifest.SignedManifest) error + ManifestPulled(repo string, sm *manifest.SignedManifest) error // TODO(stevvooe): Please note that delete support is still a little shaky // and we'll need to propagate these in the future. - ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error + ManifestDeleted(repo string, sm *manifest.SignedManifest) error } // BlobListener describes a listener that can respond to layer related events. type BlobListener interface { - BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error - BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error + BlobPushed(repo string, desc distribution.Descriptor) error + BlobPulled(repo string, desc distribution.Descriptor) error // TODO(stevvooe): Please note that delete support is still a little shaky // and we'll need to propagate these in the future. - BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error + BlobDeleted(repo string, desc distribution.Descriptor) error } // Listener combines all repository events into a single interface. @@ -73,7 +73,7 @@ type manifestServiceListener struct { func (msl *manifestServiceListener) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { sm, err := msl.ManifestService.Get(dgst) if err == nil { - if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { + if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Name(), sm); err != nil { logrus.Errorf("error dispatching manifest pull to listener: %v", err) } } @@ -85,7 +85,7 @@ func (msl *manifestServiceListener) Put(sm *manifest.SignedManifest) error { err := msl.ManifestService.Put(sm) if err == nil { - if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil { + if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Name(), sm); err != nil { logrus.Errorf("error dispatching manifest push to listener: %v", err) } } @@ -96,7 +96,7 @@ func (msl *manifestServiceListener) Put(sm *manifest.SignedManifest) error { func (msl *manifestServiceListener) GetByTag(tag string) (*manifest.SignedManifest, error) { sm, err := msl.ManifestService.GetByTag(tag) if err == nil { - if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { + if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Name(), sm); err != nil { logrus.Errorf("error dispatching manifest pull to listener: %v", err) } } @@ -117,7 +117,7 @@ func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([] if desc, err := bsl.Stat(ctx, dgst); err != nil { context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) } else { - if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { + if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) } } @@ -132,7 +132,7 @@ func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (d if desc, err := bsl.Stat(ctx, dgst); err != nil { context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) } else { - if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { + if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) } } @@ -147,7 +147,7 @@ func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWr if desc, err := bsl.Stat(ctx, dgst); err != nil { context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) } else { - if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { + if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil { context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) } } @@ -159,7 +159,7 @@ func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWr func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { desc, err := bsl.BlobStore.Put(ctx, mediaType, p) if err == nil { - if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository, desc); err != nil { + if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Name(), desc); err != nil { context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) } } @@ -192,7 +192,7 @@ type blobWriterListener struct { func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { committed, err := bwl.BlobWriter.Commit(ctx, desc) if err == nil { - if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository, committed); err != nil { + if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Name(), committed); err != nil { context.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err) } } diff --git a/notifications/listener_test.go b/notifications/listener_test.go index 641c4813d..5b0250b4a 100644 --- a/notifications/listener_test.go +++ b/notifications/listener_test.go @@ -51,33 +51,33 @@ type testListener struct { ops map[string]int } -func (tl *testListener) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (tl *testListener) ManifestPushed(repo string, sm *manifest.SignedManifest) error { tl.ops["manifest:push"]++ return nil } -func (tl *testListener) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (tl *testListener) ManifestPulled(repo string, sm *manifest.SignedManifest) error { tl.ops["manifest:pull"]++ return nil } -func (tl *testListener) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error { +func (tl *testListener) ManifestDeleted(repo string, sm *manifest.SignedManifest) error { tl.ops["manifest:delete"]++ return nil } -func (tl *testListener) BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error { +func (tl *testListener) BlobPushed(repo string, desc distribution.Descriptor) error { tl.ops["layer:push"]++ return nil } -func (tl *testListener) BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error { +func (tl *testListener) BlobPulled(repo string, desc distribution.Descriptor) error { tl.ops["layer:pull"]++ return nil } -func (tl *testListener) BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error { +func (tl *testListener) BlobDeleted(repo string, desc distribution.Descriptor) error { tl.ops["layer:delete"]++ return nil }