forked from TrueCloudLab/distribution
31047c8113
The current implementation of digest.FromBytes returns an error. This error can never be non-nil, but its presence in the function signature means each call site needs error handling code for an error that is always nil. I verified that none of the hash.Hash implementations in the standard library can return an error on Write. Nor can any of the hash.Hash implementations vendored in distribution. This commit changes digest.FromBytes not to return an error. If Write returns an error, it will panic, but as discussed above, this should never happen. This commit also avoids using a bytes.Reader to feed data into the hash function in FromBytes. This makes the hypothetical case that would panic a bit more explicit, and should also be more performant. Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
342 lines
10 KiB
Go
342 lines
10 KiB
Go
package storage
|
|
|
|
import (
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/docker/distribution"
|
|
"github.com/docker/distribution/context"
|
|
"github.com/docker/distribution/digest"
|
|
"github.com/docker/distribution/registry/storage/driver"
|
|
"github.com/docker/distribution/uuid"
|
|
)
|
|
|
|
// linkPathFunc describes a function that can resolve a link based on the
|
|
// repository name and digest.
|
|
type linkPathFunc func(name string, dgst digest.Digest) (string, error)
|
|
|
|
// linkedBlobStore provides a full BlobService that namespaces the blobs to a
|
|
// given repository. Effectively, it manages the links in a given repository
|
|
// that grant access to the global blob store.
|
|
type linkedBlobStore struct {
|
|
*blobStore
|
|
blobServer distribution.BlobServer
|
|
blobAccessController distribution.BlobDescriptorService
|
|
repository distribution.Repository
|
|
ctx context.Context // only to be used where context can't come through method args
|
|
deleteEnabled bool
|
|
resumableDigestEnabled bool
|
|
|
|
// linkPathFns specifies one or more path functions allowing one to
|
|
// control the repository blob link set to which the blob store
|
|
// dispatches. This is required because manifest and layer blobs have not
|
|
// yet been fully merged. At some point, this functionality should be
|
|
// removed an the blob links folder should be merged. The first entry is
|
|
// treated as the "canonical" link location and will be used for writes.
|
|
linkPathFns []linkPathFunc
|
|
}
|
|
|
|
var _ distribution.BlobStore = &linkedBlobStore{}
|
|
|
|
func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
return lbs.blobAccessController.Stat(ctx, dgst)
|
|
}
|
|
|
|
func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
|
|
canonical, err := lbs.Stat(ctx, dgst) // access check
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return lbs.blobStore.Get(ctx, canonical.Digest)
|
|
}
|
|
|
|
func (lbs *linkedBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
|
|
canonical, err := lbs.Stat(ctx, dgst) // access check
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return lbs.blobStore.Open(ctx, canonical.Digest)
|
|
}
|
|
|
|
func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
|
canonical, err := lbs.Stat(ctx, dgst) // access check
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if canonical.MediaType != "" {
|
|
// Set the repository local content type.
|
|
w.Header().Set("Content-Type", canonical.MediaType)
|
|
}
|
|
|
|
return lbs.blobServer.ServeBlob(ctx, w, r, canonical.Digest)
|
|
}
|
|
|
|
func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
|
|
dgst := digest.FromBytes(p)
|
|
// Place the data in the blob store first.
|
|
desc, err := lbs.blobStore.Put(ctx, mediaType, p)
|
|
if err != nil {
|
|
context.GetLogger(ctx).Errorf("error putting into main store: %v", err)
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
// TODO(stevvooe): Write out mediatype if incoming differs from what is
|
|
// returned by Put above. Note that we should allow updates for a given
|
|
// repository.
|
|
|
|
return desc, lbs.linkBlob(ctx, desc)
|
|
}
|
|
|
|
// Writer begins a blob write session, returning a handle.
|
|
func (lbs *linkedBlobStore) Create(ctx context.Context) (distribution.BlobWriter, error) {
|
|
context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer")
|
|
|
|
uuid := uuid.Generate().String()
|
|
startedAt := time.Now().UTC()
|
|
|
|
path, err := pathFor(uploadDataPathSpec{
|
|
name: lbs.repository.Name(),
|
|
id: uuid,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
startedAtPath, err := pathFor(uploadStartedAtPathSpec{
|
|
name: lbs.repository.Name(),
|
|
id: uuid,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Write a startedat file for this upload
|
|
if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return lbs.newBlobUpload(ctx, uuid, path, startedAt)
|
|
}
|
|
|
|
func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
|
|
context.GetLogger(ctx).Debug("(*linkedBlobStore).Resume")
|
|
|
|
startedAtPath, err := pathFor(uploadStartedAtPathSpec{
|
|
name: lbs.repository.Name(),
|
|
id: id,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
startedAtBytes, err := lbs.blobStore.driver.GetContent(ctx, startedAtPath)
|
|
if err != nil {
|
|
switch err := err.(type) {
|
|
case driver.PathNotFoundError:
|
|
return nil, distribution.ErrBlobUploadUnknown
|
|
default:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
path, err := pathFor(uploadDataPathSpec{
|
|
name: lbs.repository.Name(),
|
|
id: id,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return lbs.newBlobUpload(ctx, id, path, startedAt)
|
|
}
|
|
|
|
func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
|
if !lbs.deleteEnabled {
|
|
return distribution.ErrUnsupported
|
|
}
|
|
|
|
// Ensure the blob is available for deletion
|
|
_, err := lbs.blobAccessController.Stat(ctx, dgst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = lbs.blobAccessController.Clear(ctx, dgst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// newBlobUpload allocates a new upload controller with the given state.
|
|
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) {
|
|
fw, err := newFileWriter(ctx, lbs.driver, path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bw := &blobWriter{
|
|
blobStore: lbs,
|
|
id: uuid,
|
|
startedAt: startedAt,
|
|
digester: digest.Canonical.New(),
|
|
bufferedFileWriter: *fw,
|
|
resumableDigestEnabled: lbs.resumableDigestEnabled,
|
|
}
|
|
|
|
return bw, nil
|
|
}
|
|
|
|
// linkBlob links a valid, written blob into the registry under the named
|
|
// repository for the upload controller.
|
|
func (lbs *linkedBlobStore) linkBlob(ctx context.Context, canonical distribution.Descriptor, aliases ...digest.Digest) error {
|
|
dgsts := append([]digest.Digest{canonical.Digest}, aliases...)
|
|
|
|
// TODO(stevvooe): Need to write out mediatype for only canonical hash
|
|
// since we don't care about the aliases. They are generally unused except
|
|
// for tarsum but those versions don't care about mediatype.
|
|
|
|
// Don't make duplicate links.
|
|
seenDigests := make(map[digest.Digest]struct{}, len(dgsts))
|
|
|
|
// only use the first link
|
|
linkPathFn := lbs.linkPathFns[0]
|
|
|
|
for _, dgst := range dgsts {
|
|
if _, seen := seenDigests[dgst]; seen {
|
|
continue
|
|
}
|
|
seenDigests[dgst] = struct{}{}
|
|
|
|
blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := lbs.blobStore.link(ctx, blobLinkPath, canonical.Digest); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type linkedBlobStatter struct {
|
|
*blobStore
|
|
repository distribution.Repository
|
|
|
|
// linkPathFns specifies one or more path functions allowing one to
|
|
// control the repository blob link set to which the blob store
|
|
// dispatches. This is required because manifest and layer blobs have not
|
|
// yet been fully merged. At some point, this functionality should be
|
|
// removed an the blob links folder should be merged. The first entry is
|
|
// treated as the "canonical" link location and will be used for writes.
|
|
linkPathFns []linkPathFunc
|
|
}
|
|
|
|
var _ distribution.BlobDescriptorService = &linkedBlobStatter{}
|
|
|
|
func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
|
|
var (
|
|
resolveErr error
|
|
target digest.Digest
|
|
)
|
|
|
|
// try the many link path functions until we get success or an error that
|
|
// is not PathNotFoundError.
|
|
for _, linkPathFn := range lbs.linkPathFns {
|
|
var err error
|
|
target, err = lbs.resolveWithLinkFunc(ctx, dgst, linkPathFn)
|
|
|
|
if err == nil {
|
|
break // success!
|
|
}
|
|
|
|
switch err := err.(type) {
|
|
case driver.PathNotFoundError:
|
|
resolveErr = distribution.ErrBlobUnknown // move to the next linkPathFn, saving the error
|
|
default:
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
}
|
|
|
|
if resolveErr != nil {
|
|
return distribution.Descriptor{}, resolveErr
|
|
}
|
|
|
|
if target != dgst {
|
|
// Track when we are doing cross-digest domain lookups. ie, tarsum to sha256.
|
|
context.GetLogger(ctx).Warnf("looking up blob with canonical target: %v -> %v", dgst, target)
|
|
}
|
|
|
|
// TODO(stevvooe): Look up repository local mediatype and replace that on
|
|
// the returned descriptor.
|
|
|
|
return lbs.blobStore.statter.Stat(ctx, target)
|
|
}
|
|
|
|
func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) (err error) {
|
|
// clear any possible existence of a link described in linkPathFns
|
|
for _, linkPathFn := range lbs.linkPathFns {
|
|
blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = lbs.blobStore.driver.Delete(ctx, blobLinkPath)
|
|
if err != nil {
|
|
switch err := err.(type) {
|
|
case driver.PathNotFoundError:
|
|
continue // just ignore this error and continue
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// resolveTargetWithFunc allows us to read a link to a resource with different
|
|
// linkPathFuncs to let us try a few different paths before returning not
|
|
// found.
|
|
func (lbs *linkedBlobStatter) resolveWithLinkFunc(ctx context.Context, dgst digest.Digest, linkPathFn linkPathFunc) (digest.Digest, error) {
|
|
blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return lbs.blobStore.readlink(ctx, blobLinkPath)
|
|
}
|
|
|
|
func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
|
|
// The canonical descriptor for a blob is set at the commit phase of upload
|
|
return nil
|
|
}
|
|
|
|
// blobLinkPath provides the path to the blob link, also known as layers.
|
|
func blobLinkPath(name string, dgst digest.Digest) (string, error) {
|
|
return pathFor(layerLinkPathSpec{name: name, digest: dgst})
|
|
}
|
|
|
|
// manifestRevisionLinkPath provides the path to the manifest revision link.
|
|
func manifestRevisionLinkPath(name string, dgst digest.Digest) (string, error) {
|
|
return pathFor(manifestRevisionLinkPathSpec{name: name, revision: dgst})
|
|
}
|