Merge pull request #1269 from BrianBland/crossRepositoryPush

Adds cross-repository blob mounting behavior
This commit is contained in:
Richard Scothern 2016-01-08 14:37:00 -08:00
commit 93d9070c8b
20 changed files with 744 additions and 19 deletions

View file

@ -155,6 +155,10 @@ type BlobIngester interface {
// Resume attempts to resume a write to a blob, identified by an id.
Resume(ctx context.Context, id string) (BlobWriter, error)
// Mount adds a blob to this service from another source repository,
// identified by a digest.
Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (Descriptor, error)
}
// BlobWriter provides a handle for inserting data into a blob store.

View file

@ -707,6 +707,34 @@ registry server will dump all intermediate data. While uploads will time out
if not completed, clients should issue this request if they encounter a fatal
error but still have the ability to issue an http request.
##### Cross Repository Blob Mount
A blob may be mounted from another repository that the client has read access
to, removing the need to upload a blob already known to the registry. To issue
a blob mount instead of an upload, a POST request should be issued in the
following format:
```
POST /v2/<name>/blobs/uploads/?mount=<digest>&from=<repository name>
Content-Length: 0
```
If the blob is successfully mounted, the client will receive a `201 Created`
response:
```
201 Created
Location: /v2/<name>/blobs/<digest>
Content-Length: 0
Docker-Content-Digest: <digest>
```
The `Location` header will contain the registry URL to access the accepted
layer file. The `Docker-Content-Digest` header returns the canonical digest of
the uploaded blob which may differ from the provided digest. Most clients may
ignore the value but if it is used, the client should verify the value against
the uploaded blob data.
##### Errors
If an 502, 503 or 504 error is received, the client should assume that the
@ -1023,7 +1051,7 @@ A list of methods and URIs are covered in the table below:
|------|----|------|-----------|
| GET | `/v2/` | Base | Check that the endpoint implements Docker Registry API V2. |
| GET | `/v2/<name>/tags/list` | Tags | Fetch the tags under the repository identified by `name`. |
| GET | `/v2/<name>/manifests/<reference>` | Manifest | Fetch the manifest identified by `name` and `reference` where `reference` can be a tag or digest. |
| GET | `/v2/<name>/manifests/<reference>` | Manifest | Fetch the manifest identified by `name` and `reference` where `reference` can be a tag or digest. A `HEAD` request can also be issued to this endpoint to obtain resource information without receiving all data. |
| PUT | `/v2/<name>/manifests/<reference>` | Manifest | Put the manifest identified by `name` and `reference` where `reference` can be a tag or digest. |
| DELETE | `/v2/<name>/manifests/<reference>` | Manifest | Delete the manifest identified by `name` and `reference`. Note that a manifest can _only_ be deleted by `digest`. |
| GET | `/v2/<name>/blobs/<digest>` | Blob | Retrieve the blob from the registry identified by `digest`. A `HEAD` request can also be issued to this endpoint to obtain resource information without receiving all data. |
@ -1500,7 +1528,7 @@ Create, update, delete and retrieve manifests.
#### GET Manifest
Fetch the manifest identified by `name` and `reference` where `reference` can be a tag or digest.
Fetch the manifest identified by `name` and `reference` where `reference` can be a tag or digest. A `HEAD` request can also be issued to this endpoint to obtain resource information without receiving all data.
@ -3313,6 +3341,204 @@ The error codes that may be included in the response body are enumerated below:
##### Mount Blob
```
POST /v2/<name>/blobs/uploads/?mount=<digest>&from=<repository name>
Host: <registry host>
Authorization: <scheme> <token>
Content-Length: 0
```
Mount a blob identified by the `mount` parameter from another repository.
The following parameters should be specified on the request:
|Name|Kind|Description|
|----|----|-----------|
|`Host`|header|Standard HTTP Host Header. Should be set to the registry host.|
|`Authorization`|header|An RFC7235 compliant authorization header.|
|`Content-Length`|header|The `Content-Length` header must be zero and the body must be empty.|
|`name`|path|Name of the target repository.|
|`mount`|query|Digest of blob to mount from the source repository.|
|`from`|query|Name of the source repository.|
###### On Success: Created
```
201 Created
Location: <blob location>
Content-Length: 0
Docker-Upload-UUID: <uuid>
```
The blob has been mounted in the repository and is available at the provided location.
The following headers will be returned with the response:
|Name|Description|
|----|-----------|
|`Location`||
|`Content-Length`|The `Content-Length` header must be zero and the body must be empty.|
|`Docker-Upload-UUID`|Identifies the docker upload uuid for the current request.|
###### On Failure: Invalid Name or Digest
```
400 Bad Request
```
The error codes that may be included in the response body are enumerated below:
|Code|Message|Description|
|----|-------|-----------|
| `DIGEST_INVALID` | provided digest did not match uploaded content | When a blob is uploaded, the registry will check that the content matches the digest provided by the client. The error may include a detail structure with the key "digest", including the invalid digest string. This error may also be returned when a manifest includes an invalid layer digest. |
| `NAME_INVALID` | invalid repository name | Invalid repository name encountered either during manifest validation or any API operation. |
###### On Failure: Not allowed
```
405 Method Not Allowed
```
Blob mount is not allowed because the registry is configured as a pull-through cache or for some other reason
The error codes that may be included in the response body are enumerated below:
|Code|Message|Description|
|----|-------|-----------|
| `UNSUPPORTED` | The operation is unsupported. | The operation was unsupported due to a missing implementation or invalid set of parameters. |
###### On Failure: Authentication Required
```
401 Unauthorized
WWW-Authenticate: <scheme> realm="<realm>", ..."
Content-Length: <length>
Content-Type: application/json; charset=utf-8
{
"errors:" [
{
"code": <error code>,
"message": "<error message>",
"detail": ...
},
...
]
}
```
The client is not authenticated.
The following headers will be returned on the response:
|Name|Description|
|----|-----------|
|`WWW-Authenticate`|An RFC7235 compliant authentication challenge header.|
|`Content-Length`|Length of the JSON response body.|
The error codes that may be included in the response body are enumerated below:
|Code|Message|Description|
|----|-------|-----------|
| `UNAUTHORIZED` | authentication required | The access controller was unable to authenticate the client. Often this will be accompanied by a Www-Authenticate HTTP response header indicating how to authenticate. |
###### On Failure: No Such Repository Error
```
404 Not Found
Content-Length: <length>
Content-Type: application/json; charset=utf-8
{
"errors:" [
{
"code": <error code>,
"message": "<error message>",
"detail": ...
},
...
]
}
```
The repository is not known to the registry.
The following headers will be returned on the response:
|Name|Description|
|----|-----------|
|`Content-Length`|Length of the JSON response body.|
The error codes that may be included in the response body are enumerated below:
|Code|Message|Description|
|----|-------|-----------|
| `NAME_UNKNOWN` | repository name not known to registry | This is returned if the name used during an operation is unknown to the registry. |
###### On Failure: Access Denied
```
403 Forbidden
Content-Length: <length>
Content-Type: application/json; charset=utf-8
{
"errors:" [
{
"code": <error code>,
"message": "<error message>",
"detail": ...
},
...
]
}
```
The client does not have required access to the repository.
The following headers will be returned on the response:
|Name|Description|
|----|-----------|
|`Content-Length`|Length of the JSON response body.|
The error codes that may be included in the response body are enumerated below:
|Code|Message|Description|
|----|-------|-----------|
| `DENIED` | requested access to the resource is denied | The access controller denied access for the operation on a resource. |
### Blob Upload

