Merge pull request #1269 from BrianBland/crossRepositoryPush

Adds cross-repository blob mounting behavior
This commit is contained in:
Richard Scothern 2016-01-08 14:37:00 -08:00
commit 5fe3d68e9c
11 changed files with 441 additions and 13 deletions

View file

@ -1041,6 +1041,70 @@ var routeDescriptors = []RouteDescriptor{
deniedResponseDescriptor, deniedResponseDescriptor,
}, },
}, },
{
Name: "Mount Blob",
Description: "Mount a blob identified by the `mount` parameter from another repository.",
Headers: []ParameterDescriptor{
hostHeader,
authHeader,
contentLengthZeroHeader,
},
PathParameters: []ParameterDescriptor{
nameParameterDescriptor,
},
QueryParameters: []ParameterDescriptor{
{
Name: "mount",
Type: "query",
Format: "<digest>",
Regexp: digest.DigestRegexp,
Description: `Digest of blob to mount from the source repository.`,
},
{
Name: "from",
Type: "query",
Format: "<repository name>",
Regexp: reference.NameRegexp,
Description: `Name of the source repository.`,
},
},
Successes: []ResponseDescriptor{
{
Description: "The blob has been mounted in the repository and is available at the provided location.",
StatusCode: http.StatusCreated,
Headers: []ParameterDescriptor{
{
Name: "Location",
Type: "url",
Format: "<blob location>",
},
contentLengthZeroHeader,
dockerUploadUUIDHeader,
},
},
},
Failures: []ResponseDescriptor{
{
Name: "Invalid Name or Digest",
StatusCode: http.StatusBadRequest,
ErrorCodes: []errcode.ErrorCode{
ErrorCodeDigestInvalid,
ErrorCodeNameInvalid,
},
},
{
Name: "Not allowed",
Description: "Blob mount is not allowed because the registry is configured as a pull-through cache or for some other reason",
StatusCode: http.StatusMethodNotAllowed,
ErrorCodes: []errcode.ErrorCode{
errcode.ErrorCodeUnsupported,
},
},
unauthorizedResponseDescriptor,
repositoryNotFoundResponseDescriptor,
deniedResponseDescriptor,
},
},
}, },
}, },
}, },

View file

