Merge pull request #569 from stevvooe/use-digest-manifest-notification

notifications: digest url should be sent in event target
This commit is contained in:
Stephen Day 2015-05-28 15:14:52 -07:00
commit b9eeb32808
4 changed files with 200 additions and 37 deletions

View file

@ -53,31 +53,31 @@ func NewRequestRecord(id string, r *http.Request) RequestRecord {
} }
} }
func (b *bridge) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error { func (b *bridge) ManifestPushed(repo string, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionPush, repo, sm) return b.createManifestEventAndWrite(EventActionPush, repo, sm)
} }
func (b *bridge) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error { func (b *bridge) ManifestPulled(repo string, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionPull, repo, sm) return b.createManifestEventAndWrite(EventActionPull, repo, sm)
} }
func (b *bridge) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error { func (b *bridge) ManifestDeleted(repo string, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionDelete, repo, sm) return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
} }
func (b *bridge) BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error { func (b *bridge) BlobPushed(repo string, desc distribution.Descriptor) error {
return b.createBlobEventAndWrite(EventActionPush, repo, desc) return b.createBlobEventAndWrite(EventActionPush, repo, desc)
} }
func (b *bridge) BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error { func (b *bridge) BlobPulled(repo string, desc distribution.Descriptor) error {
return b.createBlobEventAndWrite(EventActionPull, repo, desc) return b.createBlobEventAndWrite(EventActionPull, repo, desc)
} }
func (b *bridge) BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error { func (b *bridge) BlobDeleted(repo string, desc distribution.Descriptor) error {
return b.createBlobEventAndWrite(EventActionDelete, repo, desc) return b.createBlobEventAndWrite(EventActionDelete, repo, desc)
} }
func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Repository, sm *manifest.SignedManifest) error { func (b *bridge) createManifestEventAndWrite(action string, repo string, sm *manifest.SignedManifest) error {
manifestEvent, err := b.createManifestEvent(action, repo, sm) manifestEvent, err := b.createManifestEvent(action, repo, sm)
if err != nil { if err != nil {
return err return err
@ -86,10 +86,10 @@ func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Re
return b.sink.Write(*manifestEvent) return b.sink.Write(*manifestEvent)
} }
func (b *bridge) createManifestEvent(action string, repo distribution.Repository, sm *manifest.SignedManifest) (*Event, error) { func (b *bridge) createManifestEvent(action string, repo string, sm *manifest.SignedManifest) (*Event, error) {
event := b.createEvent(action) event := b.createEvent(action)
event.Target.MediaType = manifest.ManifestMediaType event.Target.MediaType = manifest.ManifestMediaType
event.Target.Repository = repo.Name() event.Target.Repository = repo
p, err := sm.Payload() p, err := sm.Payload()
if err != nil { if err != nil {
@ -97,15 +97,12 @@ func (b *bridge) createManifestEvent(action string, repo distribution.Repository
} }
event.Target.Length = int64(len(p)) event.Target.Length = int64(len(p))
event.Target.Digest, err = digest.FromBytes(p) event.Target.Digest, err = digest.FromBytes(p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO(stevvooe): Currently, the is the "tag" url: once the digest url is event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, event.Target.Digest.String())
// implemented, this should be replaced.
event.Target.URL, err = b.ub.BuildManifestURL(sm.Name, sm.Tag)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -113,7 +110,7 @@ func (b *bridge) createManifestEvent(action string, repo distribution.Repository
return event, nil return event, nil
} }
func (b *bridge) createBlobEventAndWrite(action string, repo distribution.Repository, desc distribution.Descriptor) error { func (b *bridge) createBlobEventAndWrite(action string, repo string, desc distribution.Descriptor) error {
event, err := b.createBlobEvent(action, repo, desc) event, err := b.createBlobEvent(action, repo, desc)
if err != nil { if err != nil {
return err return err
@ -122,13 +119,13 @@ func (b *bridge) createBlobEventAndWrite(action string, repo distribution.Reposi
return b.sink.Write(*event) return b.sink.Write(*event)
} }
func (b *bridge) createBlobEvent(action string, repo distribution.Repository, desc distribution.Descriptor) (*Event, error) { func (b *bridge) createBlobEvent(action string, repo string, desc distribution.Descriptor) (*Event, error) {
event := b.createEvent(action) event := b.createEvent(action)
event.Target.Descriptor = desc event.Target.Descriptor = desc
event.Target.Repository = repo.Name() event.Target.Repository = repo
var err error var err error
event.Target.URL, err = b.ub.BuildBlobURL(repo.Name(), desc.Digest) event.Target.URL, err = b.ub.BuildBlobURL(repo, desc.Digest)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -0,0 +1,166 @@
package notifications
import (
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/libtrust"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/uuid"
)
var (
// common environment for expected manifest events.
repo = "test/repo"
source = SourceRecord{
Addr: "remote.test",
InstanceID: uuid.Generate().String(),
}
ub = mustUB(v2.NewURLBuilderFromString("http://test.example.com/"))
actor = ActorRecord{
Name: "test",
}
request = RequestRecord{}
m = manifest.Manifest{
Name: repo,
Tag: "latest",
}
sm *manifest.SignedManifest
payload []byte
dgst digest.Digest
)
func TestEventBridgeManifestPulled(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPull, events...)
return nil
}))
if err := l.ManifestPulled(repo, sm); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func TestEventBridgeManifestPushed(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPush, events...)
return nil
}))
if err := l.ManifestPushed(repo, sm); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func TestEventBridgeManifestDeleted(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionDelete, events...)
return nil
}))
if err := l.ManifestDeleted(repo, sm); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func createTestEnv(t *testing.T, fn testSinkFn) Listener {
pk, err := libtrust.GenerateECP256PrivateKey()
if err != nil {
t.Fatalf("error generating private key: %v", err)
}
sm, err = manifest.Sign(&m, pk)
if err != nil {
t.Fatalf("error signing manifest: %v", err)
}
payload, err = sm.Payload()
if err != nil {
t.Fatalf("error getting manifest payload: %v", err)
}
dgst, err = digest.FromBytes(payload)
if err != nil {
t.Fatalf("error digesting manifest payload: %v", err)
}
return NewBridge(ub, source, actor, request, fn)
}
func checkCommonManifest(t *testing.T, action string, events ...Event) {
checkCommon(t, events...)
event := events[0]
if event.Action != action {
t.Fatalf("unexpected event action: %q != %q", event.Action, action)
}
u, err := ub.BuildManifestURL(repo, dgst.String())
if err != nil {
t.Fatalf("error building expected url: %v", err)
}
if event.Target.URL != u {
t.Fatalf("incorrect url passed: %q != %q", event.Target.URL, u)
}
}
func checkCommon(t *testing.T, events ...Event) {
if len(events) != 1 {
t.Fatalf("unexpected number of events: %v != 1", len(events))
}
event := events[0]
if event.Source != source {
t.Fatalf("source not equal: %#v != %#v", event.Source, source)
}
if event.Request != request {
t.Fatalf("request not equal: %#v != %#v", event.Request, request)
}
if event.Actor != actor {
t.Fatalf("request not equal: %#v != %#v", event.Actor, actor)
}
if event.Target.Digest != dgst {
t.Fatalf("unexpected digest on event target: %q != %q", event.Target.Digest, dgst)
}
if event.Target.Length != int64(len(payload)) {
t.Fatalf("unexpected target length: %v != %v", event.Target.Length, len(payload))
}
if event.Target.Repository != repo {
t.Fatalf("unexpected repository: %q != %q", event.Target.Repository, repo)
}
}
type testSinkFn func(events ...Event) error
func (tsf testSinkFn) Write(events ...Event) error {
return tsf(events...)
}
func (tsf testSinkFn) Close() error { return nil }
func mustUB(ub *v2.URLBuilder, err error) *v2.URLBuilder {
if err != nil {
panic(err)
}
return ub
}

