Refactor storage API to be registry oriented

In support of making the storage API ready for supporting notifications and
mirroring, we've begun the process of paring down the storage model. The
process started by creating a central Registry interface. From there, the
common name argument on the LayerService and ManifestService was factored into
a Repository interface. The rest of the changes directly follow from this.

An interface wishlist was added, suggesting a direction to take the registry
package that should support the distribution project's future goals. As these
objects move out of the storage package and we implement a Registry backed by
the http client, these design choices will start getting validation.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-01-16 18:24:07 -08:00
parent 03406ab708
commit ea5b999fc0
10 changed files with 287 additions and 301 deletions

View file

@ -18,8 +18,7 @@ import (
// abstraction, providing utility methods that support creating and traversing // abstraction, providing utility methods that support creating and traversing
// backend links. // backend links.
type blobStore struct { type blobStore struct {
driver storagedriver.StorageDriver *registry
pm *pathMapper
} }
// exists reports whether or not the path exists. If the driver returns error // exists reports whether or not the path exists. If the driver returns error

View file

@ -32,23 +32,13 @@ func TestSimpleLayerUpload(t *testing.T) {
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
pm := &pathMapper{ registry := NewRegistryWithDriver(driver)
root: "/storage/testing", ls := registry.Repository(imageName).Layers()
version: storagePathVersion,
}
ls := &layerStore{
driver: driver,
blobStore: &blobStore{
driver: driver,
pm: pm,
},
pathMapper: pm,
}
h := sha256.New() h := sha256.New()
rd := io.TeeReader(randomDataReader, h) rd := io.TeeReader(randomDataReader, h)
layerUpload, err := ls.Upload(imageName) layerUpload, err := ls.Upload()
if err != nil { if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err) t.Fatalf("unexpected error starting layer upload: %s", err)
@ -60,13 +50,13 @@ func TestSimpleLayerUpload(t *testing.T) {
} }
// Do a resume, get unknown upload // Do a resume, get unknown upload
layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID()) layerUpload, err = ls.Resume(layerUpload.UUID())
if err != ErrLayerUploadUnknown { if err != ErrLayerUploadUnknown {
t.Fatalf("unexpected error resuming upload, should be unkown: %v", err) t.Fatalf("unexpected error resuming upload, should be unkown: %v", err)
} }
// Restart! // Restart!
layerUpload, err = ls.Upload(imageName) layerUpload, err = ls.Upload()
if err != nil { if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err) t.Fatalf("unexpected error starting layer upload: %s", err)
} }
@ -97,7 +87,7 @@ func TestSimpleLayerUpload(t *testing.T) {
layerUpload.Close() layerUpload.Close()
// Do a resume, for good fun // Do a resume, for good fun
layerUpload, err = ls.Resume(layerUpload.Name(), layerUpload.UUID()) layerUpload, err = ls.Resume(layerUpload.UUID())
if err != nil { if err != nil {
t.Fatalf("unexpected error resuming upload: %v", err) t.Fatalf("unexpected error resuming upload: %v", err)
} }
@ -110,12 +100,12 @@ func TestSimpleLayerUpload(t *testing.T) {
} }
// After finishing an upload, it should no longer exist. // After finishing an upload, it should no longer exist.
if _, err := ls.Resume(layerUpload.Name(), layerUpload.UUID()); err != ErrLayerUploadUnknown { if _, err := ls.Resume(layerUpload.UUID()); err != ErrLayerUploadUnknown {
t.Fatalf("expected layer upload to be unknown, got %v", err) t.Fatalf("expected layer upload to be unknown, got %v", err)
} }
// Test for existence. // Test for existence.
exists, err := ls.Exists(layer.Name(), layer.Digest()) exists, err := ls.Exists(layer.Digest())
if err != nil { if err != nil {
t.Fatalf("unexpected error checking for existence: %v", err) t.Fatalf("unexpected error checking for existence: %v", err)
} }
@ -145,18 +135,8 @@ func TestSimpleLayerUpload(t *testing.T) {
func TestSimpleLayerRead(t *testing.T) { func TestSimpleLayerRead(t *testing.T) {
imageName := "foo/bar" imageName := "foo/bar"
driver := inmemory.New() driver := inmemory.New()
pm := &pathMapper{ registry := NewRegistryWithDriver(driver)
root: "/storage/testing", ls := registry.Repository(imageName).Layers()
version: storagePathVersion,
}
ls := &layerStore{
driver: driver,
blobStore: &blobStore{
driver: driver,
pm: pm,
},
pathMapper: pm,
}
randomLayerReader, tarSumStr, err := testutil.CreateRandomTarFile() randomLayerReader, tarSumStr, err := testutil.CreateRandomTarFile()
if err != nil { if err != nil {
@ -166,7 +146,7 @@ func TestSimpleLayerRead(t *testing.T) {
dgst := digest.Digest(tarSumStr) dgst := digest.Digest(tarSumStr)
// Test for existence. // Test for existence.
exists, err := ls.Exists(imageName, dgst) exists, err := ls.Exists(dgst)
if err != nil { if err != nil {
t.Fatalf("unexpected error checking for existence: %v", err) t.Fatalf("unexpected error checking for existence: %v", err)
} }
@ -176,7 +156,7 @@ func TestSimpleLayerRead(t *testing.T) {
} }
// Try to get the layer and make sure we get a not found error // Try to get the layer and make sure we get a not found error
layer, err := ls.Fetch(imageName, dgst) layer, err := ls.Fetch(dgst)
if err == nil { if err == nil {
t.Fatalf("error expected fetching unknown layer") t.Fatalf("error expected fetching unknown layer")
} }
@ -188,7 +168,7 @@ func TestSimpleLayerRead(t *testing.T) {
t.Fatalf("unexpected error fetching non-existent layer: %v", err) t.Fatalf("unexpected error fetching non-existent layer: %v", err)
} }
randomLayerDigest, err := writeTestLayer(driver, ls.pathMapper, imageName, dgst, randomLayerReader) randomLayerDigest, err := writeTestLayer(driver, ls.(*layerStore).repository.pm, imageName, dgst, randomLayerReader)
if err != nil { if err != nil {
t.Fatalf("unexpected error writing test layer: %v", err) t.Fatalf("unexpected error writing test layer: %v", err)
} }
@ -198,7 +178,7 @@ func TestSimpleLayerRead(t *testing.T) {
t.Fatalf("error getting seeker size for random layer: %v", err) t.Fatalf("error getting seeker size for random layer: %v", err)
} }
layer, err = ls.Fetch(imageName, dgst) layer, err = ls.Fetch(dgst)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -10,15 +10,13 @@ import (
) )
type layerStore struct { type layerStore struct {
driver storagedriver.StorageDriver repository *repository
pathMapper *pathMapper
blobStore *blobStore
} }
func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) { func (ls *layerStore) Exists(digest digest.Digest) (bool, error) {
// Because this implementation just follows blob links, an existence check // Because this implementation just follows blob links, an existence check
// is pretty cheap by starting and closing a fetch. // is pretty cheap by starting and closing a fetch.
_, err := ls.Fetch(name, digest) _, err := ls.Fetch(digest)
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
@ -32,20 +30,20 @@ func (ls *layerStore) Exists(name string, digest digest.Digest) (bool, error) {
return true, nil return true, nil
} }
func (ls *layerStore) Fetch(name string, dgst digest.Digest) (Layer, error) { func (ls *layerStore) Fetch(dgst digest.Digest) (Layer, error) {
bp, err := ls.path(name, dgst) bp, err := ls.path(dgst)
if err != nil { if err != nil {
return nil, err return nil, err
} }
fr, err := newFileReader(ls.driver, bp) fr, err := newFileReader(ls.repository.driver, bp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &layerReader{ return &layerReader{
fileReader: *fr, fileReader: *fr,
name: name, name: ls.repository.Name(),
digest: dgst, digest: dgst,
}, nil }, nil
} }
@ -53,7 +51,7 @@ func (ls *layerStore) Fetch(name string, dgst digest.Digest) (Layer, error) {
// Upload begins a layer upload, returning a handle. If the layer upload // Upload begins a layer upload, returning a handle. If the layer upload
// is already in progress or the layer has already been uploaded, this // is already in progress or the layer has already been uploaded, this
// will return an error. // will return an error.
func (ls *layerStore) Upload(name string) (LayerUpload, error) { func (ls *layerStore) Upload() (LayerUpload, error) {
// NOTE(stevvooe): Consider the issues with allowing concurrent upload of // NOTE(stevvooe): Consider the issues with allowing concurrent upload of
// the same two layers. Should it be disallowed? For now, we allow both // the same two layers. Should it be disallowed? For now, we allow both
@ -62,8 +60,8 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
uuid := uuid.New() uuid := uuid.New()
startedAt := time.Now().UTC() startedAt := time.Now().UTC()
path, err := ls.pathMapper.path(uploadDataPathSpec{ path, err := ls.repository.registry.pm.path(uploadDataPathSpec{
name: name, name: ls.repository.Name(),
uuid: uuid, uuid: uuid,
}) })
@ -71,8 +69,8 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
return nil, err return nil, err
} }
startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{ startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
name: name, name: ls.repository.Name(),
uuid: uuid, uuid: uuid,
}) })
@ -81,18 +79,18 @@ func (ls *layerStore) Upload(name string) (LayerUpload, error) {
} }
// Write a startedat file for this upload // Write a startedat file for this upload
if err := ls.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil { if err := ls.repository.driver.PutContent(startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
return nil, err return nil, err
} }
return ls.newLayerUpload(name, uuid, path, startedAt) return ls.newLayerUpload(uuid, path, startedAt)
} }
// Resume continues an in progress layer upload, returning the current // Resume continues an in progress layer upload, returning the current
// state of the upload. // state of the upload.
func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) { func (ls *layerStore) Resume(uuid string) (LayerUpload, error) {
startedAtPath, err := ls.pathMapper.path(uploadStartedAtPathSpec{ startedAtPath, err := ls.repository.registry.pm.path(uploadStartedAtPathSpec{
name: name, name: ls.repository.Name(),
uuid: uuid, uuid: uuid,
}) })
@ -100,7 +98,7 @@ func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
return nil, err return nil, err
} }
startedAtBytes, err := ls.driver.GetContent(startedAtPath) startedAtBytes, err := ls.repository.driver.GetContent(startedAtPath)
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
@ -115,8 +113,8 @@ func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
return nil, err return nil, err
} }
path, err := ls.pathMapper.path(uploadDataPathSpec{ path, err := ls.repository.pm.path(uploadDataPathSpec{
name: name, name: ls.repository.Name(),
uuid: uuid, uuid: uuid,
}) })
@ -124,33 +122,32 @@ func (ls *layerStore) Resume(name, uuid string) (LayerUpload, error) {
return nil, err return nil, err
} }
return ls.newLayerUpload(name, uuid, path, startedAt) return ls.newLayerUpload(uuid, path, startedAt)
} }
// newLayerUpload allocates a new upload controller with the given state. // newLayerUpload allocates a new upload controller with the given state.
func (ls *layerStore) newLayerUpload(name, uuid, path string, startedAt time.Time) (LayerUpload, error) { func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (LayerUpload, error) {
fw, err := newFileWriter(ls.driver, path) fw, err := newFileWriter(ls.repository.driver, path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &layerUploadController{ return &layerUploadController{
layerStore: ls, layerStore: ls,
name: name,
uuid: uuid, uuid: uuid,
startedAt: startedAt, startedAt: startedAt,
fileWriter: *fw, fileWriter: *fw,
}, nil }, nil
} }
func (ls *layerStore) path(name string, dgst digest.Digest) (string, error) { func (ls *layerStore) path(dgst digest.Digest) (string, error) {
// We must traverse this path through the link to enforce ownership. // We must traverse this path through the link to enforce ownership.
layerLinkPath, err := ls.pathMapper.path(layerLinkPathSpec{name: name, digest: dgst}) layerLinkPath, err := ls.repository.registry.pm.path(layerLinkPathSpec{name: ls.repository.Name(), digest: dgst})
if err != nil { if err != nil {
return "", err return "", err
} }
blobPath, err := ls.blobStore.resolve(layerLinkPath) blobPath, err := ls.repository.blobStore.resolve(layerLinkPath)
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {

View file

@ -16,7 +16,6 @@ import (
type layerUploadController struct { type layerUploadController struct {
layerStore *layerStore layerStore *layerStore
name string
uuid string uuid string
startedAt time.Time startedAt time.Time
@ -27,7 +26,7 @@ var _ LayerUpload = &layerUploadController{}
// Name of the repository under which the layer will be linked. // Name of the repository under which the layer will be linked.
func (luc *layerUploadController) Name() string { func (luc *layerUploadController) Name() string {
return luc.name return luc.layerStore.repository.Name()
} }
// UUID returns the identifier for this upload. // UUID returns the identifier for this upload.
@ -63,7 +62,7 @@ func (luc *layerUploadController) Finish(digest digest.Digest) (Layer, error) {
return nil, err return nil, err
} }
return luc.layerStore.Fetch(luc.Name(), canonical) return luc.layerStore.Fetch(canonical)
} }
// Cancel the layer upload process. // Cancel the layer upload process.
@ -128,7 +127,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
// identified by dgst. The layer should be validated before commencing the // identified by dgst. The layer should be validated before commencing the
// move. // move.
func (luc *layerUploadController) moveLayer(dgst digest.Digest) error { func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
blobPath, err := luc.layerStore.pathMapper.path(blobDataPathSpec{ blobPath, err := luc.layerStore.repository.registry.pm.path(blobDataPathSpec{
digest: dgst, digest: dgst,
}) })
@ -137,7 +136,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
} }
// Check for existence // Check for existence
if _, err := luc.layerStore.driver.Stat(blobPath); err != nil { if _, err := luc.layerStore.repository.registry.driver.Stat(blobPath); err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist. break // ensure that it doesn't exist.
@ -158,7 +157,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
// linkLayer links a valid, written layer blob into the registry under the // linkLayer links a valid, written layer blob into the registry under the
// named repository for the upload controller. // named repository for the upload controller.
func (luc *layerUploadController) linkLayer(digest digest.Digest) error { func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
layerLinkPath, err := luc.layerStore.pathMapper.path(layerLinkPathSpec{ layerLinkPath, err := luc.layerStore.repository.registry.pm.path(layerLinkPathSpec{
name: luc.Name(), name: luc.Name(),
digest: digest, digest: digest,
}) })
@ -167,15 +166,15 @@ func (luc *layerUploadController) linkLayer(digest digest.Digest) error {
return err return err
} }
return luc.layerStore.driver.PutContent(layerLinkPath, []byte(digest)) return luc.layerStore.repository.registry.driver.PutContent(layerLinkPath, []byte(digest))
} }
// removeResources should clean up all resources associated with the upload // removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the // instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned. // resources are already not present, no error will be returned.
func (luc *layerUploadController) removeResources() error { func (luc *layerUploadController) removeResources() error {
dataPath, err := luc.layerStore.pathMapper.path(uploadDataPathSpec{ dataPath, err := luc.layerStore.repository.registry.pm.path(uploadDataPathSpec{
name: luc.name, name: luc.Name(),
uuid: luc.uuid, uuid: luc.uuid,
}) })

View file

@ -6,7 +6,6 @@ import (
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
"github.com/docker/libtrust" "github.com/docker/libtrust"
) )
@ -65,65 +64,67 @@ func (errs ErrManifestVerification) Error() string {
} }
type manifestStore struct { type manifestStore struct {
driver storagedriver.StorageDriver repository *repository
pathMapper *pathMapper
revisionStore *revisionStore revisionStore *revisionStore
tagStore *tagStore tagStore *tagStore
blobStore *blobStore
layerService LayerService
} }
var _ ManifestService = &manifestStore{} var _ ManifestService = &manifestStore{}
func (ms *manifestStore) Tags(name string) ([]string, error) { // func (ms *manifestStore) Repository() Repository {
return ms.tagStore.tags(name) // return ms.repository
// }
func (ms *manifestStore) Tags() ([]string, error) {
return ms.tagStore.tags()
} }
func (ms *manifestStore) Exists(name, tag string) (bool, error) { func (ms *manifestStore) Exists(tag string) (bool, error) {
return ms.tagStore.exists(name, tag) return ms.tagStore.exists(tag)
} }
func (ms *manifestStore) Get(name, tag string) (*manifest.SignedManifest, error) { func (ms *manifestStore) Get(tag string) (*manifest.SignedManifest, error) {
dgst, err := ms.tagStore.resolve(name, tag) dgst, err := ms.tagStore.resolve(tag)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return ms.revisionStore.get(name, dgst) return ms.revisionStore.get(dgst)
} }
func (ms *manifestStore) Put(name, tag string, manifest *manifest.SignedManifest) error { func (ms *manifestStore) Put(tag string, manifest *manifest.SignedManifest) error {
// Verify the manifest. // Verify the manifest.
if err := ms.verifyManifest(name, tag, manifest); err != nil { if err := ms.verifyManifest(tag, manifest); err != nil {
return err return err
} }
// Store the revision of the manifest // Store the revision of the manifest
revision, err := ms.revisionStore.put(name, manifest) revision, err := ms.revisionStore.put(manifest)
if err != nil { if err != nil {
return err return err
} }
// Now, tag the manifest // Now, tag the manifest
return ms.tagStore.tag(name, tag, revision) return ms.tagStore.tag(tag, revision)
} }
// Delete removes all revisions of the given tag. We may want to change these // Delete removes all revisions of the given tag. We may want to change these
// semantics in the future, but this will maintain consistency. The underlying // semantics in the future, but this will maintain consistency. The underlying
// blobs are left alone. // blobs are left alone.
func (ms *manifestStore) Delete(name, tag string) error { func (ms *manifestStore) Delete(tag string) error {
revisions, err := ms.tagStore.revisions(name, tag) revisions, err := ms.tagStore.revisions(tag)
if err != nil { if err != nil {
return err return err
} }
for _, revision := range revisions { for _, revision := range revisions {
if err := ms.revisionStore.delete(name, revision); err != nil { if err := ms.revisionStore.delete(revision); err != nil {
return err return err
} }
} }
return ms.tagStore.delete(name, tag) return ms.tagStore.delete(tag)
} }
// verifyManifest ensures that the manifest content is valid from the // verifyManifest ensures that the manifest content is valid from the
@ -131,11 +132,11 @@ func (ms *manifestStore) Delete(name, tag string) error {
// that the signature is valid for the enclosed payload. As a policy, the // that the signature is valid for the enclosed payload. As a policy, the
// registry only tries to store valid content, leaving trust policies of that // registry only tries to store valid content, leaving trust policies of that
// content up to consumers. // content up to consumers.
func (ms *manifestStore) verifyManifest(name, tag string, mnfst *manifest.SignedManifest) error { func (ms *manifestStore) verifyManifest(tag string, mnfst *manifest.SignedManifest) error {
var errs ErrManifestVerification var errs ErrManifestVerification
if mnfst.Name != name { if mnfst.Name != ms.repository.Name() {
// TODO(stevvooe): This needs to be an exported error // TODO(stevvooe): This needs to be an exported error
errs = append(errs, fmt.Errorf("name does not match manifest name")) errs = append(errs, fmt.Errorf("repository name does not match manifest name"))
} }
if mnfst.Tag != tag { if mnfst.Tag != tag {
@ -157,7 +158,7 @@ func (ms *manifestStore) verifyManifest(name, tag string, mnfst *manifest.Signed
} }
for _, fsLayer := range mnfst.FSLayers { for _, fsLayer := range mnfst.FSLayers {
exists, err := ms.layerService.Exists(name, fsLayer.BlobSum) exists, err := ms.repository.Layers().Exists(fsLayer.BlobSum)
if err != nil { if err != nil {
errs = append(errs, err) errs = append(errs, err)
} }

View file

@ -2,9 +2,12 @@ package storage
import ( import (
"bytes" "bytes"
"io"
"reflect" "reflect"
"testing" "testing"
"github.com/docker/distribution/testutil"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver/inmemory" "github.com/docker/distribution/storagedriver/inmemory"
@ -12,36 +15,14 @@ import (
) )
func TestManifestStorage(t *testing.T) { func TestManifestStorage(t *testing.T) {
driver := inmemory.New()
pm := pathMapper{
root: "/storage/testing",
version: storagePathVersion,
}
bs := blobStore{
driver: driver,
pm: &pm,
}
ms := &manifestStore{
driver: driver,
pathMapper: &pm,
revisionStore: &revisionStore{
driver: driver,
pathMapper: &pm,
blobStore: &bs,
},
tagStore: &tagStore{
driver: driver,
pathMapper: &pm,
blobStore: &bs,
},
blobStore: &bs,
layerService: newMockedLayerService(),
}
name := "foo/bar" name := "foo/bar"
tag := "thetag" tag := "thetag"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
repo := registry.Repository(name)
ms := repo.Manifests()
exists, err := ms.Exists(name, tag) exists, err := ms.Exists(tag)
if err != nil { if err != nil {
t.Fatalf("unexpected error checking manifest existence: %v", err) t.Fatalf("unexpected error checking manifest existence: %v", err)
} }
@ -50,7 +31,7 @@ func TestManifestStorage(t *testing.T) {
t.Fatalf("manifest should not exist") t.Fatalf("manifest should not exist")
} }
if _, err := ms.Get(name, tag); true { if _, err := ms.Get(tag); true {
switch err.(type) { switch err.(type) {
case ErrUnknownManifest: case ErrUnknownManifest:
break break
@ -65,14 +46,22 @@ func TestManifestStorage(t *testing.T) {
}, },
Name: name, Name: name,
Tag: tag, Tag: tag,
FSLayers: []manifest.FSLayer{ }
{
BlobSum: "asdf", // Build up some test layers and add them to the manifest, saving the
}, // readseekers for upload later.
{ testLayers := map[digest.Digest]io.ReadSeeker{}
BlobSum: "qwer", for i := 0; i < 2; i++ {
}, rs, ds, err := testutil.CreateRandomTarFile()
}, if err != nil {
t.Fatalf("unexpected error generating test layer file")
}
dgst := digest.Digest(ds)
testLayers[digest.Digest(dgst)] = rs
m.FSLayers = append(m.FSLayers, manifest.FSLayer{
BlobSum: dgst,
})
} }
pk, err := libtrust.GenerateECP256PrivateKey() pk, err := libtrust.GenerateECP256PrivateKey()
@ -85,21 +74,34 @@ func TestManifestStorage(t *testing.T) {
t.Fatalf("error signing manifest: %v", err) t.Fatalf("error signing manifest: %v", err)
} }
err = ms.Put(name, tag, sm) err = ms.Put(tag, sm)
if err == nil { if err == nil {
t.Fatalf("expected errors putting manifest") t.Fatalf("expected errors putting manifest")
} }
// TODO(stevvooe): We expect errors describing all of the missing layers. // TODO(stevvooe): We expect errors describing all of the missing layers.
ms.layerService.(*mockedExistenceLayerService).add(name, "asdf") // Now, upload the layers that were missing!
ms.layerService.(*mockedExistenceLayerService).add(name, "qwer") for dgst, rs := range testLayers {
upload, err := repo.Layers().Upload()
if err != nil {
t.Fatalf("unexpected error creating test upload: %v", err)
}
if err = ms.Put(name, tag, sm); err != nil { if _, err := io.Copy(upload, rs); err != nil {
t.Fatalf("unexpected error copying to upload: %v", err)
}
if _, err := upload.Finish(dgst); err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
}
if err = ms.Put(tag, sm); err != nil {
t.Fatalf("unexpected error putting manifest: %v", err) t.Fatalf("unexpected error putting manifest: %v", err)
} }
exists, err = ms.Exists(name, tag) exists, err = ms.Exists(tag)
if err != nil { if err != nil {
t.Fatalf("unexpected error checking manifest existence: %v", err) t.Fatalf("unexpected error checking manifest existence: %v", err)
} }
@ -108,7 +110,7 @@ func TestManifestStorage(t *testing.T) {
t.Fatalf("manifest should exist") t.Fatalf("manifest should exist")
} }
fetchedManifest, err := ms.Get(name, tag) fetchedManifest, err := ms.Get(tag)
if err != nil { if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err) t.Fatalf("unexpected error fetching manifest: %v", err)
} }
@ -137,7 +139,7 @@ func TestManifestStorage(t *testing.T) {
} }
// Grabs the tags and check that this tagged manifest is present // Grabs the tags and check that this tagged manifest is present
tags, err := ms.Tags(name) tags, err := ms.Tags()
if err != nil { if err != nil {
t.Fatalf("unexpected error fetching tags: %v", err) t.Fatalf("unexpected error fetching tags: %v", err)
} }
@ -175,11 +177,11 @@ func TestManifestStorage(t *testing.T) {
t.Fatalf("unexpected number of signatures: %d != %d", len(sigs2), 1) t.Fatalf("unexpected number of signatures: %d != %d", len(sigs2), 1)
} }
if err = ms.Put(name, tag, sm2); err != nil { if err = ms.Put(tag, sm2); err != nil {
t.Fatalf("unexpected error putting manifest: %v", err) t.Fatalf("unexpected error putting manifest: %v", err)
} }
fetched, err := ms.Get(name, tag) fetched, err := ms.Get(tag)
if err != nil { if err != nil {
t.Fatalf("unexpected error fetching manifest: %v", err) t.Fatalf("unexpected error fetching manifest: %v", err)
} }
@ -224,49 +226,7 @@ func TestManifestStorage(t *testing.T) {
} }
} }
if err := ms.Delete(name, tag); err != nil { if err := ms.Delete(tag); err != nil {
t.Fatalf("unexpected error deleting manifest: %v", err) t.Fatalf("unexpected error deleting manifest: %v", err)
} }
} }
type layerKey struct {
name string
digest digest.Digest
}
type mockedExistenceLayerService struct {
exists map[layerKey]struct{}
}
func newMockedLayerService() *mockedExistenceLayerService {
return &mockedExistenceLayerService{
exists: make(map[layerKey]struct{}),
}
}
var _ LayerService = &mockedExistenceLayerService{}
func (mels *mockedExistenceLayerService) add(name string, digest digest.Digest) {
mels.exists[layerKey{name: name, digest: digest}] = struct{}{}
}
func (mels *mockedExistenceLayerService) remove(name string, digest digest.Digest) {
delete(mels.exists, layerKey{name: name, digest: digest})
}
func (mels *mockedExistenceLayerService) Exists(name string, digest digest.Digest) (bool, error) {
_, ok := mels.exists[layerKey{name: name, digest: digest}]
return ok, nil
}
func (mockedExistenceLayerService) Fetch(name string, digest digest.Digest) (Layer, error) {
panic("not implemented")
}
func (mockedExistenceLayerService) Upload(name string) (LayerUpload, error) {
panic("not implemented")
}
func (mockedExistenceLayerService) Resume(name, uuid string) (LayerUpload, error) {
panic("not implemented")
}