@ -108,6 +108,8 @@ type tokenHandler struct {
tokenLock sync.Mutex tokenLock sync.Mutex
tokenCache string tokenCache string
tokenExpiration time.Time tokenExpiration time.Time
additionalScopes map[string]struct{}
} }
// tokenScope represents the scope at which a token will be requested. // tokenScope represents the scope at which a token will be requested.
@ -145,6 +147,7 @@ func newTokenHandler(transport http.RoundTripper, creds CredentialStore, c clock
Scope: scope, Scope: scope,
Actions: actions, Actions: actions,
}, },
additionalScopes: map[string]struct{}{},
} }
} }
@ -160,7 +163,15 @@ func (th *tokenHandler) Scheme() string {
} }
func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error { func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error {
if err := th.refreshToken(params); err != nil { var additionalScopes []string
if fromParam := req.URL.Query().Get("from"); fromParam != "" {
additionalScopes = append(additionalScopes, tokenScope{
Resource: "repository",
Scope: fromParam,
Actions: []string{"pull"},
}.String())
}
if err := th.refreshToken(params, additionalScopes...); err != nil {
return err return err
} }
@ -169,11 +180,18 @@ func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]st
return nil return nil
} }
func (th *tokenHandler) refreshToken(params map[string]string) error { func (th *tokenHandler) refreshToken(params map[string]string, additionalScopes ...string) error {
th.tokenLock.Lock() th.tokenLock.Lock()
defer th.tokenLock.Unlock() defer th.tokenLock.Unlock()
var addedScopes bool
for _, scope := range additionalScopes {
if _, ok := th.additionalScopes[scope]; !ok {
th.additionalScopes[scope] = struct{}{}
addedScopes = true
}
}
now := th.clock.Now() now := th.clock.Now()
if now.After(th.tokenExpiration) { if now.After(th.tokenExpiration) || addedScopes {
tr, err := th.fetchToken(params) tr, err := th.fetchToken(params)
if err != nil { if err != nil {
return err return err
@ -223,6 +241,10 @@ func (th *tokenHandler) fetchToken(params map[string]string) (token *tokenRespon
reqParams.Add("scope", scopeField) reqParams.Add("scope", scopeField)
} }
for scope := range th.additionalScopes {
reqParams.Add("scope", scope)
}
if th.creds != nil { if th.creds != nil {
username, password := th.creds.Basic(realmURL) username, password := th.creds.Basic(realmURL)
if username != "" && password != "" { if username != "" && password != "" {

View file

@ -10,6 +10,7 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/docker/distribution" "github.com/docker/distribution"
@ -499,6 +500,9 @@ type blobs struct {
statter distribution.BlobDescriptorService statter distribution.BlobDescriptorService
distribution.BlobDeleter distribution.BlobDeleter
cacheLock sync.Mutex
cachedBlobUpload distribution.BlobWriter
} }
func sanitizeLocation(location, base string) (string, error) { func sanitizeLocation(location, base string) (string, error) {
@ -573,7 +577,20 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
} }
func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) { func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
bs.cacheLock.Lock()
if bs.cachedBlobUpload != nil {
upload := bs.cachedBlobUpload
bs.cachedBlobUpload = nil
bs.cacheLock.Unlock()
return upload, nil
}
bs.cacheLock.Unlock()
u, err := bs.ub.BuildBlobUploadURL(bs.name) u, err := bs.ub.BuildBlobUploadURL(bs.name)
if err != nil {
return nil, err
}
resp, err := bs.client.Post(u, "", nil) resp, err := bs.client.Post(u, "", nil)
if err != nil { if err != nil {
@ -604,6 +621,45 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
panic("not implemented") panic("not implemented")
} }
func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}})
if err != nil {
return distribution.Descriptor{}, err
}
resp, err := bs.client.Post(u, "", nil)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusCreated:
return bs.Stat(ctx, dgst)
case http.StatusAccepted:
// Triggered a blob upload (legacy behavior), so cache the creation info
uuid := resp.Header.Get("Docker-Upload-UUID")
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
if err != nil {
return distribution.Descriptor{}, err
}
bs.cacheLock.Lock()
bs.cachedBlobUpload = &httpBlobUpload{
statter: bs.statter,
client: bs.client,
uuid: uuid,
startedAt: time.Now(),
location: location,
}
bs.cacheLock.Unlock()
return distribution.Descriptor{}, HandleErrorResponse(resp)
default:
return distribution.Descriptor{}, HandleErrorResponse(resp)
}
}
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error { func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
return bs.statter.Clear(ctx, dgst) return bs.statter.Clear(ctx, dgst)
} }

View file

@ -466,6 +466,61 @@ func TestBlobUploadMonolithic(t *testing.T) {
} }
} }
func TestBlobMount(t *testing.T) {
dgst, content := newRandomBlob(1024)
var m testutil.RequestResponseMap
repo := "test.example.com/uploadrepo"
sourceRepo := "test.example.com/sourcerepo"
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "POST",
Route: "/v2/" + repo + "/blobs/uploads/",
QueryParams: map[string][]string{"from": {sourceRepo}, "mount": {dgst.String()}},
},
Response: testutil.Response{
StatusCode: http.StatusCreated,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
"Location": {"/v2/" + repo + "/blobs/" + dgst.String()},
"Docker-Content-Digest": {dgst.String()},
}),
},
})
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "HEAD",
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(content))},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
e, c := testServer(m)
defer c()
ctx := context.Background()
r, err := NewRepository(ctx, repo, e, nil)
if err != nil {
t.Fatal(err)
}
l := r.Blobs(ctx)
stat, err := l.Mount(ctx, sourceRepo, dgst)
if err != nil {
t.Fatal(err)
}
if stat.Digest != dgst {
t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, dgst)
}
}
func newRandomSchemaV1Manifest(name, tag string, blobCount int) (*schema1.SignedManifest, digest.Digest, []byte) { func newRandomSchemaV1Manifest(name, tag string, blobCount int) (*schema1.SignedManifest, digest.Digest, []byte) {
blobs := make([]schema1.FSLayer, blobCount) blobs := make([]schema1.FSLayer, blobCount)
history := make([]schema1.History, blobCount) history := make([]schema1.History, blobCount)

View file

@ -710,6 +710,11 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont
if repo != "" { if repo != "" {
accessRecords = appendAccessRecords(accessRecords, r.Method, repo) accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
if fromRepo := r.FormValue("from"); fromRepo != "" {
// mounting a blob from one repository to another requires pull (GET)
// access to the source repository.
accessRecords = appendAccessRecords(accessRecords, "GET", fromRepo)
}
} else { } else {
// Only allow the name not to be set on the base route. // Only allow the name not to be set on the base route.
if app.nameRequired(r) { if app.nameRequired(r) {

View file

@ -116,8 +116,16 @@ type blobUploadHandler struct {
} }
// StartBlobUpload begins the blob upload process and allocates a server-side // StartBlobUpload begins the blob upload process and allocates a server-side
// blob writer session. // blob writer session, optionally mounting the blob from a separate repository.
func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) { func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) {
fromRepo := r.FormValue("from")
mountDigest := r.FormValue("mount")
if mountDigest != "" && fromRepo != "" {
buh.mountBlob(w, fromRepo, mountDigest)
return
}
blobs := buh.Repository.Blobs(buh) blobs := buh.Repository.Blobs(buh)
upload, err := blobs.Create(buh) upload, err := blobs.Create(buh)
@ -254,18 +262,10 @@ func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *ht
return return
} }
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
// Build our canonical blob url
blobURL, err := buh.urlBuilder.BuildBlobURL(buh.Repository.Name(), desc.Digest)
if err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err)) buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return return
} }
w.Header().Set("Location", blobURL)
w.Header().Set("Content-Length", "0")
w.Header().Set("Docker-Content-Digest", desc.Digest.String())
w.WriteHeader(http.StatusCreated)
} }
// CancelBlobUpload cancels an in-progress upload of a blob. // CancelBlobUpload cancels an in-progress upload of a blob.
@ -335,3 +335,45 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
return nil return nil
} }
// mountBlob attempts to mount a blob from another repository by its digest. If
// successful, the blob is linked into the blob store and 201 Created is
// returned with the canonical url of the blob.
func (buh *blobUploadHandler) mountBlob(w http.ResponseWriter, fromRepo, mountDigest string) {
dgst, err := digest.ParseDigest(mountDigest)
if err != nil {
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
return
}
blobs := buh.Repository.Blobs(buh)
desc, err := blobs.Mount(buh, fromRepo, dgst)
if err != nil {
if err == distribution.ErrBlobUnknown {
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(dgst))
} else {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
return
}
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
}
// writeBlobCreatedHeaders writes the standard headers describing a newly
// created blob. A 201 Created is written as well as the canonical URL and
// blob digest.
func (buh *blobUploadHandler) writeBlobCreatedHeaders(w http.ResponseWriter, desc distribution.Descriptor) error {
blobURL, err := buh.urlBuilder.BuildBlobURL(buh.Repository.Name(), desc.Digest)
if err != nil {
return err
}
w.Header().Set("Location", blobURL)
w.Header().Set("Content-Length", "0")
w.Header().Set("Docker-Content-Digest", desc.Digest.String())
w.WriteHeader(http.StatusCreated)
return nil
}

