Move layer interface definitions to distribution package

After consideration, it has been decided that the interfaces defined in the
storage package provide a good base for interacting with various registry
instances. Whether interacting with a remote API or a local, on-disk registry,
these types have proved flexible. By moving them here, they can become the
central components of interacting with distribution components.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-02-11 16:49:49 -08:00
parent c23ad2715f
commit 27b03f2136
20 changed files with 115 additions and 93 deletions

View file

@ -1,4 +1,4 @@
package storage package distribution
import ( import (
"fmt" "fmt"

View file

@ -1,4 +1,4 @@
package storage package distribution
import ( import (
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"

View file

@ -7,8 +7,10 @@ import (
"os" "os"
"code.google.com/p/go-uuid/uuid" "code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution"
"github.com/docker/distribution/configuration" "github.com/docker/distribution/configuration"
ctxu "github.com/docker/distribution/context" ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/notifications"
"github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/auth" "github.com/docker/distribution/registry/auth"
"github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage"
@ -32,7 +34,7 @@ type App struct {
router *mux.Router // main application router, configured with dispatchers router *mux.Router // main application router, configured with dispatchers
driver storagedriver.StorageDriver // driver maintains the app global storage driver instance. driver storagedriver.StorageDriver // driver maintains the app global storage driver instance.
registry storage.Registry // registry is the primary registry backend for the app instance. registry distribution.Registry // registry is the primary registry backend for the app instance.
accessController auth.AccessController // main access controller for application accessController auth.AccessController // main access controller for application
// events contains notification related configuration. // events contains notification related configuration.

View file

@ -4,10 +4,10 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context" ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/storage"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -21,7 +21,7 @@ type Context struct {
// Repository is the repository for the current request. All requests // Repository is the repository for the current request. All requests
// should be scoped to a single repository. This field may be nil. // should be scoped to a single repository. This field may be nil.
Repository storage.Repository Repository distribution.Repository
// Errors is a collection of errors encountered during the request to be // Errors is a collection of errors encountered during the request to be
// returned to the client API. If errors are added to the collection, the // returned to the client API. If errors are added to the collection, the

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context" ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
@ -72,7 +73,7 @@ func (imh *imageManifestHandler) PutImageManifest(w http.ResponseWriter, r *http
case storage.ErrManifestVerification: case storage.ErrManifestVerification:
for _, verificationError := range err { for _, verificationError := range err {
switch verificationError := verificationError.(type) { switch verificationError := verificationError.(type) {
case storage.ErrUnknownLayer: case distribution.ErrUnknownLayer:
imh.Errors.Push(v2.ErrorCodeBlobUnknown, verificationError.FSLayer) imh.Errors.Push(v2.ErrorCodeBlobUnknown, verificationError.FSLayer)
case storage.ErrManifestUnverified: case storage.ErrManifestUnverified:
imh.Errors.Push(v2.ErrorCodeManifestUnverified) imh.Errors.Push(v2.ErrorCodeManifestUnverified)

View file

@ -3,10 +3,10 @@ package handlers
import ( import (
"net/http" "net/http"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context" ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers" "github.com/gorilla/handlers"
) )
@ -54,7 +54,7 @@ func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case storage.ErrUnknownLayer: case distribution.ErrUnknownLayer:
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
lh.Errors.Push(v2.ErrorCodeBlobUnknown, err.FSLayer) lh.Errors.Push(v2.ErrorCodeBlobUnknown, err.FSLayer)
default: default:

View file

@ -7,10 +7,10 @@ import (
"net/url" "net/url"
"os" "os"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context" ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/api/v2"
"github.com/docker/distribution/registry/storage"
"github.com/gorilla/handlers" "github.com/gorilla/handlers"
) )
@ -63,7 +63,7 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
upload, err := layers.Resume(luh.UUID) upload, err := layers.Resume(luh.UUID)
if err != nil { if err != nil {
ctxu.GetLogger(ctx).Errorf("error resolving upload: %v", err) ctxu.GetLogger(ctx).Errorf("error resolving upload: %v", err)
if err == storage.ErrLayerUploadUnknown { if err == distribution.ErrLayerUploadUnknown {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
luh.Errors.Push(v2.ErrorCodeBlobUploadUnknown, err) luh.Errors.Push(v2.ErrorCodeBlobUploadUnknown, err)
@ -114,7 +114,7 @@ type layerUploadHandler struct {
// UUID identifies the upload instance for the current request. // UUID identifies the upload instance for the current request.
UUID string UUID string
Upload storage.LayerUpload Upload distribution.LayerUpload
State layerUploadState State layerUploadState
} }
@ -196,7 +196,7 @@ func (luh *layerUploadHandler) PutLayerUploadComplete(w http.ResponseWriter, r *
layer, err := luh.Upload.Finish(dgst) layer, err := luh.Upload.Finish(dgst)
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case storage.ErrLayerInvalidDigest: case distribution.ErrLayerInvalidDigest:
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
luh.Errors.Push(v2.ErrorCodeDigestInvalid, err) luh.Errors.Push(v2.ErrorCodeDigestInvalid, err)
default: default:

View file

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/AdRoll/goamz/cloudfront" "github.com/AdRoll/goamz/cloudfront"
"github.com/docker/distribution"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
) )
@ -95,7 +96,7 @@ func newCloudFrontLayerHandler(storageDriver storagedriver.StorageDriver, option
// Resolve returns an http.Handler which can serve the contents of the given // Resolve returns an http.Handler which can serve the contents of the given
// Layer, or an error if not supported by the storagedriver. // Layer, or an error if not supported by the storagedriver.
func (lh *cloudFrontLayerHandler) Resolve(layer Layer) (http.Handler, error) { func (lh *cloudFrontLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) {
layerURLStr, err := lh.delegateLayerHandler.urlFor(layer, nil) layerURLStr, err := lh.delegateLayerHandler.urlFor(layer, nil)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -5,6 +5,7 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/docker/distribution"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
) )
@ -40,7 +41,7 @@ func newDelegateLayerHandler(storageDriver storagedriver.StorageDriver, options
// Resolve returns an http.Handler which can serve the contents of the given // Resolve returns an http.Handler which can serve the contents of the given
// Layer, or an error if not supported by the storagedriver. // Layer, or an error if not supported by the storagedriver.
func (lh *delegateLayerHandler) Resolve(layer Layer) (http.Handler, error) { func (lh *delegateLayerHandler) Resolve(layer distribution.Layer) (http.Handler, error) {
// TODO(bbland): This is just a sanity check to ensure that the // TODO(bbland): This is just a sanity check to ensure that the
// storagedriver supports url generation. It would be nice if we didn't have // storagedriver supports url generation. It would be nice if we didn't have
// to do this twice for non-GET requests. // to do this twice for non-GET requests.
@ -64,7 +65,7 @@ func (lh *delegateLayerHandler) Resolve(layer Layer) (http.Handler, error) {
// urlFor returns a download URL for the given layer, or the empty string if // urlFor returns a download URL for the given layer, or the empty string if
// unsupported. // unsupported.
func (lh *delegateLayerHandler) urlFor(layer Layer, options map[string]interface{}) (string, error) { func (lh *delegateLayerHandler) urlFor(layer distribution.Layer, options map[string]interface{}) (string, error) {
// Crack open the layer to get at the layerStore // Crack open the layer to get at the layerStore
layerRd, ok := layer.(*layerReader) layerRd, ok := layer.(*layerReader)
if !ok { if !ok {

View file

@ -125,23 +125,8 @@ func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
return fr.offset, err return fr.offset, err
} }
// Close the layer. Should be called when the resource is no longer needed.
func (fr *fileReader) Close() error { func (fr *fileReader) Close() error {
if fr.err != nil { return fr.closeWithErr(fmt.Errorf("fileReader: closed"))
return fr.err
}
fr.err = ErrLayerClosed
// close and release reader chain
if fr.rc != nil {
fr.rc.Close()
}
fr.rc = nil
fr.brd = nil
return fr.err
} }
// reader prepares the current reader at the lrs offset, ensuring its buffered // reader prepares the current reader at the lrs offset, ensuring its buffered
@ -199,3 +184,21 @@ func (fr *fileReader) reset() {
fr.rc = nil fr.rc = nil
} }
} }
func (fr *fileReader) closeWithErr(err error) error {
if fr.err != nil {
return fr.err
}
fr.err = err
// close and release reader chain
if fr.rc != nil {
fr.rc.Close()
}
fr.rc = nil
fr.brd = nil
return fr.err
}

View file

@ -9,6 +9,7 @@ import (
"os" "os"
"testing" "testing"
"github.com/docker/distribution"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/registry/storage/driver/inmemory"
@ -53,7 +54,7 @@ func TestSimpleLayerUpload(t *testing.T) {
// Do a resume, get unknown upload // Do a resume, get unknown upload
layerUpload, err = ls.Resume(layerUpload.UUID()) layerUpload, err = ls.Resume(layerUpload.UUID())
if err != ErrLayerUploadUnknown { if err != distribution.ErrLayerUploadUnknown {
t.Fatalf("unexpected error resuming upload, should be unkown: %v", err) t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
} }
@ -102,7 +103,7 @@ func TestSimpleLayerUpload(t *testing.T) {
} }
// After finishing an upload, it should no longer exist. // After finishing an upload, it should no longer exist.
if _, err := ls.Resume(layerUpload.UUID()); err != ErrLayerUploadUnknown { if _, err := ls.Resume(layerUpload.UUID()); err != distribution.ErrLayerUploadUnknown {
t.Fatalf("expected layer upload to be unknown, got %v", err) t.Fatalf("expected layer upload to be unknown, got %v", err)
} }
@ -165,7 +166,7 @@ func TestSimpleLayerRead(t *testing.T) {
} }
switch err.(type) { switch err.(type) {
case ErrUnknownLayer: case distribution.ErrUnknownLayer:
err = nil err = nil
default: default:
t.Fatalf("unexpected error fetching non-existent layer: %v", err) t.Fatalf("unexpected error fetching non-existent layer: %v", err)

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/docker/distribution"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
) )
@ -13,7 +14,7 @@ type LayerHandler interface {
// Layer if possible, or nil and an error when unsupported. This may // Layer if possible, or nil and an error when unsupported. This may
// directly serve the contents of the layer or issue a redirect to another // directly serve the contents of the layer or issue a redirect to another
// URL hosting the content. // URL hosting the content.
Resolve(layer Layer) (http.Handler, error) Resolve(layer distribution.Layer) (http.Handler, error)
} }
// LayerHandlerInitFunc is the type of a LayerHandler factory function and is // LayerHandlerInitFunc is the type of a LayerHandler factory function and is

View file

@ -3,6 +3,7 @@ package storage
import ( import (
"time" "time"
"github.com/docker/distribution"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
) )
@ -15,7 +16,7 @@ type layerReader struct {
digest digest.Digest digest digest.Digest
} }
var _ Layer = &layerReader{} var _ distribution.Layer = &layerReader{}
func (lrs *layerReader) Name() string { func (lrs *layerReader) Name() string {
return lrs.name return lrs.name
@ -28,3 +29,8 @@ func (lrs *layerReader) Digest() digest.Digest {
func (lrs *layerReader) CreatedAt() time.Time { func (lrs *layerReader) CreatedAt() time.Time {
return lrs.modtime return lrs.modtime
} }
// Close the layer. Should be called when the resource is no longer needed.
func (lrs *layerReader) Close() error {
return lrs.closeWithErr(distribution.ErrLayerClosed)
}

View file

@ -4,6 +4,7 @@ import (
"time" "time"
"code.google.com/p/go-uuid/uuid" "code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context" ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
@ -23,7 +24,7 @@ func (ls *layerStore) Exists(digest digest.Digest) (bool, error) {
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
case ErrUnknownLayer: case distribution.ErrUnknownLayer:
return false, nil return false, nil
} }
@ -33,7 +34,7 @@ func (ls *layerStore) Exists(digest digest.Digest) (bool, error) {
return true, nil return true, nil
} }
func (ls *layerStore) Fetch(dgst digest.Digest) (Layer, error) { func (ls *layerStore) Fetch(dgst digest.Digest) (distribution.Layer, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Fetch") ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Fetch")
bp, err := ls.path(dgst) bp, err := ls.path(dgst)
if err != nil { if err != nil {
@ -55,7 +56,7 @@ func (ls *layerStore) Fetch(dgst digest.Digest) (Layer, error) {
// Upload begins a layer upload, returning a handle. If the layer upload // Upload begins a layer upload, returning a handle. If the layer upload
// is already in progress or the layer has already been uploaded, this // is already in progress or the layer has already been uploaded, this
// will return an error. // will return an error.
func (ls *layerStore) Upload() (LayerUpload, error) { func (ls *layerStore) Upload() (distribution.LayerUpload, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Upload") ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Upload")
// NOTE(stevvooe): Consider the issues with allowing concurrent upload of // NOTE(stevvooe): Consider the issues with allowing concurrent upload of
@ -93,7 +94,7 @@ func (ls *layerStore) Upload() (LayerUpload, error) {
// Resume continues an in progress layer upload, returning the current // Resume continues an in progress layer upload, returning the current
// state of the upload. // state of the upload.
func (ls *layerStore) Resume(uuid string) (LayerUpload, error) { func (ls *layerStore) Resume(uuid string) (distribution.LayerUpload, error) {
ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Resume") ctxu.GetLogger(ls.repository.ctx).Debug("(*layerStore).Resume")
startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{ startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
name: ls.repository.Name(), name: ls.repository.Name(),
@ -108,7 +109,7 @@ func (ls *layerStore) Resume(uuid string) (LayerUpload, error) {
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
return nil, ErrLayerUploadUnknown return nil, distribution.ErrLayerUploadUnknown
default: default:
return nil, err return nil, err
} }
@ -132,7 +133,7 @@ func (ls *layerStore) Resume(uuid string) (LayerUpload, error) {
} }
// newLayerUpload allocates a new upload controller with the given state. // newLayerUpload allocates a new upload controller with the given state.
func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (LayerUpload, error) { func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (distribution.LayerUpload, error) {
fw, err := newFileWriter(ls.repository.driver, path) fw, err := newFileWriter(ls.repository.driver, path)
if err != nil { if err != nil {
return nil, err return nil, err
@ -158,7 +159,9 @@ func (ls *layerStore) path(dgst digest.Digest) (string, error) {
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
return "", ErrUnknownLayer{manifest.FSLayer{BlobSum: dgst}} return "", distribution.ErrUnknownLayer{
FSLayer: manifest.FSLayer{BlobSum: dgst},
}
default: default:
return "", err return "", err
} }

View file

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context" ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
@ -24,7 +25,7 @@ type layerUploadController struct {
fileWriter fileWriter
} }
var _ LayerUpload = &layerUploadController{} var _ distribution.LayerUpload = &layerUploadController{}
// Name of the repository under which the layer will be linked. // Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string { func (luc *layerUploadController) Name() string {
@ -44,7 +45,7 @@ func (luc *layerUploadController) StartedAt() time.Time {
// uploaded layer. The final size and checksum are validated against the // uploaded layer. The final size and checksum are validated against the
// contents of the uploaded layer. The checksum should be provided in the // contents of the uploaded layer. The checksum should be provided in the
// format <algorithm>:<hex digest>. // format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) { func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) {
ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish") ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish")
canonical, err := luc.validateLayer(digest) canonical, err := luc.validateLayer(digest)
if err != nil { if err != nil {
@ -93,9 +94,9 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
case tarsum.Version1: case tarsum.Version1:
default: default:
// version 0 and dev, for now. // version 0 and dev, for now.
return "", ErrLayerInvalidDigest{ return "", distribution.ErrLayerInvalidDigest{
Digest: dgst, Digest: dgst,
Reason: ErrLayerTarSumVersionUnsupported, Reason: distribution.ErrLayerTarSumVersionUnsupported,
} }
} }
@ -124,7 +125,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
} }
if !digestVerifier.Verified() { if !digestVerifier.Verified() {
return "", ErrLayerInvalidDigest{ return "", distribution.ErrLayerInvalidDigest{
Digest: dgst, Digest: dgst,
Reason: fmt.Errorf("content does not match digest"), Reason: fmt.Errorf("content does not match digest"),
} }

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/docker/distribution"
ctxu "github.com/docker/distribution/context" ctxu "github.com/docker/distribution/context"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
@ -71,7 +72,7 @@ type manifestStore struct {
tagStore *tagStore tagStore *tagStore
} }
var _ ManifestService = &manifestStore{} var _ distribution.ManifestService = &manifestStore{}
// func (ms *manifestStore) Repository() Repository { // func (ms *manifestStore) Repository() Repository {
// return ms.repository // return ms.repository
@ -177,7 +178,7 @@ func (ms *manifestStore) verifyManifest(tag string, mnfst *manifest.SignedManife
} }
if !exists { if !exists {
errs = append(errs, ErrUnknownLayer{FSLayer: fsLayer}) errs = append(errs, distribution.ErrUnknownLayer{FSLayer: fsLayer})
} }
} }

View file

@ -4,11 +4,10 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/docker/distribution/manifest"
"code.google.com/p/go-uuid/uuid" "code.google.com/p/go-uuid/uuid"
"github.com/docker/distribution"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/registry/storage" "github.com/docker/distribution/manifest"
) )
type bridge struct { type bridge struct {
@ -53,31 +52,31 @@ func NewRequestRecord(id string, r *http.Request) RequestRecord {
} }
} }
func (b *bridge) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { func (b *bridge) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionPush, repo, sm) return b.createManifestEventAndWrite(EventActionPush, repo, sm)
} }
func (b *bridge) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { func (b *bridge) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionPull, repo, sm) return b.createManifestEventAndWrite(EventActionPull, repo, sm)
} }
func (b *bridge) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { func (b *bridge) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error {
return b.createManifestEventAndWrite(EventActionDelete, repo, sm) return b.createManifestEventAndWrite(EventActionDelete, repo, sm)
} }
func (b *bridge) LayerPushed(repo storage.Repository, layer storage.Layer) error { func (b *bridge) LayerPushed(repo distribution.Repository, layer distribution.Layer) error {
return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest()) return b.createLayerEventAndWrite(EventActionPush, repo, layer.Digest())
} }
func (b *bridge) LayerPulled(repo storage.Repository, layer storage.Layer) error { func (b *bridge) LayerPulled(repo distribution.Repository, layer distribution.Layer) error {
return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest()) return b.createLayerEventAndWrite(EventActionPull, repo, layer.Digest())
} }
func (b *bridge) LayerDeleted(repo storage.Repository, layer storage.Layer) error { func (b *bridge) LayerDeleted(repo distribution.Repository, layer distribution.Layer) error {
return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest()) return b.createLayerEventAndWrite(EventActionDelete, repo, layer.Digest())
} }
func (b *bridge) createManifestEventAndWrite(action string, repo storage.Repository, sm *manifest.SignedManifest) error { func (b *bridge) createManifestEventAndWrite(action string, repo distribution.Repository, sm *manifest.SignedManifest) error {
event, err := b.createManifestEvent(action, repo, sm) event, err := b.createManifestEvent(action, repo, sm)
if err != nil { if err != nil {
return err return err
@ -86,7 +85,7 @@ func (b *bridge) createManifestEventAndWrite(action string, repo storage.Reposit
return b.sink.Write(*event) return b.sink.Write(*event)
} }
func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm *manifest.SignedManifest) (*Event, error) { func (b *bridge) createManifestEvent(action string, repo distribution.Repository, sm *manifest.SignedManifest) (*Event, error) {
event := b.createEvent(action) event := b.createEvent(action)
event.Target.Type = EventTargetTypeManifest event.Target.Type = EventTargetTypeManifest
event.Target.Name = repo.Name() event.Target.Name = repo.Name()
@ -112,7 +111,7 @@ func (b *bridge) createManifestEvent(action string, repo storage.Repository, sm
return event, nil return event, nil
} }
func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository, dgst digest.Digest) error { func (b *bridge) createLayerEventAndWrite(action string, repo distribution.Repository, dgst digest.Digest) error {
event, err := b.createLayerEvent(action, repo, dgst) event, err := b.createLayerEvent(action, repo, dgst)
if err != nil { if err != nil {
return err return err
@ -121,7 +120,7 @@ func (b *bridge) createLayerEventAndWrite(action string, repo storage.Repository
return b.sink.Write(*event) return b.sink.Write(*event)
} }
func (b *bridge) createLayerEvent(action string, repo storage.Repository, dgst digest.Digest) (*Event, error) { func (b *bridge) createLayerEvent(action string, repo distribution.Repository, dgst digest.Digest) (*Event, error) {
event := b.createEvent(action) event := b.createEvent(action)
event.Target.Type = EventTargetTypeBlob event.Target.Type = EventTargetTypeBlob
event.Target.Name = repo.Name() event.Target.Name = repo.Name()

View file

@ -2,31 +2,31 @@ package notifications
import ( import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/storage"
) )
// ManifestListener describes a set of methods for listening to events related to manifests. // ManifestListener describes a set of methods for listening to events related to manifests.
type ManifestListener interface { type ManifestListener interface {
ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error
ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error
// TODO(stevvooe): Please note that delete support is still a little shaky // TODO(stevvooe): Please note that delete support is still a little shaky
// and we'll need to propagate these in the future. // and we'll need to propagate these in the future.
ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error
} }
// LayerListener describes a listener that can respond to layer related events. // LayerListener describes a listener that can respond to layer related events.
type LayerListener interface { type LayerListener interface {
LayerPushed(repo storage.Repository, layer storage.Layer) error LayerPushed(repo distribution.Repository, layer distribution.Layer) error
LayerPulled(repo storage.Repository, layer storage.Layer) error LayerPulled(repo distribution.Repository, layer distribution.Layer) error
// TODO(stevvooe): Please note that delete support is still a little shaky // TODO(stevvooe): Please note that delete support is still a little shaky
// and we'll need to propagate these in the future. // and we'll need to propagate these in the future.
LayerDeleted(repo storage.Repository, layer storage.Layer) error LayerDeleted(repo distribution.Repository, layer distribution.Layer) error
} }
// Listener combines all repository events into a single interface. // Listener combines all repository events into a single interface.
@ -36,26 +36,26 @@ type Listener interface {
} }
type repositoryListener struct { type repositoryListener struct {
storage.Repository distribution.Repository
listener Listener listener Listener
} }
// Listen dispatches events on the repository to the listener. // Listen dispatches events on the repository to the listener.
func Listen(repo storage.Repository, listener Listener) storage.Repository { func Listen(repo distribution.Repository, listener Listener) distribution.Repository {
return &repositoryListener{ return &repositoryListener{
Repository: repo, Repository: repo,
listener: listener, listener: listener,
} }
} }
func (rl *repositoryListener) Manifests() storage.ManifestService { func (rl *repositoryListener) Manifests() distribution.ManifestService {
return &manifestServiceListener{ return &manifestServiceListener{
ManifestService: rl.Repository.Manifests(), ManifestService: rl.Repository.Manifests(),
parent: rl, parent: rl,
} }
} }
func (rl *repositoryListener) Layers() storage.LayerService { func (rl *repositoryListener) Layers() distribution.LayerService {
return &layerServiceListener{ return &layerServiceListener{
LayerService: rl.Repository.Layers(), LayerService: rl.Repository.Layers(),
parent: rl, parent: rl,
@ -63,7 +63,7 @@ func (rl *repositoryListener) Layers() storage.LayerService {
} }
type manifestServiceListener struct { type manifestServiceListener struct {
storage.ManifestService distribution.ManifestService
parent *repositoryListener parent *repositoryListener
} }
@ -91,11 +91,11 @@ func (msl *manifestServiceListener) Put(tag string, sm *manifest.SignedManifest)
} }
type layerServiceListener struct { type layerServiceListener struct {
storage.LayerService distribution.LayerService
parent *repositoryListener parent *repositoryListener
} }
func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error) { func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (distribution.Layer, error) {
layer, err := lsl.LayerService.Fetch(dgst) layer, err := lsl.LayerService.Fetch(dgst)
if err == nil { if err == nil {
if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil { if err := lsl.parent.listener.LayerPulled(lsl.parent.Repository, layer); err != nil {
@ -106,17 +106,17 @@ func (lsl *layerServiceListener) Fetch(dgst digest.Digest) (storage.Layer, error
return layer, err return layer, err
} }
func (lsl *layerServiceListener) Upload() (storage.LayerUpload, error) { func (lsl *layerServiceListener) Upload() (distribution.LayerUpload, error) {
lu, err := lsl.LayerService.Upload() lu, err := lsl.LayerService.Upload()
return lsl.decorateUpload(lu), err return lsl.decorateUpload(lu), err
} }
func (lsl *layerServiceListener) Resume(uuid string) (storage.LayerUpload, error) { func (lsl *layerServiceListener) Resume(uuid string) (distribution.LayerUpload, error) {
lu, err := lsl.LayerService.Resume(uuid) lu, err := lsl.LayerService.Resume(uuid)
return lsl.decorateUpload(lu), err return lsl.decorateUpload(lu), err
} }
func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.LayerUpload { func (lsl *layerServiceListener) decorateUpload(lu distribution.LayerUpload) distribution.LayerUpload {
return &layerUploadListener{ return &layerUploadListener{
LayerUpload: lu, LayerUpload: lu,
parent: lsl, parent: lsl,
@ -124,11 +124,11 @@ func (lsl *layerServiceListener) decorateUpload(lu storage.LayerUpload) storage.
} }
type layerUploadListener struct { type layerUploadListener struct {
storage.LayerUpload distribution.LayerUpload
parent *layerServiceListener parent *layerServiceListener
} }
func (lul *layerUploadListener) Finish(dgst digest.Digest) (storage.Layer, error) { func (lul *layerUploadListener) Finish(dgst digest.Digest) (distribution.Layer, error) {
layer, err := lul.LayerUpload.Finish(dgst) layer, err := lul.LayerUpload.Finish(dgst)
if err == nil { if err == nil {
if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil { if err := lul.parent.parent.listener.LayerPushed(lul.parent.parent.Repository, layer); err != nil {

View file

@ -5,6 +5,7 @@ import (
"reflect" "reflect"
"testing" "testing"
"github.com/docker/distribution"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
"github.com/docker/distribution/registry/storage" "github.com/docker/distribution/registry/storage"
@ -44,40 +45,40 @@ type testListener struct {
ops map[string]int ops map[string]int
} }
func (tl *testListener) ManifestPushed(repo storage.Repository, sm *manifest.SignedManifest) error { func (tl *testListener) ManifestPushed(repo distribution.Repository, sm *manifest.SignedManifest) error {
tl.ops["manifest:push"]++ tl.ops["manifest:push"]++
return nil return nil
} }
func (tl *testListener) ManifestPulled(repo storage.Repository, sm *manifest.SignedManifest) error { func (tl *testListener) ManifestPulled(repo distribution.Repository, sm *manifest.SignedManifest) error {
tl.ops["manifest:pull"]++ tl.ops["manifest:pull"]++
return nil return nil
} }
func (tl *testListener) ManifestDeleted(repo storage.Repository, sm *manifest.SignedManifest) error { func (tl *testListener) ManifestDeleted(repo distribution.Repository, sm *manifest.SignedManifest) error {
tl.ops["manifest:delete"]++ tl.ops["manifest:delete"]++
return nil return nil
} }
func (tl *testListener) LayerPushed(repo storage.Repository, layer storage.Layer) error { func (tl *testListener) LayerPushed(repo distribution.Repository, layer distribution.Layer) error {
tl.ops["layer:push"]++ tl.ops["layer:push"]++
return nil return nil
} }
func (tl *testListener) LayerPulled(repo storage.Repository, layer storage.Layer) error { func (tl *testListener) LayerPulled(repo distribution.Repository, layer distribution.Layer) error {
tl.ops["layer:pull"]++ tl.ops["layer:pull"]++
return nil return nil
} }
func (tl *testListener) LayerDeleted(repo storage.Repository, layer storage.Layer) error { func (tl *testListener) LayerDeleted(repo distribution.Repository, layer distribution.Layer) error {
tl.ops["layer:delete"]++ tl.ops["layer:delete"]++
return nil return nil
} }
// checkExerciseRegistry takes the registry through all of its operations, // checkExerciseRegistry takes the registry through all of its operations,
// carrying out generic checks. // carrying out generic checks.
func checkExerciseRepository(t *testing.T, repository storage.Repository) { func checkExerciseRepository(t *testing.T, repository distribution.Repository) {
// TODO(stevvooe): This would be a nice testutil function. Basically, it // TODO(stevvooe): This would be a nice testutil function. Basically, it
// takes the registry through a common set of operations. This could be // takes the registry through a common set of operations. This could be
// used to make cross-cutting updates by changing internals that affect // used to make cross-cutting updates by changing internals that affect

View file

@ -1,6 +1,7 @@
package storage package storage
import ( import (
"github.com/docker/distribution"
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -16,7 +17,7 @@ type registry struct {
// NewRegistryWithDriver creates a new registry instance from the provided // NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is // driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate. // cheap to allocate.
func NewRegistryWithDriver(driver storagedriver.StorageDriver) Registry { func NewRegistryWithDriver(driver storagedriver.StorageDriver) distribution.Registry {
bs := &blobStore{} bs := &blobStore{}
reg := &registry{ reg := &registry{
@ -35,7 +36,7 @@ func NewRegistryWithDriver(driver storagedriver.StorageDriver) Registry {
// Repository returns an instance of the repository tied to the registry. // Repository returns an instance of the repository tied to the registry.
// Instances should not be shared between goroutines but are cheap to // Instances should not be shared between goroutines but are cheap to
// allocate. In general, they should be request scoped. // allocate. In general, they should be request scoped.
func (reg *registry) Repository(ctx context.Context, name string) Repository { func (reg *registry) Repository(ctx context.Context, name string) distribution.Repository {
return &repository{ return &repository{
ctx: ctx, ctx: ctx,
registry: reg, registry: reg,
@ -58,7 +59,7 @@ func (repo *repository) Name() string {
// Manifests returns an instance of ManifestService. Instantiation is cheap and // Manifests returns an instance of ManifestService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar // may be context sensitive in the future. The instance should be used similar
// to a request local. // to a request local.
func (repo *repository) Manifests() ManifestService { func (repo *repository) Manifests() distribution.ManifestService {
return &manifestStore{ return &manifestStore{
repository: repo, repository: repo,
revisionStore: &revisionStore{ revisionStore: &revisionStore{
@ -73,7 +74,7 @@ func (repo *repository) Manifests() ManifestService {
// Layers returns an instance of the LayerService. Instantiation is cheap and // Layers returns an instance of the LayerService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar // may be context sensitive in the future. The instance should be used similar
// to a request local. // to a request local.
func (repo *repository) Layers() LayerService { func (repo *repository) Layers() distribution.LayerService {
return &layerStore{ return &layerStore{
repository: repo, repository: repo,
} }