View file

@ -707,6 +707,34 @@ registry server will dump all intermediate data. While uploads will time out
if not completed, clients should issue this request if they encounter a fatal
error but still have the ability to issue an http request.
##### Cross Repository Blob Mount
A blob may be mounted from another repository that the client has read access
to, removing the need to upload a blob already known to the registry. To issue
a blob mount instead of an upload, a POST request should be issued in the
following format:
```
POST /v2/<name>/blobs/uploads/?mount=<digest>&from=<repository name>
Content-Length: 0
```
If the blob is successfully mounted, the client will receive a `201 Created`
response:
```
201 Created
Location: /v2/<name>/blobs/<digest>
Content-Length: 0
Docker-Content-Digest: <digest>
```
The `Location` header will contain the registry URL to access the accepted
layer file. The `Docker-Content-Digest` header returns the canonical digest of
the uploaded blob which may differ from the provided digest. Most clients may
ignore the value but if it is used, the client should verify the value against
the uploaded blob data.
##### Errors
If an 502, 503 or 504 error is received, the client should assume that the

View file

@ -50,6 +50,10 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution.
panic("not implemented")
}
func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
panic("not implemented")
}
func TestEmptyTar(t *testing.T) {
// Confirm that gzippedEmptyTar expands to 1024 NULL bytes.
var decompressed [2048]byte

View file

@ -46,6 +46,10 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution.
panic("not implemented")
}
func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
panic("not implemented")
}
func TestBuilder(t *testing.T) {
imgJSON := []byte(`{
"architecture": "amd64",

View file

@ -72,6 +72,15 @@ func (b *bridge) BlobPulled(repo string, desc distribution.Descriptor) error {
return b.createBlobEventAndWrite(EventActionPull, repo, desc)
}
func (b *bridge) BlobMounted(repo string, desc distribution.Descriptor, fromRepo string) error {
event, err := b.createBlobEvent(EventActionMount, repo, desc)
if err != nil {
return err
}
event.Target.FromRepository = fromRepo
return b.sink.Write(*event)
}
func (b *bridge) BlobDeleted(repo string, desc distribution.Descriptor) error {
return b.createBlobEventAndWrite(EventActionDelete, repo, desc)
}

View file

@ -11,6 +11,7 @@ import (
const (
EventActionPull = "pull"
EventActionPush = "push"
EventActionMount = "mount"
EventActionDelete = "delete"
)
@ -61,6 +62,10 @@ type Event struct {
// Repository identifies the named repository.
Repository string `json:"repository,omitempty"`
// FromRepository identifies the named repository which a blob was mounted
// from if appropriate.
FromRepository string `json:"fromRepository,omitempty"`
// URL provides a direct link to the content.
URL string `json:"url,omitempty"`
} `json:"target,omitempty"`

View file

@ -24,6 +24,7 @@ type ManifestListener interface {
type BlobListener interface {
BlobPushed(repo string, desc distribution.Descriptor) error
BlobPulled(repo string, desc distribution.Descriptor) error
BlobMounted(repo string, desc distribution.Descriptor, fromRepo string) error
// TODO(stevvooe): Please note that delete support is still a little shaky
// and we'll need to propagate these in the future.
@ -169,6 +170,17 @@ func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribu
return bsl.decorateWriter(wr), err
}
func (bsl *blobServiceListener) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
desc, err := bsl.BlobStore.Mount(ctx, sourceRepo, dgst)
if err == nil {
if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Name(), desc, sourceRepo); err != nil {
context.GetLogger(ctx).Errorf("error dispatching layer mount to listener: %v", err)
}
}
return desc, err
}
func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
return &blobWriterListener{
BlobWriter: wr,

View file

@ -81,6 +81,11 @@ func (tl *testListener) BlobPulled(repo string, desc distribution.Descriptor) er
return nil
}
func (tl *testListener) BlobMounted(repo string, desc distribution.Descriptor, fromRepo string) error {
tl.ops["layer:mount"]++
return nil
}
func (tl *testListener) BlobDeleted(repo string, desc distribution.Descriptor) error {
tl.ops["layer:delete"]++
return nil

View file

@ -1041,6 +1041,70 @@ var routeDescriptors = []RouteDescriptor{
deniedResponseDescriptor,
},
},
{
Name: "Mount Blob",
Description: "Mount a blob identified by the `mount` parameter from another repository.",
Headers: []ParameterDescriptor{
hostHeader,
authHeader,
contentLengthZeroHeader,
},
PathParameters: []ParameterDescriptor{
nameParameterDescriptor,
},
QueryParameters: []ParameterDescriptor{
{
Name: "mount",
Type: "query",
Format: "<digest>",
Regexp: digest.DigestRegexp,
Description: `Digest of blob to mount from the source repository.`,
},
{
Name: "from",
Type: "query",
Format: "<repository name>",
Regexp: reference.NameRegexp,
Description: `Name of the source repository.`,
},
},
Successes: []ResponseDescriptor{
{
Description: "The blob has been mounted in the repository and is available at the provided location.",
StatusCode: http.StatusCreated,
Headers: []ParameterDescriptor{
{
Name: "Location",
Type: "url",
Format: "<blob location>",
},
contentLengthZeroHeader,
dockerUploadUUIDHeader,
},
},
},
Failures: []ResponseDescriptor{
{
Name: "Invalid Name or Digest",
StatusCode: http.StatusBadRequest,
ErrorCodes: []errcode.ErrorCode{
ErrorCodeDigestInvalid,
ErrorCodeNameInvalid,
},
},
{
Name: "Not allowed",
Description: "Blob mount is not allowed because the registry is configured as a pull-through cache or for some other reason",
StatusCode: http.StatusMethodNotAllowed,
ErrorCodes: []errcode.ErrorCode{
errcode.ErrorCodeUnsupported,
},
},
unauthorizedResponseDescriptor,
repositoryNotFoundResponseDescriptor,
deniedResponseDescriptor,
},
},
},
},
},

View file

@ -108,6 +108,8 @@ type tokenHandler struct {
tokenLock sync.Mutex
tokenCache string
tokenExpiration time.Time
additionalScopes map[string]struct{}
}
// tokenScope represents the scope at which a token will be requested.
@ -145,6 +147,7 @@ func newTokenHandler(transport http.RoundTripper, creds CredentialStore, c clock
Scope: scope,
Actions: actions,
},
additionalScopes: map[string]struct{}{},
}
}
@ -160,7 +163,15 @@ func (th *tokenHandler) Scheme() string {
}
func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error {
if err := th.refreshToken(params); err != nil {
var additionalScopes []string
if fromParam := req.URL.Query().Get("from"); fromParam != "" {
additionalScopes = append(additionalScopes, tokenScope{
Resource: "repository",
Scope: fromParam,
Actions: []string{"pull"},
}.String())
}
if err := th.refreshToken(params, additionalScopes...); err != nil {
return err
}
@ -169,11 +180,18 @@ func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]st
return nil
}
func (th *tokenHandler) refreshToken(params map[string]string) error {
func (th *tokenHandler) refreshToken(params map[string]string, additionalScopes ...string) error {
th.tokenLock.Lock()
defer th.tokenLock.Unlock()
var addedScopes bool
for _, scope := range additionalScopes {
if _, ok := th.additionalScopes[scope]; !ok {
th.additionalScopes[scope] = struct{}{}
addedScopes = true
}
}
now := th.clock.Now()
if now.After(th.tokenExpiration) {
if now.After(th.tokenExpiration) || addedScopes {
tr, err := th.fetchToken(params)
if err != nil {
return err
@ -223,6 +241,10 @@ func (th *tokenHandler) fetchToken(params map[string]string) (token *tokenRespon
reqParams.Add("scope", scopeField)
}
for scope := range th.additionalScopes {
reqParams.Add("scope", scope)
}
if th.creds != nil {
username, password := th.creds.Basic(realmURL)
if username != "" && password != "" {

View file

@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
"github.com/docker/distribution"
@ -499,6 +500,9 @@ type blobs struct {
statter distribution.BlobDescriptorService
distribution.BlobDeleter
cacheLock sync.Mutex
cachedBlobUpload distribution.BlobWriter
}
func sanitizeLocation(location, base string) (string, error) {
@ -573,7 +577,20 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
}
func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
bs.cacheLock.Lock()
if bs.cachedBlobUpload != nil {
upload := bs.cachedBlobUpload
bs.cachedBlobUpload = nil
bs.cacheLock.Unlock()
return upload, nil
}
bs.cacheLock.Unlock()
u, err := bs.ub.BuildBlobUploadURL(bs.name)
if err != nil {
return nil, err
}
resp, err := bs.client.Post(u, "", nil)
if err != nil {
@ -604,6 +621,45 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
panic("not implemented")
}
func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}})
if err != nil {
return distribution.Descriptor{}, err
}
resp, err := bs.client.Post(u, "", nil)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusCreated:
return bs.Stat(ctx, dgst)
case http.StatusAccepted:
// Triggered a blob upload (legacy behavior), so cache the creation info
uuid := resp.Header.Get("Docker-Upload-UUID")
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
if err != nil {
return distribution.Descriptor{}, err
}
bs.cacheLock.Lock()
bs.cachedBlobUpload = &httpBlobUpload{
statter: bs.statter,
client: bs.client,
uuid: uuid,
startedAt: time.Now(),
location: location,
}
bs.cacheLock.Unlock()
return distribution.Descriptor{}, HandleErrorResponse(resp)
default:
return distribution.Descriptor{}, HandleErrorResponse(resp)
}
}
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
return bs.statter.Clear(ctx, dgst)
}

View file

@ -466,6 +466,61 @@ func TestBlobUploadMonolithic(t *testing.T) {
}
}
func TestBlobMount(t *testing.T) {
dgst, content := newRandomBlob(1024)
var m testutil.RequestResponseMap
repo := "test.example.com/uploadrepo"
sourceRepo := "test.example.com/sourcerepo"
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "POST",
Route: "/v2/" + repo + "/blobs/uploads/",
QueryParams: map[string][]string{"from": {sourceRepo}, "mount": {dgst.String()}},
},
Response: testutil.Response{
StatusCode: http.StatusCreated,
Headers: http.Header(map[string][]string{
"Content-Length": {"0"},
"Location": {"/v2/" + repo + "/blobs/" + dgst.String()},
"Docker-Content-Digest": {dgst.String()},
}),
},
})
m = append(m, testutil.RequestResponseMapping{
Request: testutil.Request{
Method: "HEAD",
Route: "/v2/" + repo + "/blobs/" + dgst.String(),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Content-Length": {fmt.Sprint(len(content))},
"Last-Modified": {time.Now().Add(-1 * time.Second).Format(time.ANSIC)},
}),
},
})
e, c := testServer(m)
defer c()
ctx := context.Background()
r, err := NewRepository(ctx, repo, e, nil)
if err != nil {
t.Fatal(err)
}
l := r.Blobs(ctx)
stat, err := l.Mount(ctx, sourceRepo, dgst)
if err != nil {
t.Fatal(err)
}
if stat.Digest != dgst {
t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, dgst)
}
}
func newRandomSchemaV1Manifest(name, tag string, blobCount int) (*schema1.SignedManifest, digest.Digest, []byte) {
blobs := make([]schema1.FSLayer, blobCount)
history := make([]schema1.History, blobCount)

View file

@ -710,6 +710,11 @@ func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Cont
if repo != "" {
accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
if fromRepo := r.FormValue("from"); fromRepo != "" {
// mounting a blob from one repository to another requires pull (GET)
// access to the source repository.
accessRecords = appendAccessRecords(accessRecords, "GET", fromRepo)
}
} else {
// Only allow the name not to be set on the base route.
if app.nameRequired(r) {

View file

@ -116,8 +116,16 @@ type blobUploadHandler struct {
}
// StartBlobUpload begins the blob upload process and allocates a server-side
// blob writer session.
// blob writer session, optionally mounting the blob from a separate repository.
func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) {
fromRepo := r.FormValue("from")
mountDigest := r.FormValue("mount")
if mountDigest != "" && fromRepo != "" {
buh.mountBlob(w, fromRepo, mountDigest)
return
}
blobs := buh.Repository.Blobs(buh)
upload, err := blobs.Create(buh)
@ -254,18 +262,10 @@ func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *ht
return
}
// Build our canonical blob url
blobURL, err := buh.urlBuilder.BuildBlobURL(buh.Repository.Name(), desc.Digest)
if err != nil {
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
w.Header().Set("Location", blobURL)
w.Header().Set("Content-Length", "0")
w.Header().Set("Docker-Content-Digest", desc.Digest.String())
w.WriteHeader(http.StatusCreated)
}
// CancelBlobUpload cancels an in-progress upload of a blob.
@ -335,3 +335,45 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
return nil
}
// mountBlob attempts to mount a blob from another repository by its digest. If
// successful, the blob is linked into the blob store and 201 Created is
// returned with the canonical url of the blob.
func (buh *blobUploadHandler) mountBlob(w http.ResponseWriter, fromRepo, mountDigest string) {
dgst, err := digest.ParseDigest(mountDigest)
if err != nil {
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
return
}
blobs := buh.Repository.Blobs(buh)
desc, err := blobs.Mount(buh, fromRepo, dgst)
if err != nil {
if err == distribution.ErrBlobUnknown {
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(dgst))
} else {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
}
return
}
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
return
}
}
// writeBlobCreatedHeaders writes the standard headers describing a newly
// created blob. A 201 Created is written as well as the canonical URL and
// blob digest.
func (buh *blobUploadHandler) writeBlobCreatedHeaders(w http.ResponseWriter, desc distribution.Descriptor) error {
blobURL, err := buh.urlBuilder.BuildBlobURL(buh.Repository.Name(), desc.Digest)
if err != nil {
return err
}
w.Header().Set("Location", blobURL)
w.Header().Set("Content-Length", "0")
w.Header().Set("Docker-Content-Digest", desc.Digest.String())
w.WriteHeader(http.StatusCreated)
return nil
}

View file

@ -169,6 +169,10 @@ func (pbs *proxyBlobStore) Resume(ctx context.Context, id string) (distribution.
return nil, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
return distribution.Descriptor{}, distribution.ErrUnsupported
}
func (pbs *proxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
return nil, distribution.ErrUnsupported
}

View file

@ -58,6 +58,14 @@ func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.B
return sbs.blobs.Resume(ctx, id)
}
func (sbs statsBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
sbsMu.Lock()
sbs.stats["mount"]++
sbsMu.Unlock()
return sbs.blobs.Mount(ctx, sourceRepo, dgst)
}
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
sbsMu.Lock()
sbs.stats["open"]++

View file

@ -310,6 +310,154 @@ func TestSimpleBlobRead(t *testing.T) {
}
}
// TestBlobMount covers the blob mount process, exercising common
// error paths that might be seen during a mount.
func TestBlobMount(t *testing.T) {
randomDataReader, dgst, err := testutil.CreateRandomTarFile()
if err != nil {
t.Fatalf("error creating random reader: %v", err)
}
ctx := context.Background()
imageName := "foo/bar"
sourceImageName := "foo/source"
driver := inmemory.New()
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
if err != nil {
t.Fatalf("error creating registry: %v", err)
}
repository, err := registry.Repository(ctx, imageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
sourceRepository, err := registry.Repository(ctx, sourceImageName)
if err != nil {
t.Fatalf("unexpected error getting repo: %v", err)
}
sbs := sourceRepository.Blobs(ctx)
blobUpload, err := sbs.Create(ctx)
if err != nil {
t.Fatalf("unexpected error starting layer upload: %s", err)
}
// Get the size of our random tarfile
randomDataSize, err := seekerSize(randomDataReader)
if err != nil {
t.Fatalf("error getting seeker size of random data: %v", err)
}
nn, err := io.Copy(blobUpload, randomDataReader)
if err != nil {
t.Fatalf("unexpected error uploading layer data: %v", err)
}
desc, err := blobUpload.Commit(ctx, distribution.Descriptor{Digest: dgst})
if err != nil {
t.Fatalf("unexpected error finishing layer upload: %v", err)
}
// Test for existence.
statDesc, err := sbs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v, %#v", err, sbs)
}
if statDesc != desc {
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
}
bs := repository.Blobs(ctx)
// Test destination for existence.
statDesc, err = bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating unmounted blob: %v", desc)
}
mountDesc, err := bs.Mount(ctx, sourceRepository.Name(), desc.Digest)
if err != nil {
t.Fatalf("unexpected error mounting layer: %v", err)
}
if mountDesc != desc {
t.Fatalf("descriptors not equal: %v != %v", mountDesc, desc)
}
// Test for existence.
statDesc, err = bs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error checking for existence: %v, %#v", err, bs)
}
if statDesc != desc {
t.Fatalf("descriptors not equal: %v != %v", statDesc, desc)
}
rc, err := bs.Open(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error opening blob for read: %v", err)
}
defer rc.Close()
h := sha256.New()
nn, err = io.Copy(h, rc)
if err != nil {
t.Fatalf("error reading layer: %v", err)
}
if nn != randomDataSize {
t.Fatalf("incorrect read length")
}
if digest.NewDigest("sha256", h) != dgst {
t.Fatalf("unexpected digest from uploaded layer: %q != %q", digest.NewDigest("sha256", h), dgst)
}
// Delete the blob from the source repo
err = sbs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
d, err := bs.Stat(ctx, desc.Digest)
if err != nil {
t.Fatalf("unexpected error stating blob deleted from source repository: %v", err)
}
d, err = sbs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
// Delete the blob from the dest repo
err = bs.Delete(ctx, desc.Digest)
if err != nil {
t.Fatalf("Unexpected error deleting blob")
}
d, err = bs.Stat(ctx, desc.Digest)
if err == nil {
t.Fatalf("unexpected non-error stating deleted blob: %v", d)
}
switch err {
case distribution.ErrBlobUnknown:
break
default:
t.Errorf("Unexpected error type stat-ing deleted manifest: %#v", err)
}
}
// TestLayerUploadZeroLength uploads zero-length
func TestLayerUploadZeroLength(t *testing.T) {
ctx := context.Background()

View file

@ -20,6 +20,7 @@ type linkPathFunc func(name string, dgst digest.Digest) (string, error)
// that grant access to the global blob store.
type linkedBlobStore struct {
*blobStore
registry *registry
blobServer distribution.BlobServer
blobAccessController distribution.BlobDescriptorService
repository distribution.Repository
@ -185,6 +186,28 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro
return nil
}
func (lbs *linkedBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
repo, err := lbs.registry.Repository(ctx, sourceRepo)
if err != nil {
return distribution.Descriptor{}, err
}
stat, err := repo.Blobs(ctx).Stat(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
desc := distribution.Descriptor{
Size: stat.Size,
// NOTE(stevvooe): The central blob store firewalls media types from
// other users. The caller should look this up and override the value
// for the specific repository.
MediaType: "application/octet-stream",
Digest: dgst,
}
return desc, lbs.linkBlob(ctx, desc)
}
// newBlobUpload allocates a new upload controller with the given state.
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) {
fw, err := newFileWriter(ctx, lbs.driver, path)

View file

@ -233,6 +233,7 @@ func (repo *repository) Blobs(ctx context.Context) distribution.BlobStore {
}
return &linkedBlobStore{
registry: repo.registry,
blobStore: repo.blobStore,
blobServer: repo.blobServer,
blobAccessController: statter,