75
storage/registry.go Normal file
View file

@ -0,0 +1,75 @@
package storage
import "github.com/docker/distribution/storagedriver"
// registry is the top-level implementation of Registry for use in the storage
// package. All instances should descend from this object.
type registry struct {
driver storagedriver.StorageDriver
pm *pathMapper
blobStore *blobStore
}
// NewRegistryWithDriver creates a new registry instance from the provided
// driver. The resulting registry may be shared by multiple goroutines but is
// cheap to allocate.
func NewRegistryWithDriver(driver storagedriver.StorageDriver) Registry {
bs := &blobStore{}
reg := &registry{
driver: driver,
blobStore: bs,
// TODO(sday): This should be configurable.
pm: defaultPathMapper,
}
reg.blobStore.registry = reg
return reg
}
// Repository returns an instance of the repository tied to the registry.
// Instances should not be shared between goroutines but are cheap to
// allocate. In general, they should be request scoped.
func (reg *registry) Repository(name string) Repository {
return &repository{
registry: reg,
name: name,
}
}
// repository provides name-scoped access to various services.
type repository struct {
*registry
name string
}
// Name returns the name of the repository.
func (repo *repository) Name() string {
return repo.name
}
// Manifests returns an instance of ManifestService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Manifests() ManifestService {
return &manifestStore{
repository: repo,
revisionStore: &revisionStore{
repository: repo,
},
tagStore: &tagStore{
repository: repo,
},
}
}
// Layers returns an instance of the LayerService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (repo *repository) Layers() LayerService {
return &layerStore{
repository: repo,
}
}

