Adds cross-repository blob mounting behavior
Extends blob upload POST endpoint to support mount and from query parameters as described in #634 Signed-off-by: Brian Bland <brian.bland@docker.com>
This commit is contained in:
parent
c426367881
commit
41e30f626b
10 changed files with 416 additions and 10 deletions
|
@ -1041,6 +1041,70 @@ var routeDescriptors = []RouteDescriptor{
|
|||
deniedResponseDescriptor,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Mount Blob",
|
||||
Description: "Mount a blob identified by the `mount` parameter from another repository.",
|
||||
Headers: []ParameterDescriptor{
|
||||
hostHeader,
|
||||
authHeader,
|
||||
contentLengthZeroHeader,
|
||||
},
|
||||
PathParameters: []ParameterDescriptor{
|
||||
nameParameterDescriptor,
|
||||
},
|
||||
QueryParameters: []ParameterDescriptor{
|
||||
{
|
||||
Name: "mount",
|
||||
Type: "query",
|
||||
Format: "<digest>",
|
||||
Regexp: digest.DigestRegexp,
|
||||
Description: `Digest of blob to mount from the source repository.`,
|
||||
},
|
||||
{
|
||||
Name: "from",
|
||||
Type: "query",
|
||||
Format: "<repository name>",
|
||||
Regexp: reference.NameRegexp,
|
||||
Description: `Name of the source repository.`,
|
||||
},
|
||||
},
|
||||
Successes: []ResponseDescriptor{
|
||||
{
|
||||
Description: "The blob has been mounted in the repository and is available at the provided location.",
|
||||
StatusCode: http.StatusCreated,
|
||||
Headers: []ParameterDescriptor{
|
||||
{
|
||||
Name: "Location",
|
||||
Type: "url",
|
||||
Format: "<blob location>",
|
||||
},
|
||||
contentLengthZeroHeader,
|
||||
dockerUploadUUIDHeader,
|
||||
},
|
||||
},
|
||||
},
|
||||
Failures: []ResponseDescriptor{
|
||||
{
|
||||
Name: "Invalid Name or Digest",
|
||||
StatusCode: http.StatusBadRequest,
|
||||
ErrorCodes: []errcode.ErrorCode{
|
||||
ErrorCodeDigestInvalid,
|
||||
ErrorCodeNameInvalid,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "Not allowed",
|
||||
Description: "Blob mount is not allowed because the registry is configured as a pull-through cache or for some other reason",
|
||||
StatusCode: http.StatusMethodNotAllowed,
|
||||
ErrorCodes: []errcode.ErrorCode{
|
||||
errcode.ErrorCodeUnsupported,
|
||||
},
|
||||
},
|
||||
unauthorizedResponseDescriptor,
|
||||
repositoryNotFoundResponseDescriptor,
|
||||
deniedResponseDescriptor,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution"
|
||||
|
@ -499,6 +500,9 @@ type blobs struct {
|
|||
|
||||
statter distribution.BlobDescriptorService
|
||||
distribution.BlobDeleter
|
||||
|
||||
cacheLock sync.Mutex
|
||||
cachedBlobUpload distribution.BlobWriter
|
||||
}
|
||||
|
||||
func sanitizeLocation(location, base string) (string, error) {
|
||||
|
@ -573,7 +577,20 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
|
|||
}
|
||||
|
||||
func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
||||
bs.cacheLock.Lock()
|
||||
if bs.cachedBlobUpload != nil {
|
||||
upload := bs.cachedBlobUpload
|
||||
bs.cachedBlobUpload = nil
|
||||
bs.cacheLock.Unlock()
|
||||
|
||||
return upload, nil
|
||||
}
|
||||
bs.cacheLock.Unlock()
|
||||
|
||||
u, err := bs.ub.BuildBlobUploadURL(bs.name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := bs.client.Post(u, "", nil)
|
||||
if err != nil {
|
||||
|
@ -604,6 +621,45 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
|
|||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}})
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
resp, err := bs.client.Post(u, "", nil)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
switch resp.StatusCode {
|
||||
case http.StatusCreated:
|
||||
return bs.Stat(ctx, dgst)
|
||||
case http.StatusAccepted:
|
||||
// Triggered a blob upload (legacy behavior), so cache the creation info
|
||||
uuid := resp.Header.Get("Docker-Upload-UUID")
|
||||
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
bs.cacheLock.Lock()
|
||||
bs.cachedBlobUpload = &httpBlobUpload{
|
||||
statter: bs.statter,
|
||||
client: bs.client,
|
||||
uuid: uuid,
|
||||
startedAt: time.Now(),
|
||||
location: location,
|
||||
}
|
||||
bs.cacheLock.Unlock()
|
||||
|
||||
return distribution.Descriptor{}, HandleErrorResponse(resp)
|
||||
default:
|
||||
return distribution.Descriptor{}, HandleErrorResponse(resp)
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
return bs.statter.Clear(ctx, dgst)
|
||||
}
|
||||
|
|
|
@ -466,6 +466,61 @@ func TestBlobUploadMonolithic(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestBlobMount(t *testing.T) {
|
||||
dgst, content := newRandomBlob(1024)
|
||||
var m testutil.RequestResponseMap
|
||||
repo := "test.example.com/uploadrepo"
|
||||
sourceRepo := "test.example.com/sourcerepo"
|
||||
m = append(m, testutil.RequestResponseMapping{
|
||||
Request: testutil.Request{
|
||||
Method: "POST",
|
||||
Route: "/v2/" + repo + "/blobs/uploads/",
|
||||
QueryParams: map[string][]string{"from": {sourceRepo}, "mount": {dgst.String()}},
|
||||
},
|
||||
Response: testutil.Response{
|
||||
StatusCode: http.StatusCreated,
|
||||
Headers: http.Header(map[string][]string{
|
||||
"Content-Length": {"0"},
|
||||
"Location": {"/v2/" + repo + "/blobs/" + dgst.String()},
|
||||
"Docker-Content-Digest": {dgst.String()},
|
||||
}),
|
||||
},
|
||||
})
|
||||
m = append(m, testutil.RequestResponseMapping{
|
||||
Request: testutil.Request{
|
||||
Method: "HEAD",
|
||||
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
|
||||
},
|
||||
Response: testutil.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Headers: http.Header(map[string][]string{
|
||||
"Content-Length": {fmt.Sprint(len(content))},
|
||||
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
|
||||
}),
|
||||
},
|
||||
})
|
||||
|
||||
e, c := testServer(m)
|
||||
defer c()
|
||||
|
||||
ctx := context.Background()
|
||||
r, err := NewRepository(ctx, repo, e, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
l := r.Blobs(ctx)
|
||||
|
||||
stat, err := l.Mount(ctx, sourceRepo, dgst)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if stat.Digest != dgst {
|
||||
t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, dgst)
|
||||
}
|
||||
}
|
||||
|
||||
func newRandomSchemaV1Manifest(name, tag string, blobCount int) (*schema1.SignedManifest, digest.Digest, []byte) {
|
||||
blobs := make([]schema1.FSLayer, blobCount)
|
||||
history := make([]schema1.History, blobCount)
|
||||
|
|
|
@ -710,6 +710,11 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont
|
|||
|
||||
if repo != "" {
|
||||
accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
|
||||
if fromRepo := r.FormValue("from"); fromRepo != "" {
|
||||
// mounting a blob from one repository to another requires pull (GET)
|
||||
// access to the source repository.
|
||||
accessRecords = appendAccessRecords(accessRecords, "GET", fromRepo)
|
||||
}
|
||||
} else {
|
||||
// Only allow the name not to be set on the base route.
|
||||
if app.nameRequired(r) {
|
||||
|
|
|
@ -116,8 +116,16 @@ type blobUploadHandler struct {
|
|||
}
|
||||
|
||||
// StartBlobUpload begins the blob upload process and allocates a server-side
|
||||
// blob writer session.
|
||||
// blob writer session, optionally mounting the blob from a separate repository.
|
||||
func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) {
|
||||
fromRepo := r.FormValue("from")
|
||||
mountDigest := r.FormValue("mount")
|
||||
|
||||
if mountDigest != "" && fromRepo != "" {
|
||||
buh.mountBlob(w, fromRepo, mountDigest)
|
||||
return
|
||||
}
|
||||
|
||||
blobs := buh.Repository.Blobs(buh)
|
||||
upload, err := blobs.Create(buh)
|
||||
|
||||
|
@ -254,18 +262,10 @@ func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *ht
|
|||
|
||||
return
|
||||
}
|
||||
|
||||
// Build our canonical blob url
|
||||
blobURL, err := buh.urlBuilder.BuildBlobURL(buh.Repository.Name(), desc.Digest)
|
||||
if err != nil {
|
||||
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
|
||||
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Location", blobURL)
|
||||
w.Header().Set("Content-Length", "0")
|
||||
w.Header().Set("Docker-Content-Digest", desc.Digest.String())
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
}
|
||||
|
||||
// CancelBlobUpload cancels an in-progress upload of a blob.
|
||||
|
@ -335,3 +335,45 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// mountBlob attempts to mount a blob from another repository by its digest. If
|
||||
// successful, the blob is linked into the blob store and 201 Created is
|
||||
// returned with the canonical url of the blob.
|
||||
func (buh *blobUploadHandler) mountBlob(w http.ResponseWriter, fromRepo, mountDigest string) {
|
||||
dgst, err := digest.ParseDigest(mountDigest)
|
||||
if err != nil {
|
||||
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
|
||||
return
|
||||
}
|
||||
|
||||
blobs := buh.Repository.Blobs(buh)
|
||||
desc, err := blobs.Mount(buh, fromRepo, dgst)
|
||||
if err != nil {
|
||||
if err == distribution.ErrBlobUnknown {
|
||||
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(dgst))
|
||||
} else {
|
||||
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
|
||||
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// writeBlobCreatedHeaders writes the standard headers describing a newly
|
||||
// created blob. A 201 Created is written as well as the canonical URL and
|
||||
// blob digest.
|
||||
func (buh *blobUploadHandler) writeBlobCreatedHeaders(w http.ResponseWriter, desc distribution.Descriptor) error {
|
||||
blobURL, err := buh.urlBuilder.BuildBlobURL(buh.Repository.Name(), desc.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.Header().Set("Location", blobURL)
|
||||
w.Header().Set("Content-Length", "0")
|
||||
w.Header().Set("Docker-Content-Digest", desc.Digest.String())
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -169,6 +169,10 @@ func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.
|
|||
return nil, distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
return distribution.Descriptor{}, distribution.ErrUnsupported
|
||||
}
|
||||
|
||||
func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
||||
return nil, distribution.ErrUnsupported
|
||||
}
|
||||
|
|
|
@ -58,6 +58,14 @@ func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.B
|
|||
return sbs.blobs.Resume(ctx, id)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["mount"]++
|
||||
sbsMu.Unlock()
|
||||
|
||||
return sbs.blobs.Mount(ctx, sourceRepo, dgst)
|
||||
}
|
||||
|
||||
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
||||
sbsMu.Lock()
|
||||
sbs.stats["open"]++
|
||||
|
|
|
@ -310,6 +310,154 @@ func TestSimpleBlobRead(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestBlobMount covers the blob mount process, exercising common
|
||||
// error paths that might be seen during a mount.
|
||||
func TestBlobMount(t *testing.T) {
|
||||
randomDataReader, dgst, err := testutil.CreateRandomTarFile()
|
||||
if err != nil {
|
||||
t.Fatalf("error creating random reader: %v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
imageName := "foo/bar"
|
||||
sourceImageName := "foo/source"
|
||||
driver := inmemory.New()
|
||||
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||
if err != nil {
|
||||
t.Fatalf("error creating registry: %v", err)
|
||||
}
|
||||
|
||||
repository, err := registry.Repository(ctx, imageName)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
}
|
||||
sourceRepository, err := registry.Repository(ctx, sourceImageName)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error getting repo: %v", err)
|
||||
}
|
||||
|
||||
sbs := sourceRepository.Blobs(ctx)
|
||||
|
||||
blobUpload, err := sbs.Create(ctx)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error starting layer upload: %s", err)
|
||||
}
|
||||
|
||||
// Get the size of our random tarfile
|
||||
randomDataSize, err := seekerSize(randomDataReader)
|
||||
if err != nil {
|
||||
t.Fatalf("error getting seeker size of random data: %v", err)
|
||||
}
|
||||
|
||||
nn, err := io.Copy(blobUpload, randomDataReader)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error uploading layer data: %v", err)
|
||||
}
|
||||
|
||||
desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error finishing layer upload: %v", err)
|
||||
}
|
||||
|
||||
// Test for existence.
|
||||
statDesc, err := sbs.Stat(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error checking for existence: %v, %#v", err, sbs)
|
||||
}
|
||||
|
||||
if statDesc != desc {
|
||||
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
|
||||
}
|
||||
|
||||
bs := repository.Blobs(ctx)
|
||||
// Test destination for existence.
|
||||
statDesc, err = bs.Stat(ctx, desc.Digest)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected non-error stating unmounted blob: %v", desc)
|
||||
}
|
||||
|
||||
mountDesc, err := bs.Mount(ctx, sourceRepository.Name(), desc.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error mounting layer: %v", err)
|
||||
}
|
||||
|
||||
if mountDesc != desc {
|
||||
t.Fatalf("descriptors not equal: %v != %v", mountDesc, desc)
|
||||
}
|
||||
|
||||
// Test for existence.
|
||||
statDesc, err = bs.Stat(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs)
|
||||
}
|
||||
|
||||
if statDesc != desc {
|
||||
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
|
||||
}
|
||||
|
||||
rc, err := bs.Open(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error opening blob for read: %v", err)
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
h := sha256.New()
|
||||
nn, err = io.Copy(h, rc)
|
||||
if err != nil {
|
||||
t.Fatalf("error reading layer: %v", err)
|
||||
}
|
||||
|
||||
if nn != randomDataSize {
|
||||
t.Fatalf("incorrect read length")
|
||||
}
|
||||
|
||||
if digest.NewDigest("sha256", h) != dgst {
|
||||
t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), dgst)
|
||||
}
|
||||
|
||||
// Delete the blob from the source repo
|
||||
err = sbs.Delete(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error deleting blob")
|
||||
}
|
||||
|
||||
d, err := bs.Stat(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error stating blob deleted from source repository: %v", err)
|
||||
}
|
||||
|
||||
d, err = sbs.Stat(ctx, desc.Digest)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
|
||||
}
|
||||
|
||||
switch err {
|
||||
case distribution.ErrBlobUnknown:
|
||||
break
|
||||
default:
|
||||
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
|
||||
}
|
||||
|
||||
// Delete the blob from the dest repo
|
||||
err = bs.Delete(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error deleting blob")
|
||||
}
|
||||
|
||||
d, err = bs.Stat(ctx, desc.Digest)
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
|
||||
}
|
||||
|
||||
switch err {
|
||||
case distribution.ErrBlobUnknown:
|
||||
break
|
||||
default:
|
||||
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLayerUploadZeroLength uploads zero-length
|
||||
func TestLayerUploadZeroLength(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
|
|
@ -20,6 +20,7 @@ type linkPathFunc func(name string, dgst digest.Digest) (string, error)
|
|||
// that grant access to the global blob store.
|
||||
type linkedBlobStore struct {
|
||||
*blobStore
|
||||
registry *registry
|
||||
blobServer distribution.BlobServer
|
||||
blobAccessController distribution.BlobDescriptorService
|
||||
repository distribution.Repository
|
||||
|
@ -185,6 +186,28 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro
|
|||
return nil
|
||||
}
|
||||
|
||||
func (lbs *linkedBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
repo, err := lbs.registry.Repository(ctx, sourceRepo)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
stat, err := repo.Blobs(ctx).Stat(ctx, dgst)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
desc := distribution.Descriptor{
|
||||
Size: stat.Size,
|
||||
|
||||
// NOTE(stevvooe): The central blob store firewalls media types from
|
||||
// other users. The caller should look this up and override the value
|
||||
// for the specific repository.
|
||||
MediaType: "application/octet-stream",
|
||||
Digest: dgst,
|
||||
}
|
||||
return desc, lbs.linkBlob(ctx, desc)
|
||||
}
|
||||
|
||||
// newBlobUpload allocates a new upload controller with the given state.
|
||||
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) {
|
||||
fw, err := newFileWriter(ctx, lbs.driver, path)
|
||||
|
|
|
@ -233,6 +233,7 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
|
|||
}
|
||||
|
||||
return &linkedBlobStore{
|
||||
registry: repo.registry,
|
||||
blobStore: repo.blobStore,
|
||||
blobServer: repo.blobServer,
|
||||
blobAccessController: statter,
|
||||
|
|
Loading…
Reference in a new issue