From f37b2ee16edacb923d1110e657f6e5004a8331dc Mon Sep 17 00:00:00 2001 From: Richard Scothern Date: Thu, 28 Jan 2016 09:56:37 -0800 Subject: [PATCH] Send manifest and blob delete events to the notifications subsystem. Signed-off-by: Richard Scothern --- notifications/bridge.go | 25 +++++++++++++++++++++---- notifications/bridge_test.go | 34 +++++++++++++++++++++++++++++++--- notifications/listener.go | 34 ++++++++++++++++++++++++---------- notifications/listener_test.go | 30 ++++++++++++++++++++++-------- 4 files changed, 98 insertions(+), 25 deletions(-) diff --git a/notifications/bridge.go b/notifications/bridge.go index 5b759bb4..584ba976 100644 --- a/notifications/bridge.go +++ b/notifications/bridge.go @@ -6,6 +6,7 @@ import ( "github.com/docker/distribution" "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" "github.com/docker/distribution/reference" "github.com/docker/distribution/uuid" ) @@ -60,8 +61,8 @@ func (b *bridge) ManifestPulled(repo reference.Named, sm distribution.Manifest) return b.createManifestEventAndWrite(EventActionPull, repo, sm) } -func (b *bridge) ManifestDeleted(repo reference.Named, sm distribution.Manifest) error { - return b.createManifestEventAndWrite(EventActionDelete, repo, sm) +func (b *bridge) ManifestDeleted(repo reference.Named, dgst digest.Digest) error { + return b.createManifestDeleteEventAndWrite(EventActionDelete, repo, dgst) } func (b *bridge) BlobPushed(repo reference.Named, desc distribution.Descriptor) error { @@ -81,8 +82,8 @@ func (b *bridge) BlobMounted(repo reference.Named, desc distribution.Descriptor, return b.sink.Write(*event) } -func (b *bridge) BlobDeleted(repo reference.Named, desc distribution.Descriptor) error { - return b.createBlobEventAndWrite(EventActionDelete, repo, desc) +func (b *bridge) BlobDeleted(repo reference.Named, dgst digest.Digest) error { + return b.createBlobDeleteEventAndWrite(EventActionDelete, repo, dgst) } func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named, sm distribution.Manifest) error { @@ -94,6 +95,14 @@ func (b *bridge) createManifestEventAndWrite(action string, repo reference.Named return b.sink.Write(*manifestEvent) } +func (b *bridge) createManifestDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error { + event := b.createEvent(action) + event.Target.Repository = repo.Name() + event.Target.Digest = dgst + + return b.sink.Write(*event) +} + func (b *bridge) createManifestEvent(action string, repo reference.Named, sm distribution.Manifest) (*Event, error) { event := b.createEvent(action) event.Target.Repository = repo.Name() @@ -127,6 +136,14 @@ func (b *bridge) createManifestEvent(action string, repo reference.Named, sm dis return event, nil } +func (b *bridge) createBlobDeleteEventAndWrite(action string, repo reference.Named, dgst digest.Digest) error { + event := b.createEvent(action) + event.Target.Digest = dgst + event.Target.Repository = repo.Name() + + return b.sink.Write(*event) +} + func (b *bridge) createBlobEventAndWrite(action string, repo reference.Named, desc distribution.Descriptor) error { event, err := b.createBlobEvent(action, repo, desc) if err != nil { diff --git a/notifications/bridge_test.go b/notifications/bridge_test.go index 1a063c5f..6f361449 100644 --- a/notifications/bridge_test.go +++ b/notifications/bridge_test.go @@ -63,13 +63,12 @@ func TestEventBridgeManifestPushed(t *testing.T) { func TestEventBridgeManifestDeleted(t *testing.T) { l := createTestEnv(t, testSinkFn(func(events ...Event) error { - checkCommonManifest(t, EventActionDelete, events...) - + checkDeleted(t, EventActionDelete, events...) return nil })) repoRef, _ := reference.ParseNamed(repo) - if err := l.ManifestDeleted(repoRef, sm); err != nil { + if err := l.ManifestDeleted(repoRef, dgst); err != nil { t.Fatalf("unexpected error notifying manifest pull: %v", err) } } @@ -91,6 +90,35 @@ func createTestEnv(t *testing.T, fn testSinkFn) Listener { return NewBridge(ub, source, actor, request, fn) } +func checkDeleted(t *testing.T, action string, events ...Event) { + if len(events) != 1 { + t.Fatalf("unexpected number of events: %v != 1", len(events)) + } + + event := events[0] + + if event.Source != source { + t.Fatalf("source not equal: %#v != %#v", event.Source, source) + } + + if event.Request != request { + t.Fatalf("request not equal: %#v != %#v", event.Request, request) + } + + if event.Actor != actor { + t.Fatalf("request not equal: %#v != %#v", event.Actor, actor) + } + + if event.Target.Digest != dgst { + t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst) + } + + if event.Target.Repository != repo { + t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo) + } + +} + func checkCommonManifest(t *testing.T, action string, events ...Event) { checkCommon(t, events...) diff --git a/notifications/listener.go b/notifications/listener.go index 697b81cd..0817d463 100644 --- a/notifications/listener.go +++ b/notifications/listener.go @@ -14,11 +14,7 @@ import ( type ManifestListener interface { ManifestPushed(repo reference.Named, sm distribution.Manifest) error ManifestPulled(repo reference.Named, sm distribution.Manifest) error - - // TODO(stevvooe): Please note that delete support is still a little shaky - // and we'll need to propagate these in the future. - - ManifestDeleted(repo reference.Named, sm distribution.Manifest) error + ManifestDeleted(repo reference.Named, dgst digest.Digest) error } // BlobListener describes a listener that can respond to layer related events. @@ -26,11 +22,7 @@ type BlobListener interface { BlobPushed(repo reference.Named, desc distribution.Descriptor) error BlobPulled(repo reference.Named, desc distribution.Descriptor) error BlobMounted(repo reference.Named, desc distribution.Descriptor, fromRepo reference.Named) error - - // TODO(stevvooe): Please note that delete support is still a little shaky - // and we'll need to propagate these in the future. - - BlobDeleted(repo reference.Named, desc distribution.Descriptor) error + BlobDeleted(repo reference.Named, desc digest.Digest) error } // Listener combines all repository events into a single interface. @@ -75,6 +67,17 @@ type manifestServiceListener struct { parent *repositoryListener } +func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Digest) error { + err := msl.ManifestService.Delete(ctx, dgst) + if err == nil { + if err := msl.parent.listener.ManifestDeleted(msl.parent.Repository.Named(), dgst); err != nil { + logrus.Errorf("error dispatching manifest delete to listener: %v", err) + } + } + + return err +} + func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) { sm, err := msl.ManifestService.Get(ctx, dgst) if err == nil { @@ -173,6 +176,17 @@ func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribut return bsl.decorateWriter(wr), err } +func (bsl *blobServiceListener) Delete(ctx context.Context, dgst digest.Digest) error { + err := bsl.BlobStore.Delete(ctx, dgst) + if err == nil { + if err := bsl.parent.listener.BlobDeleted(bsl.parent.Repository.Named(), dgst); err != nil { + context.GetLogger(ctx).Errorf("error dispatching layer delete to listener: %v", err) + } + } + + return err +} + func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { wr, err := bsl.BlobStore.Resume(ctx, id) return bsl.decorateWriter(wr), err diff --git a/notifications/listener_test.go b/notifications/listener_test.go index 9f45921c..b979626b 100644 --- a/notifications/listener_test.go +++ b/notifications/listener_test.go @@ -39,12 +39,12 @@ func TestListener(t *testing.T) { checkExerciseRepository(t, repository) expectedOps := map[string]int{ - "manifest:push": 1, - "manifest:pull": 1, - // "manifest:delete": 0, // deletes not supported for now - "layer:push": 2, - "layer:pull": 2, - // "layer:delete": 0, // deletes not supported for now + "manifest:push": 1, + "manifest:pull": 1, + "manifest:delete": 1, + "layer:push": 2, + "layer:pull": 2, + "layer:delete": 2, // deletes not supported for now } if !reflect.DeepEqual(tl.ops, expectedOps) { @@ -68,7 +68,7 @@ func (tl *testListener) ManifestPulled(repo reference.Named, m distribution.Mani return nil } -func (tl *testListener) ManifestDeleted(repo reference.Named, m distribution.Manifest) error { +func (tl *testListener) ManifestDeleted(repo reference.Named, d digest.Digest) error { tl.ops["manifest:delete"]++ return nil } @@ -88,7 +88,7 @@ func (tl *testListener) BlobMounted(repo reference.Named, desc distribution.Desc return nil } -func (tl *testListener) BlobDeleted(repo reference.Named, desc distribution.Descriptor) error { +func (tl *testListener) BlobDeleted(repo reference.Named, d digest.Digest) error { tl.ops["layer:delete"]++ return nil } @@ -113,6 +113,7 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) { Tag: tag, } + var blobDigests []digest.Digest blobs := repository.Blobs(ctx) for i := 0; i < 2; i++ { rs, ds, err := testutil.CreateRandomTarFile() @@ -120,6 +121,7 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) { t.Fatalf("error creating test layer: %v", err) } dgst := digest.Digest(ds) + blobDigests = append(blobDigests, dgst) wr, err := blobs.Create(ctx) if err != nil { @@ -183,4 +185,16 @@ func checkExerciseRepository(t *testing.T, repository distribution.Repository) { t.Fatalf("unexpected error fetching manifest: %v", err) } + err = manifests.Delete(ctx, dgst) + if err != nil { + t.Fatalf("unexpected error deleting blob: %v", err) + } + + for _, d := range blobDigests { + err = blobs.Delete(ctx, d) + if err != nil { + t.Fatalf("unexpected error deleting blob: %v", err) + } + + } }