View file

@ -7,21 +7,18 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
"github.com/docker/libtrust" "github.com/docker/libtrust"
) )
// revisionStore supports storing and managing manifest revisions. // revisionStore supports storing and managing manifest revisions.
type revisionStore struct { type revisionStore struct {
driver storagedriver.StorageDriver *repository
pathMapper *pathMapper
blobStore *blobStore
} }
// exists returns true if the revision is available in the named repository. // exists returns true if the revision is available in the named repository.
func (rs *revisionStore) exists(name string, revision digest.Digest) (bool, error) { func (rs *revisionStore) exists(revision digest.Digest) (bool, error) {
revpath, err := rs.pathMapper.path(manifestRevisionPathSpec{ revpath, err := rs.pm.path(manifestRevisionPathSpec{
name: name, name: rs.Name(),
revision: revision, revision: revision,
}) })
@ -38,13 +35,13 @@ func (rs *revisionStore) exists(name string, revision digest.Digest) (bool, erro
} }
// get retrieves the manifest, keyed by revision digest. // get retrieves the manifest, keyed by revision digest.
func (rs *revisionStore) get(name string, revision digest.Digest) (*manifest.SignedManifest, error) { func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest, error) {
// Ensure that this revision is available in this repository. // Ensure that this revision is available in this repository.
if exists, err := rs.exists(name, revision); err != nil { if exists, err := rs.exists(revision); err != nil {
return nil, err return nil, err
} else if !exists { } else if !exists {
return nil, ErrUnknownManifestRevision{ return nil, ErrUnknownManifestRevision{
Name: name, Name: rs.Name(),
Revision: revision, Revision: revision,
} }
} }
@ -55,7 +52,7 @@ func (rs *revisionStore) get(name string, revision digest.Digest) (*manifest.Sig
} }
// Fetch the signatures for the manifest // Fetch the signatures for the manifest
signatures, err := rs.getSignatures(name, revision) signatures, err := rs.getSignatures(revision)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -83,7 +80,7 @@ func (rs *revisionStore) get(name string, revision digest.Digest) (*manifest.Sig
// put stores the manifest in the repository, if not already present. Any // put stores the manifest in the repository, if not already present. Any
// updated signatures will be stored, as well. // updated signatures will be stored, as well.
func (rs *revisionStore) put(name string, sm *manifest.SignedManifest) (digest.Digest, error) { func (rs *revisionStore) put(sm *manifest.SignedManifest) (digest.Digest, error) {
jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures") jsig, err := libtrust.ParsePrettySignature(sm.Raw, "signatures")
if err != nil { if err != nil {
return "", err return "", err
@ -103,7 +100,7 @@ func (rs *revisionStore) put(name string, sm *manifest.SignedManifest) (digest.D
} }
// Link the revision into the repository. // Link the revision into the repository.
if err := rs.link(name, revision); err != nil { if err := rs.link(revision); err != nil {
return "", err return "", err
} }
@ -114,7 +111,7 @@ func (rs *revisionStore) put(name string, sm *manifest.SignedManifest) (digest.D
} }
for _, signature := range signatures { for _, signature := range signatures {
if err := rs.putSignature(name, revision, signature); err != nil { if err := rs.putSignature(revision, signature); err != nil {
return "", err return "", err
} }
} }
@ -123,9 +120,9 @@ func (rs *revisionStore) put(name string, sm *manifest.SignedManifest) (digest.D
} }
// link links the revision into the repository. // link links the revision into the repository.
func (rs *revisionStore) link(name string, revision digest.Digest) error { func (rs *revisionStore) link(revision digest.Digest) error {
revisionPath, err := rs.pathMapper.path(manifestRevisionLinkPathSpec{ revisionPath, err := rs.pm.path(manifestRevisionLinkPathSpec{
name: name, name: rs.Name(),
revision: revision, revision: revision,
}) })
@ -144,9 +141,9 @@ func (rs *revisionStore) link(name string, revision digest.Digest) error {
} }
// delete removes the specified manifest revision from storage. // delete removes the specified manifest revision from storage.
func (rs *revisionStore) delete(name string, revision digest.Digest) error { func (rs *revisionStore) delete(revision digest.Digest) error {
revisionPath, err := rs.pathMapper.path(manifestRevisionPathSpec{ revisionPath, err := rs.pm.path(manifestRevisionPathSpec{
name: name, name: rs.Name(),
revision: revision, revision: revision,
}) })
@ -159,9 +156,9 @@ func (rs *revisionStore) delete(name string, revision digest.Digest) error {
// getSignatures retrieves all of the signature blobs for the specified // getSignatures retrieves all of the signature blobs for the specified
// manifest revision. // manifest revision.
func (rs *revisionStore) getSignatures(name string, revision digest.Digest) ([][]byte, error) { func (rs *revisionStore) getSignatures(revision digest.Digest) ([][]byte, error) {
signaturesPath, err := rs.pathMapper.path(manifestSignaturesPathSpec{ signaturesPath, err := rs.pm.path(manifestSignaturesPathSpec{
name: name, name: rs.Name(),
revision: revision, revision: revision,
}) })
@ -197,14 +194,14 @@ func (rs *revisionStore) getSignatures(name string, revision digest.Digest) ([][
} }
// putSignature stores the signature for the provided manifest revision. // putSignature stores the signature for the provided manifest revision.
func (rs *revisionStore) putSignature(name string, revision digest.Digest, signature []byte) error { func (rs *revisionStore) putSignature(revision digest.Digest, signature []byte) error {
signatureDigest, err := rs.blobStore.put(signature) signatureDigest, err := rs.blobStore.put(signature)
if err != nil { if err != nil {
return err return err
} }
signaturePath, err := rs.pathMapper.path(manifestSignatureLinkPathSpec{ signaturePath, err := rs.pm.path(manifestSignatureLinkPathSpec{
name: name, name: rs.Name(),
revision: revision, revision: revision,
signature: signatureDigest, signature: signatureDigest,
}) })

View file

@ -3,101 +3,81 @@ package storage
import ( import (
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest" "github.com/docker/distribution/manifest"
"github.com/docker/distribution/storagedriver"
) )
// Services provides various services with application-level operations for // TODO(stevvooe): These types need to be moved out of the storage package.
// use across backend storage drivers.
type Services struct { // Registry represents a collection of repositories, addressable by name.
driver storagedriver.StorageDriver type Registry interface {
pathMapper *pathMapper // Repository should return a reference to the named repository. The
// registry may or may not have the repository but should always return a
// reference.
Repository(name string) Repository
} }
// NewServices creates a new Services object to access docker objects stored // Repository is a named collection of manifests and layers.
// in the underlying driver. type Repository interface {
func NewServices(driver storagedriver.StorageDriver) *Services { // Name returns the name of the repository.
Name() string
return &Services{ // Manifests returns a reference to this repository's manifest service.
driver: driver, Manifests() ManifestService
// TODO(sday): This should be configurable.
pathMapper: defaultPathMapper,
}
}
// Layers returns an instance of the LayerService. Instantiation is cheap and // Layers returns a reference to this repository's layers service.
// may be context sensitive in the future. The instance should be used similar Layers() LayerService
// to a request local.
func (ss *Services) Layers() LayerService {
return &layerStore{
driver: ss.driver,
blobStore: &blobStore{
driver: ss.driver,
pm: ss.pathMapper,
},
pathMapper: ss.pathMapper,
}
}
// Manifests returns an instance of ManifestService. Instantiation is cheap and
// may be context sensitive in the future. The instance should be used similar
// to a request local.
func (ss *Services) Manifests() ManifestService {
// TODO(stevvooe): Lose this kludge. An intermediary object is clearly
// missing here. This initialization is a mess.
bs := &blobStore{
driver: ss.driver,
pm: ss.pathMapper,
}
return &manifestStore{
driver: ss.driver,
pathMapper: ss.pathMapper,
revisionStore: &revisionStore{
driver: ss.driver,
pathMapper: ss.pathMapper,
blobStore: bs,
},
tagStore: &tagStore{
driver: ss.driver,
blobStore: bs,
pathMapper: ss.pathMapper,
},
blobStore: bs,
layerService: ss.Layers()}
} }
// ManifestService provides operations on image manifests. // ManifestService provides operations on image manifests.
type ManifestService interface { type ManifestService interface {
// Tags lists the tags under the named repository. // Tags lists the tags under the named repository.
Tags(name string) ([]string, error) Tags() ([]string, error)
// Exists returns true if the manifest exists. // Exists returns true if the manifest exists.
Exists(name, tag string) (bool, error) Exists(tag string) (bool, error)
// Get retrieves the named manifest, if it exists. // Get retrieves the named manifest, if it exists.
Get(name, tag string) (*manifest.SignedManifest, error) Get(tag string) (*manifest.SignedManifest, error)
// Put creates or updates the named manifest. // Put creates or updates the named manifest.
Put(name, tag string, manifest *manifest.SignedManifest) error // Put(tag string, manifest *manifest.SignedManifest) (digest.Digest, error)
Put(tag string, manifest *manifest.SignedManifest) error
// Delete removes the named manifest, if it exists. // Delete removes the named manifest, if it exists.
Delete(name, tag string) error Delete(tag string) error
// TODO(stevvooe): There are several changes that need to be done to this
// interface:
//
// 1. Get(tag string) should be GetByTag(tag string)
// 2. Put(tag string, manifest *manifest.SignedManifest) should be
// Put(manifest *manifest.SignedManifest). The method can read the
// tag on manifest to automatically tag it in the repository.
// 3. Need a GetByDigest(dgst digest.Digest) method.
// 4. Allow explicit tagging with Tag(digest digest.Digest, tag string)
// 5. Support reading tags with a re-entrant reader to avoid large
// allocations in the registry.
// 6. Long-term: Provide All() method that lets one scroll through all of
// the manifest entries.
// 7. Long-term: break out concept of signing from manifests. This is
// really a part of the distribution sprint.
// 8. Long-term: Manifest should be an interface. This code shouldn't
// really be concerned with the storage format.
} }
// LayerService provides operations on layer files in a backend storage. // LayerService provides operations on layer files in a backend storage.
type LayerService interface { type LayerService interface {
// Exists returns true if the layer exists. // Exists returns true if the layer exists.
Exists(name string, digest digest.Digest) (bool, error) Exists(digest digest.Digest) (bool, error)
// Fetch the layer identifed by TarSum. // Fetch the layer identifed by TarSum.
Fetch(name string, digest digest.Digest) (Layer, error) Fetch(digest digest.Digest) (Layer, error)
// Upload begins a layer upload to repository identified by name, // Upload begins a layer upload to repository identified by name,
// returning a handle. // returning a handle.
Upload(name string) (LayerUpload, error) Upload() (LayerUpload, error)
// Resume continues an in progress layer upload, returning a handle to the // Resume continues an in progress layer upload, returning a handle to the
// upload. The caller should seek to the latest desired upload location // upload. The caller should seek to the latest desired upload location
// before proceeding. // before proceeding.
Resume(name, uuid string) (LayerUpload, error) Resume(uuid string) (LayerUpload, error)
} }

View file

@ -9,15 +9,13 @@ import (
// tagStore provides methods to manage manifest tags in a backend storage driver. // tagStore provides methods to manage manifest tags in a backend storage driver.
type tagStore struct { type tagStore struct {
driver storagedriver.StorageDriver *repository
blobStore *blobStore
pathMapper *pathMapper
} }
// tags lists the manifest tags for the specified repository. // tags lists the manifest tags for the specified repository.
func (ts *tagStore) tags(name string) ([]string, error) { func (ts *tagStore) tags() ([]string, error) {
p, err := ts.pathMapper.path(manifestTagPathSpec{ p, err := ts.pm.path(manifestTagPathSpec{
name: name, name: ts.name,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -28,7 +26,7 @@ func (ts *tagStore) tags(name string) ([]string, error) {
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
return nil, ErrUnknownRepository{Name: name} return nil, ErrUnknownRepository{Name: ts.name}
default: default:
return nil, err return nil, err
} }
@ -44,9 +42,9 @@ func (ts *tagStore) tags(name string) ([]string, error) {
} }
// exists returns true if the specified manifest tag exists in the repository. // exists returns true if the specified manifest tag exists in the repository.
func (ts *tagStore) exists(name, tag string) (bool, error) { func (ts *tagStore) exists(tag string) (bool, error) {
tagPath, err := ts.pathMapper.path(manifestTagCurrentPathSpec{ tagPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: name, name: ts.Name(),
tag: tag, tag: tag,
}) })
if err != nil { if err != nil {
@ -63,9 +61,9 @@ func (ts *tagStore) exists(name, tag string) (bool, error) {
// tag tags the digest with the given tag, updating the the store to point at // tag tags the digest with the given tag, updating the the store to point at
// the current tag. The digest must point to a manifest. // the current tag. The digest must point to a manifest.
func (ts *tagStore) tag(name, tag string, revision digest.Digest) error { func (ts *tagStore) tag(tag string, revision digest.Digest) error {
indexEntryPath, err := ts.pathMapper.path(manifestTagIndexEntryPathSpec{ indexEntryPath, err := ts.pm.path(manifestTagIndexEntryPathSpec{
name: name, name: ts.Name(),
tag: tag, tag: tag,
revision: revision, revision: revision,
}) })
@ -74,8 +72,8 @@ func (ts *tagStore) tag(name, tag string, revision digest.Digest) error {
return err return err
} }
currentPath, err := ts.pathMapper.path(manifestTagCurrentPathSpec{ currentPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: name, name: ts.Name(),
tag: tag, tag: tag,
}) })
@ -93,9 +91,9 @@ func (ts *tagStore) tag(name, tag string, revision digest.Digest) error {
} }
// resolve the current revision for name and tag. // resolve the current revision for name and tag.
func (ts *tagStore) resolve(name, tag string) (digest.Digest, error) { func (ts *tagStore) resolve(tag string) (digest.Digest, error) {
currentPath, err := ts.pathMapper.path(manifestTagCurrentPathSpec{ currentPath, err := ts.pm.path(manifestTagCurrentPathSpec{
name: name, name: ts.Name(),
tag: tag, tag: tag,
}) })
@ -106,7 +104,7 @@ func (ts *tagStore) resolve(name, tag string) (digest.Digest, error) {
if exists, err := exists(ts.driver, currentPath); err != nil { if exists, err := exists(ts.driver, currentPath); err != nil {
return "", err return "", err
} else if !exists { } else if !exists {
return "", ErrUnknownManifest{Name: name, Tag: tag} return "", ErrUnknownManifest{Name: ts.Name(), Tag: tag}
} }
revision, err := ts.blobStore.readlink(currentPath) revision, err := ts.blobStore.readlink(currentPath)
@ -118,9 +116,9 @@ func (ts *tagStore) resolve(name, tag string) (digest.Digest, error) {
} }
// revisions returns all revisions with the specified name and tag. // revisions returns all revisions with the specified name and tag.
func (ts *tagStore) revisions(name, tag string) ([]digest.Digest, error) { func (ts *tagStore) revisions(tag string) ([]digest.Digest, error) {
manifestTagIndexPath, err := ts.pathMapper.path(manifestTagIndexPathSpec{ manifestTagIndexPath, err := ts.pm.path(manifestTagIndexPathSpec{
name: name, name: ts.Name(),
tag: tag, tag: tag,
}) })
@ -146,9 +144,9 @@ func (ts *tagStore) revisions(name, tag string) ([]digest.Digest, error) {
// delete removes the tag from repository, including the history of all // delete removes the tag from repository, including the history of all
// revisions that have the specified tag. // revisions that have the specified tag.
func (ts *tagStore) delete(name, tag string) error { func (ts *tagStore) delete(tag string) error {
tagPath, err := ts.pathMapper.path(manifestTagPathSpec{ tagPath, err := ts.pm.path(manifestTagPathSpec{
name: name, name: ts.Name(),
tag: tag, tag: tag,
}) })
if err != nil { if err != nil {