Merge pull request #1522 from RichardScothern/tag-events

Send tag events to notification listeners
This commit is contained in:
Richard Scothern 2016-03-23 16:41:29 -07:00
commit 87a997249d
10 changed files with 155 additions and 71 deletions

View file

@ -65,45 +65,64 @@ Events have a well-defined JSON structure and are sent as the body of
notification requests. One or more events are sent in a structure called an
envelope. Each event has a unique id that can be used to uniquely identify incoming
requests, if required. Along with that, an _action_ is provided with a
_target, identifying the object mutated during the event.
_target_, identifying the object mutated during the event.
The fields available in an `event` are described below.
Field | Type | Description
----- | ----- | -------------
id | string |ID provides a unique identifier for the event.
timestamp | Time | Timestamp is the time at which the event occurred.
action | string | Action indicates what action encompasses the provided event.
target | distribution.Descriptor | Target uniquely describes the target of the event.
length | int | Length in bytes of content. Same as Size field in Descriptor.
repository | string | Repository identifies the named repository.
fromRepository | string | FromRepository identifies the named repository which a blob was mounted from if appropriate.
url | string | URL provides a direct link to the content.
tag | string | Tag identifies a tag name in tag events
request | [RequestRecord](https://godoc.org/github.com/docker/distribution/notifications#RequestRecord) | Request covers the request that generated the event.
actor | [ActorRecord](https://godoc.org/github.com/docker/distribution/notifications#ActorRecord). | Actor specifies the agent that initiated the event. For most situations, this could be from the authorization context of the request.
source | [SourceRecord](https://godoc.org/github.com/docker/distribution/notifications#SourceRecord) | Source identifies the registry node that generated the event. Put differently, while the actor "initiates" the event, the source "generates" it.
The fields available in an event are described in detail in the
[godoc](http://godoc.org/github.com/docker/distribution/notifications#Event).
**TODO:** Let's break out the fields here rather than rely on the godoc.
The following is an example of a JSON event, sent in response to the push of a
manifest:
```json
{
"id": "asdf-asdf-asdf-asdf-0",
"timestamp": "2006-01-02T15:04:05Z",
"action": "push",
"events": [
{
"id": "320678d8-ca14-430f-8bb6-4ca139cd83f7",
"timestamp": "2016-03-09T14:44:26.402973972-08:00",
"action": "pull",
"target": {
"mediaType": "application/vnd.docker.distribution.manifest.v1+json",
"size": 1,
"digest": "sha256:0123456789abcdef0",
"length": 1,
"repository": "library/test",
"url": "http://example.com/v2/library/test/manifests/latest"
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"size": 708,
"digest": "sha256:fea8895f450959fa676bcc1df0611ea93823a735a01205fd8622846041d0c7cf",
"length": 708,
"repository": "hello-world",
"url": "http://192.168.100.227:5000/v2/hello-world/manifests/sha256:fea8895f450959fa676bcc1df0611ea93823a735a01205fd8622846041d0c7cf",
"tag": "latest"
},
"request": {
"id": "asdfasdf",
"addr": "client.local",
"host": "registrycluster.local",
"method": "PUT",
"useragent": "test/0.1"
},
"actor": {
"name": "test-actor"
"id": "6df24a34-0959-4923-81ca-14f09767db19",
"addr": "192.168.64.11:42961",
"host": "192.168.100.227:5000",
"method": "GET",
"useragent": "curl/7.38.0"
},
"actor": {},
"source": {
"addr": "hostname.local:port"
"addr": "xtal.local:5000",
"instanceID": "a53db899-3b4b-4a62-a067-8dd013beaca4"
}
}
]
}
```
The target struct of events which are sent when manifests and blobs are deleted
will contain a subset of the data contained in Get and Put events. Specifically,
only the digest and repository will be sent.
@ -156,9 +175,9 @@ Content-Type: application/vnd.docker.distribution.events.v1+json
"target": {
"mediaType": "application/vnd.docker.distribution.manifest.v1+json",
"length": 1,
"digest": "sha256:0123456789abcdef0",
"digest": "sha256:fea8895f450959fa676bcc1df0611ea93823a735a01205fd8622846041d0c7cf",
"repository": "library/test",
"url": "http://example.com/v2/library/test/manifests/latest"
"url": "http://example.com/v2/library/test/manifests/sha256:c3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5"
},
"request": {
"id": "asdfasdf",
@ -181,9 +200,9 @@ Content-Type: application/vnd.docker.distribution.events.v1+json
"target": {
"mediaType": "application/vnd.docker.container.image.rootfs.diff+x-gtar",
"length": 2,
"digest": "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5",
"digest": "sha256:c3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5",
"repository": "library/test",
"url": "http://example.com/v2/library/test/manifests/latest"
"url": "http://example.com/v2/library/test/blobs/sha256:c3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5"
},
"request": {
"id": "asdfasdf",
@ -206,9 +225,9 @@ Content-Type: application/vnd.docker.distribution.events.v1+json
"target": {
"mediaType": "application/vnd.docker.container.image.rootfs.diff+x-gtar",
"length": 3,
"digest": "sha256:3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d6",
"digest": "sha256:c3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5",
"repository": "library/test",
"url": "http://example.com/v2/library/test/manifests/latest"
"url": "http://example.com/v2/library/test/blobs/sha256:c3b3692957d439ac1928219a83fac91e7bf96c153725526874673ae1f2023f8d5"
},
"request": {
"id": "asdfasdf",

View file

@ -53,12 +53,34 @@ func NewRequestRecord(id string, r *http.Request) RequestRecord {
}
}
func (b *bridge) ManifestPushed(repo reference.Named, sm distribution.Manifest) error {
return b.createManifestEventAndWrite(EventActionPush, repo, sm)
func (b *bridge) ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error {
manifestEvent, err := b.createManifestEvent(EventActionPush, repo, sm)
if err != nil {
return err
}
func (b *bridge) ManifestPulled(repo reference.Named, sm distribution.Manifest) error {
return b.createManifestEventAndWrite(EventActionPull, repo, sm)
for _, option := range options {
if opt, ok := option.(distribution.WithTagOption); ok {
manifestEvent.Target.Tag = opt.Tag
break
}
}
return b.sink.Write(*manifestEvent)
}
func (b *bridge) ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error {
manifestEvent, err := b.createManifestEvent(EventActionPull, repo, sm)
if err != nil {
return err
}
for _, option := range options {
if opt, ok := option.(distribution.WithTagOption); ok {
manifestEvent.Target.Tag = opt.Tag
break
}
}
return b.sink.Write(*manifestEvent)
}
func (b *bridge) ManifestDeleted(repo reference.Named, dgst digest.Digest) error {

View file

@ -3,6 +3,7 @@ package notifications
import (
"testing"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/reference"
@ -61,6 +62,38 @@ func TestEventBridgeManifestPushed(t *testing.T) {
}
}
func TestEventBridgeManifestPushedWithTag(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPush, events...)
if events[0].Target.Tag != "latest" {
t.Fatalf("missing or unexpected tag: %#v", events[0].Target)
}
return nil
}))
repoRef, _ := reference.ParseNamed(repo)
if err := l.ManifestPushed(repoRef, sm, distribution.WithTag(m.Tag)); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func TestEventBridgeManifestPulledWithTag(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkCommonManifest(t, EventActionPull, events...)
if events[0].Target.Tag != "latest" {
t.Fatalf("missing or unexpected tag: %#v", events[0].Target)
}
return nil
}))
repoRef, _ := reference.ParseNamed(repo)
if err := l.ManifestPulled(repoRef, sm, distribution.WithTag(m.Tag)); err != nil {
t.Fatalf("unexpected error notifying manifest pull: %v", err)
}
}
func TestEventBridgeManifestDeleted(t *testing.T) {
l := createTestEnv(t, testSinkFn(func(events ...Event) error {
checkDeleted(t, EventActionDelete, events...)

View file

@ -68,6 +68,9 @@ type Event struct {
// URL provides a direct link to the content.
URL string `json:"url,omitempty"`
// Tag provides the tag
Tag string `json:"tag,omitempty"`
} `json:"target,omitempty"`
// Request covers the request that generated the event.

View file

@ -12,8 +12,8 @@ import (
// ManifestListener describes a set of methods for listening to events related to manifests.
type ManifestListener interface {
ManifestPushed(repo reference.Named, sm distribution.Manifest) error
ManifestPulled(repo reference.Named, sm distribution.Manifest) error
ManifestPushed(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
ManifestPulled(repo reference.Named, sm distribution.Manifest, options ...distribution.ManifestServiceOption) error
ManifestDeleted(repo reference.Named, dgst digest.Digest) error
}
@ -81,7 +81,7 @@ func (msl *manifestServiceListener) Delete(ctx context.Context, dgst digest.Dige
func (msl *manifestServiceListener) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
sm, err := msl.ManifestService.Get(ctx, dgst)
if err == nil {
if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Named(), sm); err != nil {
if err := msl.parent.listener.ManifestPulled(msl.parent.Repository.Named(), sm, options...); err != nil {
logrus.Errorf("error dispatching manifest pull to listener: %v", err)
}
}
@ -93,7 +93,7 @@ func (msl *manifestServiceListener) Put(ctx context.Context, sm distribution.Man
dgst, err := msl.ManifestService.Put(ctx, sm, options...)
if err == nil {
if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Named(), sm); err != nil {
if err := msl.parent.listener.ManifestPushed(msl.parent.Repository.Named(), sm, options...); err != nil {
logrus.Errorf("error dispatching manifest push to listener: %v", err)
}
}

View file

@ -44,7 +44,7 @@ func TestListener(t *testing.T) {
"manifest:delete": 1,
"layer:push": 2,
"layer:pull": 2,
"layer:delete": 2, // deletes not supported for now
"layer:delete": 2,
}
if !reflect.DeepEqual(tl.ops, expectedOps) {
@ -57,13 +57,13 @@ type testListener struct {
ops map[string]int
}
func (tl *testListener) ManifestPushed(repo reference.Named, m distribution.Manifest) error {
func (tl *testListener) ManifestPushed(repo reference.Named, m distribution.Manifest, options ...distribution.ManifestServiceOption) error {
tl.ops["manifest:push"]++
return nil
}
func (tl *testListener) ManifestPulled(repo reference.Named, m distribution.Manifest) error {
func (tl *testListener) ManifestPulled(repo reference.Named, m distribution.Manifest, options ...distribution.ManifestServiceOption) error {
tl.ops["manifest:pull"]++
return nil
}

View file

@ -58,6 +58,20 @@ type ManifestServiceOption interface {
Apply(ManifestService) error
}
// WithTag allows a tag to be passed into Put
func WithTag(tag string) ManifestServiceOption {
return WithTagOption{tag}
}
// WithTagOption holds a tag
type WithTagOption struct{ Tag string }
// Apply conforms to the ManifestServiceOption interface
func (o WithTagOption) Apply(m ManifestService) error {
// no implementation
return nil
}
// Repository is a named collection of manifests and layers.
type Repository interface {
// Named returns the name of the repository.

View file

@ -402,9 +402,9 @@ func (ms *manifests) Get(ctx context.Context, dgst digest.Digest, options ...dis
)
for _, option := range options {
if opt, ok := option.(withTagOption); ok {
digestOrTag = opt.tag
ref, err = reference.WithTag(ms.name, opt.tag)
if opt, ok := option.(distribution.WithTagOption); ok {
digestOrTag = opt.Tag
ref, err = reference.WithTag(ms.name, opt.Tag)
if err != nil {
return nil, err
}
@ -465,21 +465,6 @@ func (ms *manifests) Get(ctx context.Context, dgst digest.Digest, options ...dis
return nil, HandleErrorResponse(resp)
}
// WithTag allows a tag to be passed into Put which enables the client
// to build a correct URL.
func WithTag(tag string) distribution.ManifestServiceOption {
return withTagOption{tag}
}
type withTagOption struct{ tag string }
func (o withTagOption) Apply(m distribution.ManifestService) error {
if _, ok := m.(*manifests); ok {
return nil
}
return fmt.Errorf("withTagOption is a client-only option")
}
// Put puts a manifest. A tag can be specified using an options parameter which uses some shared state to hold the
// tag name in order to build the correct upload URL.
func (ms *manifests) Put(ctx context.Context, m distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
@ -487,9 +472,9 @@ func (ms *manifests) Put(ctx context.Context, m distribution.Manifest, options .
var tagged bool
for _, option := range options {
if opt, ok := option.(withTagOption); ok {
if opt, ok := option.(distribution.WithTagOption); ok {
var err error
ref, err = reference.WithTag(ref, opt.tag)
ref, err = reference.WithTag(ref, opt.Tag)
if err != nil {
return "", err
}

View file

@ -710,7 +710,7 @@ func TestV1ManifestFetch(t *testing.T) {
t.Fatal(err)
}
manifest, err = ms.Get(ctx, dgst, WithTag("latest"))
manifest, err = ms.Get(ctx, dgst, distribution.WithTag("latest"))
if err != nil {
t.Fatal(err)
}
@ -723,7 +723,7 @@ func TestV1ManifestFetch(t *testing.T) {
t.Fatal(err)
}
manifest, err = ms.Get(ctx, dgst, WithTag("badcontenttype"))
manifest, err = ms.Get(ctx, dgst, distribution.WithTag("badcontenttype"))
if err != nil {
t.Fatal(err)
}
@ -761,7 +761,7 @@ func TestManifestFetchWithEtag(t *testing.T) {
if !ok {
panic("wrong type for client manifest service")
}
_, err = clientManifestService.Get(ctx, d1, WithTag("latest"), AddEtagToTag("latest", d1.String()))
_, err = clientManifestService.Get(ctx, d1, distribution.WithTag("latest"), AddEtagToTag("latest", d1.String()))
if err != distribution.ErrManifestNotModified {
t.Fatal(err)
}
@ -861,7 +861,7 @@ func TestManifestPut(t *testing.T) {
t.Fatal(err)
}
if _, err := ms.Put(ctx, m1, WithTag(m1.Tag)); err != nil {
if _, err := ms.Put(ctx, m1, distribution.WithTag(m1.Tag)); err != nil {
t.Fatal(err)
}

View file

@ -86,7 +86,11 @@ func (imh *imageManifestHandler) GetImageManifest(w http.ResponseWriter, r *http
return
}
manifest, err = manifests.Get(imh, imh.Digest)
var options []distribution.ManifestServiceOption
if imh.Tag != "" {
options = append(options, distribution.WithTag(imh.Tag))
}
manifest, err = manifests.Get(imh, imh.Digest, options...)
if err != nil {
imh.Errors = append(imh.Errors, v2.ErrorCodeManifestUnknown.WithDetail(err))
return
@ -245,7 +249,11 @@ func (imh *imageManifestHandler) PutImageManifest(w http.ResponseWriter, r *http
return
}
_, err = manifests.Put(imh, manifest)
var options []distribution.ManifestServiceOption
if imh.Tag != "" {
options = append(options, distribution.WithTag(imh.Tag))
}
_, err = manifests.Put(imh, manifest, options...)
if err != nil {
// TODO(stevvooe): These error handling switches really need to be
// handled by an app global mapper.