View file

@ -169,6 +169,10 @@ func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.
return nil, distribution.ErrUnsupported return nil, distribution.ErrUnsupported
} }
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
return nil, distribution.ErrUnsupported return nil, distribution.ErrUnsupported
} }

View file

@ -58,6 +58,14 @@ func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.B
return sbs.blobs.Resume(ctx, id) return sbs.blobs.Resume(ctx, id)
} }
func (sbs statsBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
sbsMu.Lock()
sbs.stats["mount"]++
sbsMu.Unlock()
return sbs.blobs.Mount(ctx, sourceRepo, dgst)
}
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
sbsMu.Lock() sbsMu.Lock()
sbs.stats["open"]++ sbs.stats["open"]++

View file

@ -310,6 +310,154 @@ func TestSimpleBlobRead(t *testing.T) {
} }
} }
// TestBlobMount covers the blob mount process, exercising common
// error paths that might be seen during a mount.
func TestBlobMount(t *testing.T) {
randomDataReader, dgst, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random reader: %v", err)
}
ctx := context.Background()
imageName := "foo/bar"
sourceImageName := "foo/source"
driver := inmemory.New()
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
sourceRepository, err := registry.Repository(ctx, sourceImageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
sbs := sourceRepository.Blobs(ctx)
blobUpload, err := sbs.Create(ctx)
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Get the size of our random tarfile
randomDataSize, err := seekerSize(randomDataReader)
if err != nil {
t.Fatalf("error getting seeker size of random data: %v", err)
}
nn, err := io.Copy(blobUpload, randomDataReader)
if err != nil {
t.Fatalf("unexpected error uploading layer data: %v", err)
}
desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst})
if err != nil {
t.Fatalf("unexpected error finishing layer upload: %v", err)
}
// Test for existence.
statDesc, err := sbs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v, %#v", err, sbs)
}
if statDesc != desc {
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
}
bs := repository.Blobs(ctx)
// Test destination for existence.
statDesc, err = bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating unmounted blob: %v", desc)
}
mountDesc, err := bs.Mount(ctx, sourceRepository.Name(), desc.Digest)
if err != nil {
t.Fatalf("unexpected error mounting layer: %v", err)
}
if mountDesc != desc {
t.Fatalf("descriptors not equal: %v != %v", mountDesc, desc)
}
// Test for existence.
statDesc, err = bs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs)
}
if statDesc != desc {
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
}
rc, err := bs.Open(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error opening blob for read: %v", err)
}
defer rc.Close()
h := sha256.New()
nn, err = io.Copy(h, rc)
if err != nil {
t.Fatalf("error reading layer: %v", err)
}
if nn != randomDataSize {
t.Fatalf("incorrect read length")
}
if digest.NewDigest("sha256", h) != dgst {
t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), dgst)
}
// Delete the blob from the source repo
err = sbs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
d, err := bs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error stating blob deleted from source repository: %v", err)
}
d, err = sbs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
// Delete the blob from the dest repo
err = bs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
d, err = bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
}
// TestLayerUploadZeroLength uploads zero-length // TestLayerUploadZeroLength uploads zero-length
func TestLayerUploadZeroLength(t *testing.T) { func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background() ctx := context.Background()

View file

@ -20,6 +20,7 @@ type linkPathFunc func(name string, dgst digest.Digest) (string, error)
// that grant access to the global blob store. // that grant access to the global blob store.
type linkedBlobStore struct { type linkedBlobStore struct {
*blobStore *blobStore
registry *registry
blobServer distribution.BlobServer blobServer distribution.BlobServer
blobAccessController distribution.BlobDescriptorService blobAccessController distribution.BlobDescriptorService
repository distribution.Repository repository distribution.Repository
@ -185,6 +186,28 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro
return nil return nil
} }
func (lbs *linkedBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
repo, err := lbs.registry.Repository(ctx, sourceRepo)
if err != nil {
return distribution.Descriptor{}, err
}
stat, err := repo.Blobs(ctx).Stat(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
desc := distribution.Descriptor{
Size: stat.Size,
// NOTE(stevvooe): The central blob store firewalls media types from
// other users. The caller should look this up and override the value
// for the specific repository.
MediaType: "application/octet-stream",
Digest: dgst,
}
return desc, lbs.linkBlob(ctx, desc)
}
// newBlobUpload allocates a new upload controller with the given state. // newBlobUpload allocates a new upload controller with the given state.
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) { func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) {
fw, err := newFileWriter(ctx, lbs.driver, path) fw, err := newFileWriter(ctx, lbs.driver, path)

View file

@ -233,6 +233,7 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
} }
return &linkedBlobStore{ return &linkedBlobStore{
registry: repo.registry,
blobStore: repo.blobStore, blobStore: repo.blobStore,
blobServer: repo.blobServer, blobServer: repo.blobServer,
blobAccessController: statter, blobAccessController: statter,