View file

@ -12,24 +12,24 @@ import (
// ManifestListener describes a set of methods for listening to events related to manifests. // ManifestListener describes a set of methods for listening to events related to manifests.
type ManifestListener interface { type ManifestListener interface {
ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error ManifestPushed(repo string, sm *manifest.SignedManifest) error
ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error ManifestPulled(repo string, sm *manifest.SignedManifest) error
// TODO(stevvooe): Please note that delete support is still a little shaky // TODO(stevvooe): Please note that delete support is still a little shaky
// and we'll need to propagate these in the future. // and we'll need to propagate these in the future.
ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error ManifestDeleted(repo string, sm *manifest.SignedManifest) error
} }
// BlobListener describes a listener that can respond to layer related events. // BlobListener describes a listener that can respond to layer related events.
type BlobListener interface { type BlobListener interface {
BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error BlobPushed(repo string, desc distribution.Descriptor) error
BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error BlobPulled(repo string, desc distribution.Descriptor) error
// TODO(stevvooe): Please note that delete support is still a little shaky // TODO(stevvooe): Please note that delete support is still a little shaky
// and we'll need to propagate these in the future. // and we'll need to propagate these in the future.
BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error BlobDeleted(repo string, desc distribution.Descriptor) error
} }
// Listener combines all repository events into a single interface. // Listener combines all repository events into a single interface.
@ -73,7 +73,7 @@ type manifestServiceListener struct {
func (msl *manifestServiceListener) Get(dgst digest.Digest) (*manifest.SignedManifest, error) { func (msl *manifestServiceListener) Get(dgst digest.Digest) (*manifest.SignedManifest, error) {
sm, err := msl.ManifestService.Get(dgst) sm, err := msl.ManifestService.Get(dgst)
if err == nil { if err == nil {
if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Name(), sm); err != nil {
logrus.Errorf("error dispatching manifest pull to listener: %v", err) logrus.Errorf("error dispatching manifest pull to listener: %v", err)
} }
} }
@ -85,7 +85,7 @@ func (msl *manifestServiceListener) Put(sm *manifest.SignedManifest) error {
err := msl.ManifestService.Put(sm) err := msl.ManifestService.Put(sm)
if err == nil { if err == nil {
if err := msl.parent.listener.ManifestPushed(msl.parent.Repository, sm); err != nil { if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Name(), sm); err != nil {
logrus.Errorf("error dispatching manifest push to listener: %v", err) logrus.Errorf("error dispatching manifest push to listener: %v", err)
} }
} }
@ -96,7 +96,7 @@ func (msl *manifestServiceListener) Put(sm *manifest.SignedManifest) error {
func (msl *manifestServiceListener) GetByTag(tag string) (*manifest.SignedManifest, error) { func (msl *manifestServiceListener) GetByTag(tag string) (*manifest.SignedManifest, error) {
sm, err := msl.ManifestService.GetByTag(tag) sm, err := msl.ManifestService.GetByTag(tag)
if err == nil { if err == nil {
if err := msl.parent.listener.ManifestPulled(msl.parent.Repository, sm); err != nil { if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Name(), sm); err != nil {
logrus.Errorf("error dispatching manifest pull to listener: %v", err) logrus.Errorf("error dispatching manifest pull to listener: %v", err)
} }
} }
@ -117,7 +117,7 @@ func (bsl *blobServiceListener) Get(ctx context.Context, dgst digest.Digest) ([]
if desc, err := bsl.Stat(ctx, dgst); err != nil { if desc, err := bsl.Stat(ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
} else { } else {
if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
} }
} }
@ -132,7 +132,7 @@ func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (d
if desc, err := bsl.Stat(ctx, dgst); err != nil { if desc, err := bsl.Stat(ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
} else { } else {
if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
} }
} }
@ -147,7 +147,7 @@ func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWr
if desc, err := bsl.Stat(ctx, dgst); err != nil { if desc, err := bsl.Stat(ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) context.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
} else { } else {
if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository, desc); err != nil { if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Name(), desc); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
} }
} }
@ -159,7 +159,7 @@ func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWr
func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
desc, err := bsl.BlobStore.Put(ctx, mediaType, p) desc, err := bsl.BlobStore.Put(ctx, mediaType, p)
if err == nil { if err == nil {
if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository, desc); err != nil { if err := bsl.parent.listener.BlobPushed(bsl.parent.Repository.Name(), desc); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err) context.GetLogger(ctx).Errorf("error dispatching layer pull to listener: %v", err)
} }
} }
@ -192,7 +192,7 @@ type blobWriterListener struct {
func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { func (bwl *blobWriterListener) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
committed, err := bwl.BlobWriter.Commit(ctx, desc) committed, err := bwl.BlobWriter.Commit(ctx, desc)
if err == nil { if err == nil {
if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository, committed); err != nil { if err := bwl.parent.parent.listener.BlobPushed(bwl.parent.parent.Repository.Name(), committed); err != nil {
context.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err) context.GetLogger(ctx).Errorf("error dispatching blob push to listener: %v", err)
} }
} }

