forked from TrueCloudLab/distribution
Merge pull request #1344 from BrianBland/blobCreateWithOptions
Adds functional options arguments to the Blobs Create method, remove Mount operation
This commit is contained in:
commit
5120357906
13 changed files with 217 additions and 120 deletions
26
blobs.go
26
blobs.go
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -40,6 +41,18 @@ func (err ErrBlobInvalidDigest) Error() string {
|
||||||
err.Digest, err.Reason)
|
err.Digest, err.Reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrBlobMounted returned when a blob is mounted from another repository
|
||||||
|
// instead of initiating an upload session.
|
||||||
|
type ErrBlobMounted struct {
|
||||||
|
From reference.Canonical
|
||||||
|
Descriptor Descriptor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (err ErrBlobMounted) Error() string {
|
||||||
|
return fmt.Sprintf("blob mounted from: %v to: %v",
|
||||||
|
err.From, err.Descriptor)
|
||||||
|
}
|
||||||
|
|
||||||
// Descriptor describes targeted content. Used in conjunction with a blob
|
// Descriptor describes targeted content. Used in conjunction with a blob
|
||||||
// store, a descriptor can be used to fetch, store and target any kind of
|
// store, a descriptor can be used to fetch, store and target any kind of
|
||||||
// blob. The struct also describes the wire protocol format. Fields should
|
// blob. The struct also describes the wire protocol format. Fields should
|
||||||
|
@ -151,14 +164,19 @@ type BlobIngester interface {
|
||||||
// returned handle can be written to and later resumed using an opaque
|
// returned handle can be written to and later resumed using an opaque
|
||||||
// identifier. With this approach, one can Close and Resume a BlobWriter
|
// identifier. With this approach, one can Close and Resume a BlobWriter
|
||||||
// multiple times until the BlobWriter is committed or cancelled.
|
// multiple times until the BlobWriter is committed or cancelled.
|
||||||
Create(ctx context.Context) (BlobWriter, error)
|
Create(ctx context.Context, options ...BlobCreateOption) (BlobWriter, error)
|
||||||
|
|
||||||
// Resume attempts to resume a write to a blob, identified by an id.
|
// Resume attempts to resume a write to a blob, identified by an id.
|
||||||
Resume(ctx context.Context, id string) (BlobWriter, error)
|
Resume(ctx context.Context, id string) (BlobWriter, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Mount adds a blob to this service from another source repository,
|
// BlobCreateOption is a general extensible function argument for blob creation
|
||||||
// identified by a digest.
|
// methods. A BlobIngester may choose to honor any or none of the given
|
||||||
Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (Descriptor, error)
|
// BlobCreateOptions, which can be specific to the implementation of the
|
||||||
|
// BlobIngester receiving them.
|
||||||
|
// TODO (brianbland): unify this with ManifestServiceOption in the future
|
||||||
|
type BlobCreateOption interface {
|
||||||
|
Apply(interface{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlobWriter provides a handle for inserting data into a blob store.
|
// BlobWriter provides a handle for inserting data into a blob store.
|
||||||
|
|
|
@ -735,6 +735,25 @@ the uploaded blob which may differ from the provided digest. Most clients may
|
||||||
ignore the value but if it is used, the client should verify the value against
|
ignore the value but if it is used, the client should verify the value against
|
||||||
the uploaded blob data.
|
the uploaded blob data.
|
||||||
|
|
||||||
|
If a mount fails due to invalid repository or digest arguments, the registry
|
||||||
|
will fall back to the standard upload behavior and return a `202 Accepted` with
|
||||||
|
the upload URL in the `Location` header:
|
||||||
|
|
||||||
|
```
|
||||||
|
202 Accepted
|
||||||
|
Location: /v2/<name>/blobs/uploads/<uuid>
|
||||||
|
Range: bytes=0-<offset>
|
||||||
|
Content-Length: 0
|
||||||
|
Docker-Upload-UUID: <uuid>
|
||||||
|
```
|
||||||
|
|
||||||
|
This behavior is consistent with older versions of the registry, which do not
|
||||||
|
recognize the repository mount query parameters.
|
||||||
|
|
||||||
|
Note: a client may issue a HEAD request to check existence of a blob in a source
|
||||||
|
repository to distinguish between the registry not supporting blob mounts and
|
||||||
|
the blob not existing in the expected repository.
|
||||||
|
|
||||||
##### Errors
|
##### Errors
|
||||||
|
|
||||||
If an 502, 503 or 504 error is received, the client should assume that the
|
If an 502, 503 or 504 error is received, the client should assume that the
|
||||||
|
|
|
@ -735,6 +735,25 @@ the uploaded blob which may differ from the provided digest. Most clients may
|
||||||
ignore the value but if it is used, the client should verify the value against
|
ignore the value but if it is used, the client should verify the value against
|
||||||
the uploaded blob data.
|
the uploaded blob data.
|
||||||
|
|
||||||
|
If a mount fails due to invalid repository or digest arguments, the registry
|
||||||
|
will fall back to the standard upload behavior and return a `202 Accepted` with
|
||||||
|
the upload URL in the `Location` header:
|
||||||
|
|
||||||
|
```
|
||||||
|
202 Accepted
|
||||||
|
Location: /v2/<name>/blobs/uploads/<uuid>
|
||||||
|
Range: bytes=0-<offset>
|
||||||
|
Content-Length: 0
|
||||||
|
Docker-Upload-UUID: <uuid>
|
||||||
|
```
|
||||||
|
|
||||||
|
This behavior is consistent with older versions of the registry, which do not
|
||||||
|
recognize the repository mount query parameters.
|
||||||
|
|
||||||
|
Note: a client may issue a HEAD request to check existence of a blob in a source
|
||||||
|
repository to distinguish between the registry not supporting blob mounts and
|
||||||
|
the blob not existing in the expected repository.
|
||||||
|
|
||||||
##### Errors
|
##### Errors
|
||||||
|
|
||||||
If an 502, 503 or 504 error is received, the client should assume that the
|
If an 502, 503 or 504 error is received, the client should assume that the
|
||||||
|
|
|
@ -42,7 +42,7 @@ func (bs *mockBlobService) Put(ctx context.Context, mediaType string, p []byte)
|
||||||
return d, nil
|
return d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *mockBlobService) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
func (bs *mockBlobService) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,10 +50,6 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution.
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
||||||
panic("not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestEmptyTar(t *testing.T) {
|
func TestEmptyTar(t *testing.T) {
|
||||||
// Confirm that gzippedEmptyTar expands to 1024 NULL bytes.
|
// Confirm that gzippedEmptyTar expands to 1024 NULL bytes.
|
||||||
var decompressed [2048]byte
|
var decompressed [2048]byte
|
||||||
|
|
|
@ -38,7 +38,7 @@ func (bs *mockBlobService) Put(ctx context.Context, mediaType string, p []byte)
|
||||||
return d, nil
|
return d, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *mockBlobService) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
func (bs *mockBlobService) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,10 +46,6 @@ func (bs *mockBlobService) Resume(ctx context.Context, id string) (distribution.
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *mockBlobService) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
||||||
panic("not implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBuilder(t *testing.T) {
|
func TestBuilder(t *testing.T) {
|
||||||
imgJSON := []byte(`{
|
imgJSON := []byte(`{
|
||||||
"architecture": "amd64",
|
"architecture": "amd64",
|
||||||
|
|
|
@ -160,8 +160,15 @@ func (bsl *blobServiceListener) Put(ctx context.Context, mediaType string, p []b
|
||||||
return desc, err
|
return desc, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bsl *blobServiceListener) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
func (bsl *blobServiceListener) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||||
wr, err := bsl.BlobStore.Create(ctx)
|
wr, err := bsl.BlobStore.Create(ctx, options...)
|
||||||
|
switch err := err.(type) {
|
||||||
|
case distribution.ErrBlobMounted:
|
||||||
|
if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Name(), err.Descriptor, err.From.Name()); err != nil {
|
||||||
|
context.GetLogger(ctx).Errorf("error dispatching blob mount to listener: %v", err)
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return bsl.decorateWriter(wr), err
|
return bsl.decorateWriter(wr), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,17 +177,6 @@ func (bsl *blobServiceListener) Resume(ctx context.Context, id string) (distribu
|
||||||
return bsl.decorateWriter(wr), err
|
return bsl.decorateWriter(wr), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bsl *blobServiceListener) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
||||||
desc, err := bsl.BlobStore.Mount(ctx, sourceRepo, dgst)
|
|
||||||
if err == nil {
|
|
||||||
if err := bsl.parent.listener.BlobMounted(bsl.parent.Repository.Name(), desc, sourceRepo); err != nil {
|
|
||||||
context.GetLogger(ctx).Errorf("error dispatching layer mount to listener: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return desc, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
|
func (bsl *blobServiceListener) decorateWriter(wr distribution.BlobWriter) distribution.BlobWriter {
|
||||||
return &blobWriterListener{
|
return &blobWriterListener{
|
||||||
BlobWriter: wr,
|
BlobWriter: wr,
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
|
@ -19,6 +18,7 @@ import (
|
||||||
"github.com/docker/distribution/reference"
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/distribution/registry/api/v2"
|
"github.com/docker/distribution/registry/api/v2"
|
||||||
"github.com/docker/distribution/registry/client/transport"
|
"github.com/docker/distribution/registry/client/transport"
|
||||||
|
"github.com/docker/distribution/registry/storage"
|
||||||
"github.com/docker/distribution/registry/storage/cache"
|
"github.com/docker/distribution/registry/storage/cache"
|
||||||
"github.com/docker/distribution/registry/storage/cache/memory"
|
"github.com/docker/distribution/registry/storage/cache/memory"
|
||||||
)
|
)
|
||||||
|
@ -500,9 +500,6 @@ type blobs struct {
|
||||||
|
|
||||||
statter distribution.BlobDescriptorService
|
statter distribution.BlobDescriptorService
|
||||||
distribution.BlobDeleter
|
distribution.BlobDeleter
|
||||||
|
|
||||||
cacheLock sync.Mutex
|
|
||||||
cachedBlobUpload distribution.BlobWriter
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func sanitizeLocation(location, base string) (string, error) {
|
func sanitizeLocation(location, base string) (string, error) {
|
||||||
|
@ -576,18 +573,23 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
|
||||||
return writer.Commit(ctx, desc)
|
return writer.Commit(ctx, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||||
bs.cacheLock.Lock()
|
var opts storage.CreateOptions
|
||||||
if bs.cachedBlobUpload != nil {
|
|
||||||
upload := bs.cachedBlobUpload
|
|
||||||
bs.cachedBlobUpload = nil
|
|
||||||
bs.cacheLock.Unlock()
|
|
||||||
|
|
||||||
return upload, nil
|
for _, option := range options {
|
||||||
|
err := option.Apply(&opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
bs.cacheLock.Unlock()
|
|
||||||
|
|
||||||
u, err := bs.ub.BuildBlobUploadURL(bs.name)
|
var values []url.Values
|
||||||
|
|
||||||
|
if opts.Mount.ShouldMount {
|
||||||
|
values = append(values, url.Values{"from": {opts.Mount.From.Name()}, "mount": {opts.Mount.From.Digest().String()}})
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := bs.ub.BuildBlobUploadURL(bs.name, values...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -598,7 +600,14 @@ func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if SuccessStatus(resp.StatusCode) {
|
switch resp.StatusCode {
|
||||||
|
case http.StatusCreated:
|
||||||
|
desc, err := bs.statter.Stat(ctx, opts.Mount.From.Digest())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
|
||||||
|
case http.StatusAccepted:
|
||||||
// TODO(dmcgowan): Check for invalid UUID
|
// TODO(dmcgowan): Check for invalid UUID
|
||||||
uuid := resp.Header.Get("Docker-Upload-UUID")
|
uuid := resp.Header.Get("Docker-Upload-UUID")
|
||||||
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
|
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
|
||||||
|
@ -613,53 +622,15 @@ func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
||||||
startedAt: time.Now(),
|
startedAt: time.Now(),
|
||||||
location: location,
|
location: location,
|
||||||
}, nil
|
}, nil
|
||||||
|
default:
|
||||||
|
return nil, HandleErrorResponse(resp)
|
||||||
}
|
}
|
||||||
return nil, HandleErrorResponse(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
||||||
u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}})
|
|
||||||
if err != nil {
|
|
||||||
return distribution.Descriptor{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := bs.client.Post(u, "", nil)
|
|
||||||
if err != nil {
|
|
||||||
return distribution.Descriptor{}, err
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
switch resp.StatusCode {
|
|
||||||
case http.StatusCreated:
|
|
||||||
return bs.Stat(ctx, dgst)
|
|
||||||
case http.StatusAccepted:
|
|
||||||
// Triggered a blob upload (legacy behavior), so cache the creation info
|
|
||||||
uuid := resp.Header.Get("Docker-Upload-UUID")
|
|
||||||
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
|
|
||||||
if err != nil {
|
|
||||||
return distribution.Descriptor{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
bs.cacheLock.Lock()
|
|
||||||
bs.cachedBlobUpload = &httpBlobUpload{
|
|
||||||
statter: bs.statter,
|
|
||||||
client: bs.client,
|
|
||||||
uuid: uuid,
|
|
||||||
startedAt: time.Now(),
|
|
||||||
location: location,
|
|
||||||
}
|
|
||||||
bs.cacheLock.Unlock()
|
|
||||||
|
|
||||||
return distribution.Descriptor{}, HandleErrorResponse(resp)
|
|
||||||
default:
|
|
||||||
return distribution.Descriptor{}, HandleErrorResponse(resp)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
|
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||||
return bs.statter.Clear(ctx, dgst)
|
return bs.statter.Clear(ctx, dgst)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,9 @@ import (
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
"github.com/docker/distribution/manifest"
|
"github.com/docker/distribution/manifest"
|
||||||
"github.com/docker/distribution/manifest/schema1"
|
"github.com/docker/distribution/manifest/schema1"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/distribution/registry/api/errcode"
|
"github.com/docker/distribution/registry/api/errcode"
|
||||||
|
"github.com/docker/distribution/registry/storage"
|
||||||
"github.com/docker/distribution/testutil"
|
"github.com/docker/distribution/testutil"
|
||||||
"github.com/docker/distribution/uuid"
|
"github.com/docker/distribution/uuid"
|
||||||
"github.com/docker/libtrust"
|
"github.com/docker/libtrust"
|
||||||
|
@ -471,6 +473,16 @@ func TestBlobMount(t *testing.T) {
|
||||||
var m testutil.RequestResponseMap
|
var m testutil.RequestResponseMap
|
||||||
repo := "test.example.com/uploadrepo"
|
repo := "test.example.com/uploadrepo"
|
||||||
sourceRepo := "test.example.com/sourcerepo"
|
sourceRepo := "test.example.com/sourcerepo"
|
||||||
|
|
||||||
|
namedRef, err := reference.ParseNamed(sourceRepo)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
canonicalRef, err := reference.WithDigest(namedRef, dgst)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
m = append(m, testutil.RequestResponseMapping{
|
m = append(m, testutil.RequestResponseMapping{
|
||||||
Request: testutil.Request{
|
Request: testutil.Request{
|
||||||
Method: "POST",
|
Method: "POST",
|
||||||
|
@ -511,13 +523,20 @@ func TestBlobMount(t *testing.T) {
|
||||||
|
|
||||||
l := r.Blobs(ctx)
|
l := r.Blobs(ctx)
|
||||||
|
|
||||||
stat, err := l.Mount(ctx, sourceRepo, dgst)
|
bw, err := l.Create(ctx, storage.WithMountFrom(canonicalRef))
|
||||||
if err != nil {
|
if bw != nil {
|
||||||
t.Fatal(err)
|
t.Fatalf("Expected blob writer to be nil, was %v", bw)
|
||||||
}
|
}
|
||||||
|
|
||||||
if stat.Digest != dgst {
|
if ebm, ok := err.(distribution.ErrBlobMounted); ok {
|
||||||
t.Fatalf("Unexpected digest: %s, expected %s", stat.Digest, dgst)
|
if ebm.From.Digest() != dgst {
|
||||||
|
t.Fatalf("Unexpected digest: %s, expected %s", ebm.From.Digest(), dgst)
|
||||||
|
}
|
||||||
|
if ebm.From.Name() != sourceRepo {
|
||||||
|
t.Fatalf("Unexpected from: %s, expected %s", ebm.From.Name(), sourceRepo)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
t.Fatalf("Unexpected error: %v, expected an ErrBlobMounted", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,8 +9,10 @@ import (
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
ctxu "github.com/docker/distribution/context"
|
ctxu "github.com/docker/distribution/context"
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/distribution/registry/api/errcode"
|
"github.com/docker/distribution/registry/api/errcode"
|
||||||
"github.com/docker/distribution/registry/api/v2"
|
"github.com/docker/distribution/registry/api/v2"
|
||||||
|
"github.com/docker/distribution/registry/storage"
|
||||||
"github.com/gorilla/handlers"
|
"github.com/gorilla/handlers"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -118,19 +120,27 @@ type blobUploadHandler struct {
|
||||||
// StartBlobUpload begins the blob upload process and allocates a server-side
|
// StartBlobUpload begins the blob upload process and allocates a server-side
|
||||||
// blob writer session, optionally mounting the blob from a separate repository.
|
// blob writer session, optionally mounting the blob from a separate repository.
|
||||||
func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) {
|
func (buh *blobUploadHandler) StartBlobUpload(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var options []distribution.BlobCreateOption
|
||||||
|
|
||||||
fromRepo := r.FormValue("from")
|
fromRepo := r.FormValue("from")
|
||||||
mountDigest := r.FormValue("mount")
|
mountDigest := r.FormValue("mount")
|
||||||
|
|
||||||
if mountDigest != "" && fromRepo != "" {
|
if mountDigest != "" && fromRepo != "" {
|
||||||
buh.mountBlob(w, fromRepo, mountDigest)
|
opt, err := buh.createBlobMountOption(fromRepo, mountDigest)
|
||||||
return
|
if err != nil {
|
||||||
|
options = append(options, opt)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blobs := buh.Repository.Blobs(buh)
|
blobs := buh.Repository.Blobs(buh)
|
||||||
upload, err := blobs.Create(buh)
|
upload, err := blobs.Create(buh, options...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == distribution.ErrUnsupported {
|
if ebm, ok := err.(distribution.ErrBlobMounted); ok {
|
||||||
|
if err := buh.writeBlobCreatedHeaders(w, ebm.Descriptor); err != nil {
|
||||||
|
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
||||||
|
}
|
||||||
|
} else if err == distribution.ErrUnsupported {
|
||||||
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported)
|
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnsupported)
|
||||||
} else {
|
} else {
|
||||||
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
||||||
|
@ -339,27 +349,23 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.
|
||||||
// mountBlob attempts to mount a blob from another repository by its digest. If
|
// mountBlob attempts to mount a blob from another repository by its digest. If
|
||||||
// successful, the blob is linked into the blob store and 201 Created is
|
// successful, the blob is linked into the blob store and 201 Created is
|
||||||
// returned with the canonical url of the blob.
|
// returned with the canonical url of the blob.
|
||||||
func (buh *blobUploadHandler) mountBlob(w http.ResponseWriter, fromRepo, mountDigest string) {
|
func (buh *blobUploadHandler) createBlobMountOption(fromRepo, mountDigest string) (distribution.BlobCreateOption, error) {
|
||||||
dgst, err := digest.ParseDigest(mountDigest)
|
dgst, err := digest.ParseDigest(mountDigest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
buh.Errors = append(buh.Errors, v2.ErrorCodeDigestInvalid.WithDetail(err))
|
return nil, err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
blobs := buh.Repository.Blobs(buh)
|
ref, err := reference.ParseNamed(fromRepo)
|
||||||
desc, err := blobs.Mount(buh, fromRepo, dgst)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == distribution.ErrBlobUnknown {
|
return nil, err
|
||||||
buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUnknown.WithDetail(dgst))
|
|
||||||
} else {
|
|
||||||
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if err := buh.writeBlobCreatedHeaders(w, desc); err != nil {
|
|
||||||
buh.Errors = append(buh.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
|
canonical, err := reference.WithDigest(ref, dgst)
|
||||||
return
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return storage.WithMountFrom(canonical), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeBlobCreatedHeaders writes the standard headers describing a newly
|
// writeBlobCreatedHeaders writes the standard headers describing a newly
|
||||||
|
|
|
@ -161,7 +161,7 @@ func (pbs *proxyBlobStore) Put(ctx context.Context, mediaType string, p []byte)
|
||||||
return distribution.Descriptor{}, distribution.ErrUnsupported
|
return distribution.Descriptor{}, distribution.ErrUnsupported
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pbs *proxyBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
func (pbs *proxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||||
return nil, distribution.ErrUnsupported
|
return nil, distribution.ErrUnsupported
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,12 +42,12 @@ func (sbs statsBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte,
|
||||||
return sbs.blobs.Get(ctx, dgst)
|
return sbs.blobs.Get(ctx, dgst)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sbs statsBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
func (sbs statsBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||||
sbsMu.Lock()
|
sbsMu.Lock()
|
||||||
sbs.stats["create"]++
|
sbs.stats["create"]++
|
||||||
sbsMu.Unlock()
|
sbsMu.Unlock()
|
||||||
|
|
||||||
return sbs.blobs.Create(ctx)
|
return sbs.blobs.Create(ctx, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
||||||
|
@ -58,14 +58,6 @@ func (sbs statsBlobStore) Resume(ctx context.Context, id string) (distribution.B
|
||||||
return sbs.blobs.Resume(ctx, id)
|
return sbs.blobs.Resume(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sbs statsBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
||||||
sbsMu.Lock()
|
|
||||||
sbs.stats["mount"]++
|
|
||||||
sbsMu.Unlock()
|
|
||||||
|
|
||||||
return sbs.blobs.Mount(ctx, sourceRepo, dgst)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
func (sbs statsBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
||||||
sbsMu.Lock()
|
sbsMu.Lock()
|
||||||
sbs.stats["open"]++
|
sbs.stats["open"]++
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/distribution/registry/storage/cache/memory"
|
"github.com/docker/distribution/registry/storage/cache/memory"
|
||||||
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
"github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||||
"github.com/docker/distribution/testutil"
|
"github.com/docker/distribution/testutil"
|
||||||
|
@ -377,13 +378,27 @@ func TestBlobMount(t *testing.T) {
|
||||||
t.Fatalf("unexpected non-error stating unmounted blob: %v", desc)
|
t.Fatalf("unexpected non-error stating unmounted blob: %v", desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
mountDesc, err := bs.Mount(ctx, sourceRepository.Name(), desc.Digest)
|
namedRef, err := reference.ParseNamed(sourceRepository.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
canonicalRef, err := reference.WithDigest(namedRef, desc.Digest)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bw, err := bs.Create(ctx, WithMountFrom(canonicalRef))
|
||||||
|
if bw != nil {
|
||||||
|
t.Fatal("unexpected blobwriter returned from Create call, should mount instead")
|
||||||
|
}
|
||||||
|
|
||||||
|
ebm, ok := err.(distribution.ErrBlobMounted)
|
||||||
|
if !ok {
|
||||||
t.Fatalf("unexpected error mounting layer: %v", err)
|
t.Fatalf("unexpected error mounting layer: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mountDesc != desc {
|
if ebm.Descriptor != desc {
|
||||||
t.Fatalf("descriptors not equal: %v != %v", mountDesc, desc)
|
t.Fatalf("descriptors not equal: %v != %v", ebm.Descriptor, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test for existence.
|
// Test for existence.
|
||||||
|
|
|
@ -1,12 +1,14 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/distribution"
|
"github.com/docker/distribution"
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
"github.com/docker/distribution/digest"
|
"github.com/docker/distribution/digest"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
"github.com/docker/distribution/registry/storage/driver"
|
"github.com/docker/distribution/registry/storage/driver"
|
||||||
"github.com/docker/distribution/uuid"
|
"github.com/docker/distribution/uuid"
|
||||||
)
|
)
|
||||||
|
@ -95,10 +97,58 @@ func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte)
|
||||||
return desc, lbs.linkBlob(ctx, desc)
|
return desc, lbs.linkBlob(ctx, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateOptions is a collection of blob creation modifiers relevant to general
|
||||||
|
// blob storage intended to be configured by the BlobCreateOption.Apply method.
|
||||||
|
type CreateOptions struct {
|
||||||
|
Mount struct {
|
||||||
|
ShouldMount bool
|
||||||
|
From reference.Canonical
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type optionFunc func(interface{}) error
|
||||||
|
|
||||||
|
func (f optionFunc) Apply(v interface{}) error {
|
||||||
|
return f(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithMountFrom returns a BlobCreateOption which designates that the blob should be
|
||||||
|
// mounted from the given canonical reference.
|
||||||
|
func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption {
|
||||||
|
return optionFunc(func(v interface{}) error {
|
||||||
|
opts, ok := v.(*CreateOptions)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("unexpected options type: %T", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
opts.Mount.ShouldMount = true
|
||||||
|
opts.Mount.From = ref
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Writer begins a blob write session, returning a handle.
|
// Writer begins a blob write session, returning a handle.
|
||||||
func (lbs *linkedBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
|
||||||
context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer")
|
context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer")
|
||||||
|
|
||||||
|
var opts CreateOptions
|
||||||
|
|
||||||
|
for _, option := range options {
|
||||||
|
err := option.Apply(&opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.Mount.ShouldMount {
|
||||||
|
desc, err := lbs.mount(ctx, opts.Mount.From.Name(), opts.Mount.From.Digest())
|
||||||
|
if err == nil {
|
||||||
|
// Mount successful, no need to initiate an upload session
|
||||||
|
return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
uuid := uuid.Generate().String()
|
uuid := uuid.Generate().String()
|
||||||
startedAt := time.Now().UTC()
|
startedAt := time.Now().UTC()
|
||||||
|
|
||||||
|
@ -186,7 +236,7 @@ func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) erro
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lbs *linkedBlobStore) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||||
repo, err := lbs.registry.Repository(ctx, sourceRepo)
|
repo, err := lbs.registry.Repository(ctx, sourceRepo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return distribution.Descriptor{}, err
|
return distribution.Descriptor{}, err
|
||||||
|
|
Loading…
Reference in a new issue