Merge pull request #74 from stevvooe/refactor-storage-interfaces

Refactor storage API to be registry oriented
This commit is contained in:
Olivier Gambier 2015-01-20 10:44:56 -08:00
commit 8ae6179711
17 changed files with 343 additions and 337 deletions

View file

@ -26,8 +26,8 @@ type App struct {
// driver maintains the app global storage driver instance.
driver storagedriver.StorageDriver
// services contains the main services instance for the application.
services *storage.Services
// registry is the primary registry backend for the app instance.
registry storage.Registry
layerHandler storage.LayerHandler
@ -63,7 +63,7 @@ func NewApp(configuration configuration.Configuration) *App {
}
app.driver = driver
app.services = storage.NewServices(app.driver)
app.registry = storage.NewRegistryWithDriver(app.driver)
authType := configuration.Auth.Type()
if authType != "" {
@ -136,11 +136,11 @@ func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
context := app.context(r)
if err := app.authorized(w, r, context); err != nil {
if err := app.authorized(w, r, context, context.vars["name"]); err != nil {
return
}
context.log = log.WithField("name", context.Name)
context.log = log.WithField("name", context.Repository.Name())
handler := dispatch(context, r)
ssrw := &singleStatusResponseWriter{ResponseWriter: w}
@ -165,7 +165,6 @@ func (app *App) context(r *http.Request) *Context {
vars := mux.Vars(r)
context := &Context{
App: app,
Name: vars["name"],
urlBuilder: v2.NewURLBuilderFromRequest(r),
}
@ -175,19 +174,23 @@ func (app *App) context(r *http.Request) *Context {
return context
}
// authorized checks if the request can proceed with with request access-
// level. If it cannot, the method will return an error.
func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Context) error {
// authorized checks if the request can proceed with access to the requested
// repository. If it succeeds, the repository will be available on the
// context. An error will be if access is not available.
func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Context, repo string) error {
if app.accessController == nil {
// No access controller, so we simply provide access.
context.Repository = app.registry.Repository(repo)
return nil // access controller is not enabled.
}
var accessRecords []auth.Access
if context.Name != "" {
if repo != "" {
resource := auth.Resource{
Type: "repository",
Name: context.Name,
Name: repo,
}
switch r.Method {
@ -256,6 +259,10 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont
return err
}
// At this point, the request should have access to the repository under
// the requested operation. Make is available on the context.
context.Repository = app.registry.Repository(repo)
return nil
}

View file

@ -10,6 +10,8 @@ import (
"github.com/docker/distribution/api/v2"
_ "github.com/docker/distribution/auth/silly"
"github.com/docker/distribution/configuration"
"github.com/docker/distribution/storage"
"github.com/docker/distribution/storagedriver/inmemory"
)
// TestAppDispatcher builds an application with a test dispatcher and ensures
@ -17,9 +19,12 @@ import (
// This only tests the dispatch mechanism. The underlying dispatchers must be
// tested individually.
func TestAppDispatcher(t *testing.T) {
driver := inmemory.New()
app := &App{
Config: configuration.Configuration{},
router: v2.Router(),
driver: driver,
registry: storage.NewRegistryWithDriver(driver),
}
server := httptest.NewServer(app)
router := v2.Router()
@ -32,8 +37,8 @@ func TestAppDispatcher(t *testing.T) {
varCheckingDispatcher := func(expectedVars map[string]string) dispatchFunc {
return func(ctx *Context, r *http.Request) http.Handler {
// Always checks the same name context
if ctx.Name != ctx.vars["name"] {
t.Fatalf("unexpected name: %q != %q", ctx.Name, "foo/bar")
if ctx.Repository.Name() != ctx.vars["name"] {
t.Fatalf("unexpected name: %q != %q", ctx.Repository.Name(), "foo/bar")
}
// Check that we have all that is expected

View file

@ -3,6 +3,7 @@ package registry
import (
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/api/v2"
"github.com/docker/distribution/storage"
)
// Context should contain the request specific context for use in across
@ -12,9 +13,9 @@ type Context struct {
// App points to the application structure that created this context.
*App
// Name is the prefix for the current request. Corresponds to the
// namespace/repository associated with the image.
Name string
// Repository is the repository for the current request. All requests
// should be scoped to a single repository. This field may be nil.
Repository storage.Repository
// Errors is a collection of errors encountered during the request to be
// returned to the client API. If errors are added to the collection, the

View file

@ -38,8 +38,8 @@ type imageManifestHandler struct {
// GetImageManifest fetches the image manifest from the storage backend, if it exists.
func (imh *imageManifestHandler) GetImageManifest(w http.ResponseWriter, r *http.Request) {
manifests := imh.services.Manifests()
manifest, err := manifests.Get(imh.Name, imh.Tag)
manifests := imh.Repository.Manifests()
manifest, err := manifests.Get(imh.Tag)
if err != nil {
imh.Errors.Push(v2.ErrorCodeManifestUnknown, err)
@ -54,7 +54,7 @@ func (imh *imageManifestHandler) GetImageManifest(w http.ResponseWriter, r *http
// PutImageManifest validates and stores and image in the registry.
func (imh *imageManifestHandler) PutImageManifest(w http.ResponseWriter, r *http.Request) {
manifests := imh.services.Manifests()
manifests := imh.Repository.Manifests()
dec := json.NewDecoder(r.Body)
var manifest manifest.SignedManifest
@ -64,7 +64,7 @@ func (imh *imageManifestHandler) PutImageManifest(w http.ResponseWriter, r *http
return
}
if err := manifests.Put(imh.Name, imh.Tag, &manifest); err != nil {
if err := manifests.Put(imh.Tag, &manifest); err != nil {
// TODO(stevvooe): These error handling switches really need to be
// handled by an app global mapper.
switch err := err.(type) {
@ -96,8 +96,8 @@ func (imh *imageManifestHandler) PutImageManifest(w http.ResponseWriter, r *http
// DeleteImageManifest removes the image with the given tag from the registry.
func (imh *imageManifestHandler) DeleteImageManifest(w http.ResponseWriter, r *http.Request) {
manifests := imh.services.Manifests()
if err := manifests.Delete(imh.Name, imh.Tag); err != nil {
manifests := imh.Repository.Manifests()
if err := manifests.Delete(imh.Tag); err != nil {
switch err := err.(type) {
case storage.ErrUnknownManifest:
imh.Errors.Push(v2.ErrorCodeManifestUnknown, err)

View file

@ -42,9 +42,8 @@ type layerHandler struct {
// GetLayer fetches the binary data from backend storage returns it in the
// response.
func (lh *layerHandler) GetLayer(w http.ResponseWriter, r *http.Request) {
layers := lh.services.Layers()
layer, err := layers.Fetch(lh.Name, lh.Digest)
layers := lh.Repository.Layers()
layer, err := layers.Fetch(lh.Digest)
if err != nil {
switch err := err.(type) {

View file

@ -43,6 +43,14 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
}
luh.State = state
if state.Name != ctx.Repository.Name() {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx.log.Infof("mismatched repository name in upload state: %q != %q", state.Name, luh.Repository.Name())
w.WriteHeader(http.StatusBadRequest)
luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err)
})
}
if state.UUID != luh.UUID {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx.log.Infof("mismatched uuid in upload state: %q != %q", state.UUID, luh.UUID)
@ -51,8 +59,8 @@ func layerUploadDispatcher(ctx *Context, r *http.Request) http.Handler {
})
}
layers := ctx.services.Layers()
upload, err := layers.Resume(luh.Name, luh.UUID)
layers := ctx.Repository.Layers()
upload, err := layers.Resume(luh.UUID)
if err != nil {
ctx.log.Errorf("error resolving upload: %v", err)
if err == storage.ErrLayerUploadUnknown {
@ -114,8 +122,8 @@ type layerUploadHandler struct {
// StartLayerUpload begins the layer upload process and allocates a server-
// side upload session.
func (luh *layerUploadHandler) StartLayerUpload(w http.ResponseWriter, r *http.Request) {
layers := luh.services.Layers()
upload, err := layers.Upload(luh.Name)
layers := luh.Repository.Layers()
upload, err := layers.Upload()
if err != nil {
w.WriteHeader(http.StatusInternalServerError) // Error conditions here?
luh.Errors.Push(v2.ErrorCodeUnknown, err)
@ -222,7 +230,7 @@ func (luh *layerUploadHandler) layerUploadResponse(w http.ResponseWriter, r *htt
}
// TODO(stevvooe): Need a better way to manage the upload state automatically.
luh.State.Name = luh.Name
luh.State.Name = luh.Repository.Name()
luh.State.UUID = luh.Upload.UUID()
luh.State.Offset = offset
luh.State.StartedAt = luh.Upload.StartedAt()

View file

@ -33,14 +33,14 @@ type tagsAPIResponse struct {
// GetTags returns a json list of tags for a specific image name.
func (th *tagsHandler) GetTags(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
manifests := th.services.Manifests()
manifests := th.Repository.Manifests()
tags, err := manifests.Tags(th.Name)
tags, err := manifests.Tags()
if err != nil {
switch err := err.(type) {
case storage.ErrUnknownRepository:
w.WriteHeader(404)
th.Errors.Push(v2.ErrorCodeNameUnknown, map[string]string{"name": th.Name})
th.Errors.Push(v2.ErrorCodeNameUnknown, map[string]string{"name": th.Repository.Name()})
default:
th.Errors.PushErr(err)
}
@ -51,7 +51,7 @@ func (th *tagsHandler) GetTags(w http.ResponseWriter, r *http.Request) {
enc := json.NewEncoder(w)
if err := enc.Encode(tagsAPIResponse{
Name: th.Name,
Name: th.Repository.Name(),
Tags: tags,
}); err != nil {
th.Errors.PushErr(err)

View file

@ -18,8 +18,7 @@ import (
// abstraction, providing utility methods that support creating and traversing
// backend links.
type blobStore struct {
driver storagedriver.StorageDriver
pm *pathMapper
*registry
}
// exists reports whether or not the path exists. If the driver returns error

View file

@ -32,23 +32,13 @@ func TestSimpleLayerUpload(t *testing.T) {
imageName := "foo/bar"
driver := inmemory.New()
pm := &pathMapper{
root: "/storage/testing",
version: storagePathVersion,
}
ls := &layerStore{
driver: driver,
blobStore: &blobStore{
driver: driver,
pm: pm,
},
pathMapper: pm,
}
registry := NewRegistryWithDriver(driver)
ls := registry.Repository(imageName).Layers()
h := sha256.New()
rd := io.TeeReader(randomDataReader, h)
layerUpload, err := ls.Upload(imageName)
layerUpload, err := ls.Upload()
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
@ -60,13 +50,13 @@ func TestSimpleLayerUpload(t *testing.T) {
}
// Do a resume, get unknown upload
layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID())
layerUpload, err = ls.Resume(layerUpload.UUID())
if err != ErrLayerUploadUnknown {
t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
}
// Restart!
layerUpload, err = ls.Upload(imageName)
layerUpload, err = ls.Upload()
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
@ -97,7 +87,7 @@ func TestSimpleLayerUpload(t *testing.T) {
layerUpload.Close()
// Do a resume, for good fun
layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID())
layerUpload, err = ls.Resume(layerUpload.UUID())
if err != nil {
t.Fatalf("unexpected error resuming upload: %v", err)
}
@ -110,12 +100,12 @@ func TestSimpleLayerUpload(t *testing.T) {
}
// After finishing an upload, it should no longer exist.
if _, err := ls.Resume(layerUpload.Name(), layerUpload.UUID()); err != ErrLayerUploadUnknown {
if _, err := ls.Resume(layerUpload.UUID()); err != ErrLayerUploadUnknown {
t.Fatalf("expected layer upload to be unknown, got %v", err)
}
// Test for existence.
exists, err := ls.Exists(layer.Name(), layer.Digest())
exists, err := ls.Exists(layer.Digest())
if err != nil {
t.Fatalf("unexpected error checking for existence: %v", err)
}
@ -145,18 +135,8 @@ func TestSimpleLayerUpload(t *testing.T) {
func TestSimpleLayerRead(t *testing.T) {
imageName := "foo/bar"
driver := inmemory.New()
pm := &pathMapper{
root: "/storage/testing",
version: storagePathVersion,
}
ls := &layerStore{
driver: driver,
blobStore: &blobStore{
driver: driver,
pm: pm,
},
pathMapper: pm,
}
registry := NewRegistryWithDriver(driver)
ls := registry.Repository(imageName).Layers()
randomLayerReader, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil {
@ -166,7 +146,7 @@ func TestSimpleLayerRead(t *testing.T) {
dgst := digest.Digest(tarSumStr)
// Test for existence.
exists, err := ls.Exists(imageName, dgst)
exists, err := ls.Exists(dgst)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v", err)
}
@ -176,7 +156,7 @@ func TestSimpleLayerRead(t *testing.T) {
}
// Try to get the layer and make sure we get a not found error
layer, err := ls.Fetch(imageName, dgst)
layer, err := ls.Fetch(dgst)
if err == nil {
t.Fatalf("error expected fetching unknown layer")
}
@ -188,7 +168,7 @@ func TestSimpleLayerRead(t *testing.T) {
t.Fatalf("unexpected error fetching non-existent layer: %v", err)
}
randomLayerDigest, err := writeTestLayer(driver, ls.pathMapper, imageName, dgst, randomLayerReader)
randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader)
if err != nil {
t.Fatalf("unexpected error writing test layer: %v", err)
}
@ -198,7 +178,7 @@ func TestSimpleLayerRead(t *testing.T) {
t.Fatalf("error getting seeker size for random layer: %v", err)
}
layer, err = ls.Fetch(imageName, dgst)
layer, err = ls.Fetch(dgst)
if err != nil {
t.Fatal(err)
}

View file

@ -10,15 +10,13 @@ import (
)
type layerStore struct {
driver storagedriver.StorageDriver
pathMapper *pathMapper
blobStore *blobStore
repository *repository
}
func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) {
func (ls *layerStore) Exists(digest digest.Digest) (bool, error) {
// Because this implementation just follows blob links, an existence check
// is pretty cheap by starting and closing a fetch.
_, err := ls.Fetch(name, digest)
_, err := ls.Fetch(digest)
if err != nil {
switch err.(type) {
@ -32,20 +30,20 @@ func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) {
return true, nil
}
func (ls *layerStore) Fetch(name string, dgst digest.Digest) (Layer, error) {
bp, err := ls.path(name, dgst)
func (ls *layerStore) Fetch(dgst digest.Digest) (Layer, error) {
bp, err := ls.path(dgst)
if err != nil {
return nil, err
}
fr, err := newFileReader(ls.driver, bp)
fr, err := newFileReader(ls.repository.driver, bp)
if err != nil {
return nil, err
}
return &layerReader{
fileReader: *fr,
name: name,
name: ls.repository.Name(),
digest: dgst,
}, nil
}
@ -53,7 +51,7 @@ func (ls *layerStore) Fetch(name string, dgst digest.Digest) (Layer, error) {
// Upload begins a layer upload, returning a handle. If the layer upload
// is already in progress or the layer has already been uploaded, this
// will return an error.
func (ls *layerStore) Upload(name string) (LayerUpload, error) {
func (ls *layerStore) Upload() (LayerUpload, error) {
// NOTE(stevvooe): Consider the issues with allowing concurrent upload of
// the same two layers. Should it be disallowed? For now, we allow both
@ -62,8 +60,8 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
uuid := uuid.New()
startedAt := time.Now().UTC()
path, err := ls.pathMapper.path(uploadDataPathSpec{
name: name,
path, err := ls.repository.registry.pm.path(uploadDataPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
@ -71,8 +69,8 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
return nil, err
}
startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{
name: name,
startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
@ -81,18 +79,18 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
}
// Write a startedat file for this upload
if err := ls.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
if err := ls.repository.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
return nil, err
}
return ls.newLayerUpload(name, uuid, path, startedAt)
return ls.newLayerUpload(uuid, path, startedAt)
}
// Resume continues an in progress layer upload, returning the current
// state of the upload.
func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{
name: name,
func (ls *layerStore) Resume(uuid string) (LayerUpload, error) {
startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
@ -100,7 +98,7 @@ func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
return nil, err
}
startedAtBytes, err := ls.driver.GetContent(startedAtPath)
startedAtBytes, err := ls.repository.driver.GetContent(startedAtPath)
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
@ -115,8 +113,8 @@ func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
return nil, err
}
path, err := ls.pathMapper.path(uploadDataPathSpec{
name: name,
path, err := ls.repository.pm.path(uploadDataPathSpec{
name: ls.repository.Name(),
uuid: uuid,
})
@ -124,33 +122,32 @@ func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
return nil, err
}
return ls.newLayerUpload(name, uuid, path, startedAt)
return ls.newLayerUpload(uuid, path, startedAt)
}
// newLayerUpload allocates a new upload controller with the given state.
func (ls *layerStore) newLayerUpload(name, uuid, path string, startedAt time.Time) (LayerUpload, error) {
fw, err := newFileWriter(ls.driver, path)
func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (LayerUpload, error) {
fw, err := newFileWriter(ls.repository.driver, path)
if err != nil {
return nil, err
}
return &layerUploadController{
layerStore: ls,
name: name,
uuid: uuid,
startedAt: startedAt,
fileWriter: *fw,
}, nil
}
func (ls *layerStore) path(name string, dgst digest.Digest) (string, error) {
func (ls *layerStore) path(dgst digest.Digest) (string, error) {
// We must traverse this path through the link to enforce ownership.
layerLinkPath, err := ls.pathMapper.path(layerLinkPathSpec{name: name, digest: dgst})
layerLinkPath, err := ls.repository.registry.pm.path(layerLinkPathSpec{name: ls.repository.Name(), digest: dgst})
if err != nil {
return "", err
}
blobPath, err := ls.blobStore.resolve(layerLinkPath)
blobPath, err := ls.repository.blobStore.resolve(layerLinkPath)
if err != nil {
switch err := err.(type) {

View file

@ -16,7 +16,6 @@ import (
type layerUploadController struct {
layerStore *layerStore
name string
uuid string
startedAt time.Time
@ -27,7 +26,7 @@ var _ LayerUpload = &layerUploadController{}
// Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string {
return luc.name
return luc.layerStore.repository.Name()
}
// UUID returns the identifier for this upload.
@ -63,7 +62,7 @@ func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) {
return nil, err
}
return luc.layerStore.Fetch(luc.Name(), canonical)
return luc.layerStore.Fetch(canonical)
}
// Cancel the layer upload process.
@ -128,7 +127,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
// identified by dgst. The layer should be validated before commencing the
// move.
func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
blobPath, err := luc.layerStore.pathMapper.path(blobDataPathSpec{
blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{
digest: dgst,
})
@ -137,7 +136,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
}
// Check for existence
if _, err := luc.layerStore.driver.Stat(blobPath); err != nil {
if _, err := luc.layerStore.repository.registry.driver.Stat(blobPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist.
@ -158,7 +157,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
// linkLayer links a valid, written layer blob into the registry under the
// named repository for the upload controller.
func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{
layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{
name: luc.Name(),
digest: digest,
})
@ -167,15 +166,15 @@ func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
return err
}
return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest))
return luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(digest))
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (luc *layerUploadController) removeResources() error {
dataPath, err := luc.layerStore.pathMapper.path(uploadDataPathSpec{
name: luc.name,
dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{
name: luc.Name(),
uuid: luc.uuid,
})

View file

@ -6,7 +6,6 @@ import (
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
"github.com/docker/libtrust"
)
@ -65,65 +64,67 @@ func (errs ErrManifestVerification) Error() string {
}
type manifestStore struct {
driver storagedriver.StorageDriver
pathMapper *pathMapper
repository *repository
revisionStore *revisionStore
tagStore *tagStore
blobStore *blobStore
layerService LayerService
}
var _ ManifestService = &manifestStore{}
func (ms *manifestStore) Tags(name string) ([]string, error) {
return ms.tagStore.tags(name)
// func (ms *manifestStore) Repository() Repository {
// return ms.repository
// }
func (ms *manifestStore) Tags() ([]string, error) {
return ms.tagStore.tags()
}
func (ms *manifestStore) Exists(name, tag string) (bool, error) {
return ms.tagStore.exists(name, tag)
func (ms *manifestStore) Exists(tag string) (bool, error) {
return ms.tagStore.exists(tag)
}
func (ms *manifestStore) Get(name, tag string) (*manifest.SignedManifest, error) {
dgst, err := ms.tagStore.resolve(name, tag)
func (ms *manifestStore) Get(tag string) (*manifest.SignedManifest, error) {
dgst, err := ms.tagStore.resolve(tag)
if err != nil {
return nil, err
}
return ms.revisionStore.get(name, dgst)
return ms.revisionStore.get(dgst)
}
func (ms *manifestStore) Put(name, tag string, manifest *manifest.SignedManifest) error {
func (ms *manifestStore) Put(tag string, manifest *manifest.SignedManifest) error {
// Verify the manifest.
if err := ms.verifyManifest(name, tag, manifest); err != nil {
if err := ms.verifyManifest(tag, manifest); err != nil {
return err
}
// Store the revision of the manifest
revision, err := ms.revisionStore.put(name, manifest)
revision, err := ms.revisionStore.put(manifest)
if err != nil {
return err
}
// Now, tag the manifest
return ms.tagStore.tag(name, tag, revision)
return ms.tagStore.tag(tag, revision)
}
// Delete removes all revisions of the given tag. We may want to change these
// semantics in the future, but this will maintain consistency. The underlying
// blobs are left alone.
func (ms *manifestStore) Delete(name, tag string) error {
revisions, err := ms.tagStore.revisions(name, tag)
func (ms *manifestStore) Delete(tag string) error {
revisions, err := ms.tagStore.revisions(tag)
if err != nil {
return err
}
for _, revision := range revisions {
if err := ms.revisionStore.delete(name, revision); err != nil {
if err := ms.revisionStore.delete(revision); err != nil {
return err
}
}
return ms.tagStore.delete(name, tag)
return ms.tagStore.delete(tag)
}
// verifyManifest ensures that the manifest content is valid from the
@ -131,11 +132,11 @@ func (ms *manifestStore) Delete(name, tag string) error {
// that the signature is valid for the enclosed payload. As a policy, the
// registry only tries to store valid content, leaving trust policies of that
// content up to consumers.
func (ms *manifestStore) verifyManifest(name, tag string, mnfst *manifest.SignedManifest) error {
func (ms *manifestStore) verifyManifest(tag string, mnfst *manifest.SignedManifest) error {
var errs ErrManifestVerification
if mnfst.Name != name {
if mnfst.Name != ms.repository.Name() {
// TODO(stevvooe): This needs to be an exported error
errs = append(errs, fmt.Errorf("name does not match manifest name"))
errs = append(errs, fmt.Errorf("repository name does not match manifest name"))
}
if mnfst.Tag != tag {
@ -157,7 +158,7 @@ func (ms *manifestStore) verifyManifest(name, tag string, mnfst *manifest.Signed
}
for _, fsLayer := range mnfst.FSLayers {
exists, err := ms.layerService.Exists(name, fsLayer.BlobSum)
exists, err := ms.repository.Layers().Exists(fsLayer.BlobSum)
if err != nil {
errs = append(errs, err)
}

View file

@ -2,9 +2,12 @@ package storage
import (
"bytes"
"io"
"reflect"
"testing"
"github.com/docker/distribution/testutil"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver/inmemory"
@ -12,36 +15,14 @@ import (
)
func TestManifestStorage(t *testing.T) {
driver := inmemory.New()
pm := pathMapper{
root: "/storage/testing",
version: storagePathVersion,
}
bs := blobStore{
driver: driver,
pm: &pm,
}
ms := &manifestStore{
driver: driver,
pathMapper: &pm,
revisionStore: &revisionStore{
driver: driver,
pathMapper: &pm,
blobStore: &bs,
},
tagStore: &tagStore{
driver: driver,
pathMapper: &pm,
blobStore: &bs,
},
blobStore: &bs,
layerService: newMockedLayerService(),
}
name := "foo/bar"
tag := "thetag"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
repo := registry.Repository(name)
ms := repo.Manifests()
exists, err := ms.Exists(name, tag)
exists, err := ms.Exists(tag)
if err != nil {
t.Fatalf("unexpected error checking manifest existence: %v", err)
}
@ -50,7 +31,7 @@ func TestManifestStorage(t *testing.T) {
t.Fatalf("manifest should not exist")
}
if _, err := ms.Get(name, tag); true {
if _, err := ms.Get(tag); true {
switch err.(type) {
case ErrUnknownManifest:
break
@ -65,14 +46,22 @@ func TestManifestStorage(t *testing.T) {
},
Name: name,
Tag: tag,
FSLayers: []manifest.FSLayer{
{
BlobSum: "asdf",
},
{
BlobSum: "qwer",
},
},
}
// Build up some test layers and add them to the manifest, saving the
// readseekers for upload later.
testLayers := map[digest.Digest]io.ReadSeeker{}
for i := 0; i < 2; i++ {
rs, ds, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("unexpected error generating test layer file")
}
dgst := digest.Digest(ds)
testLayers[digest.Digest(dgst)] = rs
m.FSLayers = append(m.FSLayers, manifest.FSLayer{
BlobSum: dgst,
})
}
pk, err := libtrust.GenerateECP256PrivateKey()
@ -85,21 +74,34 @@ func TestManifestStorage(t *testing.T) {
t.Fatalf("error signing manifest: %v", err)
}
err = ms.Put(name, tag, sm)
err = ms.Put(tag, sm)
if err == nil {
t.Fatalf("expected errors putting manifest")
}
// TODO(stevvooe): We expect errors describing all of the missing layers.
ms.layerService.(*mockedExistenceLayerService).add(name, "asdf")
ms.layerService.(*mockedExistenceLayerService).add(name, "qwer")
// Now, upload the layers that were missing!
for dgst, rs := range testLayers {
upload, err := repo.Layers().Upload()
if err != nil {
t.Fatalf("unexpected error creating test upload: %v", err)
}
if err = ms.Put(name, tag, sm); err != nil {
if _, err := io.Copy(upload, rs); err != nil {
t.Fatalf("unexpected error copying to upload: %v", err)
}
if _, err := upload.Finish(dgst); err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
}
if err = ms.Put(tag, sm); err != nil {
t.Fatalf("unexpected error putting manifest: %v", err)
}
exists, err = ms.Exists(name, tag)
exists, err = ms.Exists(tag)
if err != nil {
t.Fatalf("unexpected error checking manifest existence: %v", err)
}
@ -108,7 +110,7 @@ func TestManifestStorage(t *testing.T) {
t.Fatalf("manifest should exist")
}
fetchedManifest, err := ms.Get(name, tag)
fetchedManifest, err := ms.Get(tag)
if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err)
}
@ -137,7 +139,7 @@ func TestManifestStorage(t *testing.T) {
}
// Grabs the tags and check that this tagged manifest is present
tags, err := ms.Tags(name)
tags, err := ms.Tags()
if err != nil {
t.Fatalf("unexpected error fetching tags: %v", err)
}
@ -175,11 +177,11 @@ func TestManifestStorage(t *testing.T) {
t.Fatalf("unexpected number of signatures: %d != %d", len(sigs2), 1)
}
if err = ms.Put(name, tag, sm2); err != nil {
if err = ms.Put(tag, sm2); err != nil {
t.Fatalf("unexpected error putting manifest: %v", err)
}
fetched, err := ms.Get(name, tag)
fetched, err := ms.Get(tag)
if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err)
}
@ -224,49 +226,7 @@ func TestManifestStorage(t *testing.T) {
}
}
if err := ms.Delete(name, tag); err != nil {
if err := ms.Delete(tag); err != nil {
t.Fatalf("unexpected error deleting manifest: %v", err)
}
}
type layerKey struct {
name string
digest digest.Digest
}
type mockedExistenceLayerService struct {
exists map[layerKey]struct{}
}
func newMockedLayerService() *mockedExistenceLayerService {
return &mockedExistenceLayerService{
exists: make(map[layerKey]struct{}),
}
}
var _ LayerService = &mockedExistenceLayerService{}
func (mels *mockedExistenceLayerService) add(name string, digest digest.Digest) {
mels.exists[layerKey{name: name, digest: digest}] = struct{}{}
}
func (mels *mockedExistenceLayerService) remove(name string, digest digest.Digest) {
delete(mels.exists, layerKey{name: name, digest: digest})
}
func (mels *mockedExistenceLayerService) Exists(name string, digest digest.Digest) (bool, error) {
_, ok := mels.exists[layerKey{name: name, digest: digest}]
return ok, nil
}
func (mockedExistenceLayerService) Fetch(name string, digest digest.Digest) (Layer, error) {
panic("not implemented")
}
func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) {
panic("not implemented")
}
func (mockedExistenceLayerService) Resume(name, uuid string) (LayerUpload, error) {
panic("not implemented")
}

75
storage/registry.go Normal file
View file

@ -0,0 +1,75 @@
package storage
import "github.com/docker/distribution/storagedriver"
// registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object.
type registry struct {
driver storagedriver.StorageDriver
pm *pathMapper
blobStore *blobStore
}
// NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate.
func NewRegistryWithDriver(driver storagedriver.StorageDriver) Registry {
bs := &blobStore{}
reg := &registry{
driver: driver,
blobStore: bs,
// TODO(sday): This should be configurable.
pm: defaultPathMapper,
}
reg.blobStore.registry = reg
return reg
}
// Repository returns an instance of the repository tied to the registry.
// Instances should not be shared between goroutines but are cheap to
// allocate. In general, they should be request scoped.
func (reg *registry) Repository(name string) Repository {
return &repository{
registry: reg,
name: name,
}
}
// repository provides name-scoped access to various services.
type repository struct {
*registry
name string
}
// Name returns the name of the repository.
func (repo *repository) Name() string {
return repo.name
}
// Manifests returns an instance of ManifestService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Manifests() ManifestService {
return &manifestStore{
repository: repo,
revisionStore: &revisionStore{
repository: repo,
},
tagStore: &tagStore{
repository: repo,
},
}
}
// Layers returns an instance of the LayerService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Layers() LayerService {
return &layerStore{
repository: repo,
}
}

View file

@ -7,21 +7,18 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
"github.com/docker/libtrust"
)
// revisionStore supports storing and managing manifest revisions.
type revisionStore struct {
driver storagedriver.StorageDriver
pathMapper *pathMapper
blobStore *blobStore
*repository
}
// exists returns true if the revision is available in the named repository.
func (rs *revisionStore) exists(name string, revision digest.Digest) (bool, error) {
revpath, err := rs.pathMapper.path(manifestRevisionPathSpec{
name: name,
func (rs *revisionStore) exists(revision digest.Digest) (bool, error) {
revpath, err := rs.pm.path(manifestRevisionPathSpec{
name: rs.Name(),
revision: revision,
})
@ -38,13 +35,13 @@ func (rs *revisionStore) exists(name string, revision digest.Digest) (bool, erro
}
// get retrieves the manifest, keyed by revision digest.
func (rs *revisionStore) get(name string, revision digest.Digest) (*manifest.SignedManifest, error) {
func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest, error) {
// Ensure that this revision is available in this repository.
if exists, err := rs.exists(name, revision); err != nil {
if exists, err := rs.exists(revision); err != nil {
return nil, err
} else if !exists {
return nil, ErrUnknownManifestRevision{
Name: name,
Name: rs.Name(),
Revision: revision,
}
}
@ -55,7 +52,7 @@ func (rs *revisionStore) get(name string, revision digest.Digest) (*manifest.Sig
}
// Fetch the signatures for the manifest
signatures, err := rs.getSignatures(name, revision)
signatures, err := rs.getSignatures(revision)
if err != nil {
return nil, err
}
@ -83,7 +80,7 @@ func (rs *revisionStore) get(name string, revision digest.Digest) (*manifest.Sig
// put stores the manifest in the repository, if not already present. Any
// updated signatures will be stored, as well.
func (rs *revisionStore) put(name string, sm *manifest.SignedManifest) (digest.Digest, error) {
func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) {
jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures")
if err != nil {
return "", err
@ -103,7 +100,7 @@ func (rs *revisionStore) put(name string, sm *manifest.SignedManifest) (digest.D
}
// Link the revision into the repository.
if err := rs.link(name, revision); err != nil {
if err := rs.link(revision); err != nil {
return "", err
}
@ -114,7 +111,7 @@ func (rs *revisionStore) put(name string, sm *manifest.SignedManifest) (digest.D
}
for _, signature := range signatures {
if err := rs.putSignature(name, revision, signature); err != nil {
if err := rs.putSignature(revision, signature); err != nil {
return "", err
}
}
@ -123,9 +120,9 @@ func (rs *revisionStore) put(name string, sm *manifest.SignedManifest) (digest.D
}
// link links the revision into the repository.
func (rs *revisionStore) link(name string, revision digest.Digest) error {
revisionPath, err := rs.pathMapper.path(manifestRevisionLinkPathSpec{
name: name,
func (rs *revisionStore) link(revision digest.Digest) error {
revisionPath, err := rs.pm.path(manifestRevisionLinkPathSpec{
name: rs.Name(),
revision: revision,
})
@ -144,9 +141,9 @@ func (rs *revisionStore) link(name string, revision digest.Digest) error {
}
// delete removes the specified manifest revision from storage.
func (rs *revisionStore) delete(name string, revision digest.Digest) error {
revisionPath, err := rs.pathMapper.path(manifestRevisionPathSpec{
name: name,
func (rs *revisionStore) delete(revision digest.Digest) error {
revisionPath, err := rs.pm.path(manifestRevisionPathSpec{
name: rs.Name(),
revision: revision,
})
@ -159,9 +156,9 @@ func (rs *revisionStore) delete(name string, revision digest.Digest) error {
// getSignatures retrieves all of the signature blobs for the specified
// manifest revision.
func (rs *revisionStore) getSignatures(name string, revision digest.Digest) ([][]byte, error) {
signaturesPath, err := rs.pathMapper.path(manifestSignaturesPathSpec{
name: name,
func (rs *revisionStore) getSignatures(revision digest.Digest) ([][]byte, error) {
signaturesPath, err := rs.pm.path(manifestSignaturesPathSpec{
name: rs.Name(),
revision: revision,
})
@ -197,14 +194,14 @@ func (rs *revisionStore) getSignatures(name string, revision digest.Digest) ([][
}
// putSignature stores the signature for the provided manifest revision.
func (rs *revisionStore) putSignature(name string, revision digest.Digest, signature []byte) error {
func (rs *revisionStore) putSignature(revision digest.Digest, signature []byte) error {
signatureDigest, err := rs.blobStore.put(signature)
if err != nil {
return err
}
signaturePath, err := rs.pathMapper.path(manifestSignatureLinkPathSpec{
name: name,
signaturePath, err := rs.pm.path(manifestSignatureLinkPathSpec{
name: rs.Name(),
revision: revision,
signature: signatureDigest,
})

View file

@ -3,101 +3,81 @@ package storage
import (
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
)
// Services provides various services with application-level operations for
// use across backend storage drivers.
type Services struct {
driver storagedriver.StorageDriver
pathMapper *pathMapper
// TODO(stevvooe): These types need to be moved out of the storage package.
// Registry represents a collection of repositories, addressable by name.
type Registry interface {
// Repository should return a reference to the named repository. The
// registry may or may not have the repository but should always return a
// reference.
Repository(name string) Repository
}
// NewServices creates a new Services object to access docker objects stored
// in the underlying driver.
func NewServices(driver storagedriver.StorageDriver) *Services {
// Repository is a named collection of manifests and layers.
type Repository interface {
// Name returns the name of the repository.
Name() string
return &Services{
driver: driver,
// TODO(sday): This should be configurable.
pathMapper: defaultPathMapper,
}
}
// Manifests returns a reference to this repository's manifest service.
Manifests() ManifestService
// Layers returns an instance of the LayerService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (ss *Services) Layers() LayerService {
return &layerStore{
driver: ss.driver,
blobStore: &blobStore{
driver: ss.driver,
pm: ss.pathMapper,
},
pathMapper: ss.pathMapper,
}
}
// Manifests returns an instance of ManifestService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (ss *Services) Manifests() ManifestService {
// TODO(stevvooe): Lose this kludge. An intermediary object is clearly
// missing here. This initialization is a mess.
bs := &blobStore{
driver: ss.driver,
pm: ss.pathMapper,
}
return &manifestStore{
driver: ss.driver,
pathMapper: ss.pathMapper,
revisionStore: &revisionStore{
driver: ss.driver,
pathMapper: ss.pathMapper,
blobStore: bs,
},
tagStore: &tagStore{
driver: ss.driver,
blobStore: bs,
pathMapper: ss.pathMapper,
},
blobStore: bs,
layerService: ss.Layers()}
// Layers returns a reference to this repository's layers service.
Layers() LayerService
}
// ManifestService provides operations on image manifests.
type ManifestService interface {
// Tags lists the tags under the named repository.
Tags(name string) ([]string, error)
Tags() ([]string, error)
// Exists returns true if the manifest exists.
Exists(name, tag string) (bool, error)
Exists(tag string) (bool, error)
// Get retrieves the named manifest, if it exists.
Get(name, tag string) (*manifest.SignedManifest, error)
Get(tag string) (*manifest.SignedManifest, error)
// Put creates or updates the named manifest.
Put(name, tag string, manifest *manifest.SignedManifest) error
// Put(tag string, manifest *manifest.SignedManifest) (digest.Digest, error)
Put(tag string, manifest *manifest.SignedManifest) error
// Delete removes the named manifest, if it exists.
Delete(name, tag string) error
Delete(tag string) error
// TODO(stevvooe): There are several changes that need to be done to this
// interface:
//
// 1. Get(tag string) should be GetByTag(tag string)
// 2. Put(tag string, manifest *manifest.SignedManifest) should be
// Put(manifest *manifest.SignedManifest). The method can read the
// tag on manifest to automatically tag it in the repository.
// 3. Need a GetByDigest(dgst digest.Digest) method.
// 4. Allow explicit tagging with Tag(digest digest.Digest, tag string)
// 5. Support reading tags with a re-entrant reader to avoid large
// allocations in the registry.
// 6. Long-term: Provide All() method that lets one scroll through all of
// the manifest entries.
// 7. Long-term: break out concept of signing from manifests. This is
// really a part of the distribution sprint.
// 8. Long-term: Manifest should be an interface. This code shouldn't
// really be concerned with the storage format.
}
// LayerService provides operations on layer files in a backend storage.
type LayerService interface {
// Exists returns true if the layer exists.
Exists(name string, digest digest.Digest) (bool, error)
Exists(digest digest.Digest) (bool, error)
// Fetch the layer identifed by TarSum.
Fetch(name string, digest digest.Digest) (Layer, error)
Fetch(digest digest.Digest) (Layer, error)
// Upload begins a layer upload to repository identified by name,
// returning a handle.
Upload(name string) (LayerUpload, error)
Upload() (LayerUpload, error)
// Resume continues an in progress layer upload, returning a handle to the
// upload. The caller should seek to the latest desired upload location
// before proceeding.
Resume(name, uuid string) (LayerUpload, error)
Resume(uuid string) (LayerUpload, error)
}

View file

@ -9,15 +9,13 @@ import (
// tagStore provides methods to manage manifest tags in a backend storage driver.
type tagStore struct {
driver storagedriver.StorageDriver
blobStore *blobStore
pathMapper *pathMapper
*repository
}
// tags lists the manifest tags for the specified repository.
func (ts *tagStore) tags(name string) ([]string, error) {
p, err := ts.pathMapper.path(manifestTagPathSpec{
name: name,
func (ts *tagStore) tags() ([]string, error) {
p, err := ts.pm.path(manifestTagPathSpec{
name: ts.name,
})
if err != nil {
return nil, err
@ -28,7 +26,7 @@ func (ts *tagStore) tags(name string) ([]string, error) {
if err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
return nil, ErrUnknownRepository{Name: name}
return nil, ErrUnknownRepository{Name: ts.name}
default:
return nil, err
}
@ -44,9 +42,9 @@ func (ts *tagStore) tags(name string) ([]string, error) {
}
// exists returns true if the specified manifest tag exists in the repository.
func (ts *tagStore) exists(name, tag string) (bool, error) {
tagPath, err := ts.pathMapper.path(manifestTagCurrentPathSpec{
name: name,
func (ts *tagStore) exists(tag string) (bool, error) {
tagPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: ts.Name(),
tag: tag,
})
if err != nil {
@ -63,9 +61,9 @@ func (ts *tagStore) exists(name, tag string) (bool, error) {
// tag tags the digest with the given tag, updating the the store to point at
// the current tag. The digest must point to a manifest.
func (ts *tagStore) tag(name, tag string, revision digest.Digest) error {
indexEntryPath, err := ts.pathMapper.path(manifestTagIndexEntryPathSpec{
name: name,
func (ts *tagStore) tag(tag string, revision digest.Digest) error {
indexEntryPath, err := ts.pm.path(manifestTagIndexEntryPathSpec{
name: ts.Name(),
tag: tag,
revision: revision,
})
@ -74,8 +72,8 @@ func (ts *tagStore) tag(name, tag string, revision digest.Digest) error {
return err
}
currentPath, err := ts.pathMapper.path(manifestTagCurrentPathSpec{
name: name,
currentPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: ts.Name(),
tag: tag,
})
@ -93,9 +91,9 @@ func (ts *tagStore) tag(name, tag string, revision digest.Digest) error {
}
// resolve the current revision for name and tag.
func (ts *tagStore) resolve(name, tag string) (digest.Digest, error) {
currentPath, err := ts.pathMapper.path(manifestTagCurrentPathSpec{
name: name,
func (ts *tagStore) resolve(tag string) (digest.Digest, error) {
currentPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: ts.Name(),
tag: tag,
})
@ -106,7 +104,7 @@ func (ts *tagStore) resolve(name, tag string) (digest.Digest, error) {
if exists, err := exists(ts.driver, currentPath); err != nil {
return "", err
} else if !exists {
return "", ErrUnknownManifest{Name: name, Tag: tag}
return "", ErrUnknownManifest{Name: ts.Name(), Tag: tag}
}
revision, err := ts.blobStore.readlink(currentPath)
@ -118,9 +116,9 @@ func (ts *tagStore) resolve(name, tag string) (digest.Digest, error) {
}
// revisions returns all revisions with the specified name and tag.
func (ts *tagStore) revisions(name, tag string) ([]digest.Digest, error) {
manifestTagIndexPath, err := ts.pathMapper.path(manifestTagIndexPathSpec{
name: name,
func (ts *tagStore) revisions(tag string) ([]digest.Digest, error) {
manifestTagIndexPath, err := ts.pm.path(manifestTagIndexPathSpec{
name: ts.Name(),
tag: tag,
})
@ -146,9 +144,9 @@ func (ts *tagStore) revisions(name, tag string) ([]digest.Digest, error) {
// delete removes the tag from repository, including the history of all
// revisions that have the specified tag.
func (ts *tagStore) delete(name, tag string) error {
tagPath, err := ts.pathMapper.path(manifestTagPathSpec{
name: name,
func (ts *tagStore) delete(tag string) error {
tagPath, err := ts.pm.path(manifestTagPathSpec{
name: ts.Name(),
tag: tag,
})
if err != nil {