View file

@ -51,33 +51,33 @@ type testListener struct {
ops map[string]int ops map[string]int
} }
func (tl *testListener) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error { func (tl *testListener) ManifestPushed(repo string, sm *manifest.SignedManifest) error {
tl.ops["manifest:push"]++ tl.ops["manifest:push"]++
return nil return nil
} }
func (tl *testListener) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error { func (tl *testListener) ManifestPulled(repo string, sm *manifest.SignedManifest) error {
tl.ops["manifest:pull"]++ tl.ops["manifest:pull"]++
return nil return nil
} }
func (tl *testListener) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error { func (tl *testListener) ManifestDeleted(repo string, sm *manifest.SignedManifest) error {
tl.ops["manifest:delete"]++ tl.ops["manifest:delete"]++
return nil return nil
} }
func (tl *testListener) BlobPushed(repo distribution.Repository, desc distribution.Descriptor) error { func (tl *testListener) BlobPushed(repo string, desc distribution.Descriptor) error {
tl.ops["layer:push"]++ tl.ops["layer:push"]++
return nil return nil
} }
func (tl *testListener) BlobPulled(repo distribution.Repository, desc distribution.Descriptor) error { func (tl *testListener) BlobPulled(repo string, desc distribution.Descriptor) error {
tl.ops["layer:pull"]++ tl.ops["layer:pull"]++
return nil return nil
} }
func (tl *testListener) BlobDeleted(repo distribution.Repository, desc distribution.Descriptor) error { func (tl *testListener) BlobDeleted(repo string, desc distribution.Descriptor) error {
tl.ops["layer:delete"]++ tl.ops["layer:delete"]++
return nil return nil
} }