614e8c8277
The use of the pathMapper is no longer needed the way we have organized the code base. The extra level of indirection has proved unnecessary and confusing so we've opted to clean it up. In the future, we may require more flexibility, but now it is simply not required. Signed-off-by: Stephen J Day <stephen.day@docker.com>
379 lines
11 KiB
Go
379 lines
11 KiB
Go
package storage
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/Sirupsen/logrus"
|
|
"github.com/docker/distribution"
|
|
"github.com/docker/distribution/context"
|
|
"github.com/docker/distribution/digest"
|
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
|
)
|
|
|
|
// errResumableDigestNotAvailable is the sentinel this file compares against
// the result of resumeDigestAt. Write and ReadFrom tolerate it, and
// validateBlob responds to it by hashing the complete content.
var errResumableDigestNotAvailable = errors.New("resumable digest not available")
|
|
|
|
// blobWriter is used to control the various aspects of a resumable blob
// upload. This file asserts that *blobWriter implements
// distribution.BlobWriter.
type blobWriter struct {
	// blobStore is the linked blob store the upload is committed into; its
	// driver, ctx, repository and blobAccessController are used by the
	// methods in this file.
	blobStore *linkedBlobStore

	// id is the upload identifier returned by ID and used to build the
	// upload path in removeResources.
	id string
	// startedAt is the timestamp returned by StartedAt.
	startedAt time.Time
	// digester receives every byte that passes through Write and ReadFrom.
	digester digest.Digester
	written  int64 // track the contiguous write

	// Embedded writer. Write, ReadFrom and Close in this file delegate to
	// it, and the fields referenced as bw.path, bw.size, bw.offset, bw.err
	// and bw.ctx are promoted through this embedding.
	bufferedFileWriter

	// resumableDigestEnabled is not read in the visible part of this file;
	// presumably it is consulted by resumeDigestAt/storeHashState - TODO
	// confirm.
	resumableDigestEnabled bool
}
|
|
|
|
// Compile-time check that *blobWriter satisfies distribution.BlobWriter.
var _ distribution.BlobWriter = (*blobWriter)(nil)
|
|
|
|
// ID returns the identifier for this upload, as held in the writer's id
// field.
func (bw *blobWriter) ID() string {
	return bw.id
}
|
|
|
|
// StartedAt returns the timestamp held in the writer's startedAt field.
func (bw *blobWriter) StartedAt() time.Time {
	return bw.startedAt
}
|
|
|
|
// Commit marks the upload as completed, returning a valid descriptor. The
|
|
// final size and digest are checked against the first descriptor provided.
|
|
func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
|
|
context.GetLogger(ctx).Debug("(*blobWriter).Commit")
|
|
|
|
if err := bw.bufferedFileWriter.Close(); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
canonical, err := bw.validateBlob(ctx, desc)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if err := bw.moveBlob(ctx, canonical); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if err := bw.blobStore.linkBlob(ctx, canonical, desc.Digest); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if err := bw.removeResources(ctx); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
err = bw.blobStore.blobAccessController.SetDescriptor(ctx, canonical.Digest, canonical)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
return canonical, nil
|
|
}
|
|
|
|
// Rollback the blob upload process, releasing any resources associated with
|
|
// the writer and canceling the operation.
|
|
func (bw *blobWriter) Cancel(ctx context.Context) error {
|
|
context.GetLogger(ctx).Debug("(*blobWriter).Rollback")
|
|
if err := bw.removeResources(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
bw.Close()
|
|
return nil
|
|
}
|
|
|
|
func (bw *blobWriter) Write(p []byte) (int, error) {
|
|
// Ensure that the current write offset matches how many bytes have been
|
|
// written to the digester. If not, we need to update the digest state to
|
|
// match the current write position.
|
|
if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
|
|
return 0, err
|
|
}
|
|
|
|
n, err := io.MultiWriter(&bw.bufferedFileWriter, bw.digester.Hash()).Write(p)
|
|
bw.written += int64(n)
|
|
|
|
return n, err
|
|
}
|
|
|
|
func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
|
|
// Ensure that the current write offset matches how many bytes have been
|
|
// written to the digester. If not, we need to update the digest state to
|
|
// match the current write position.
|
|
if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
|
|
return 0, err
|
|
}
|
|
|
|
nn, err := bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash()))
|
|
bw.written += nn
|
|
|
|
return nn, err
|
|
}
|
|
|
|
func (bw *blobWriter) Close() error {
|
|
if bw.err != nil {
|
|
return bw.err
|
|
}
|
|
|
|
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return bw.bufferedFileWriter.Close()
|
|
}
|
|
|
|
// validateBlob checks the data against the digest, returning an error if it
|
|
// does not match. The canonical descriptor is returned.
|
|
func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
|
|
var (
|
|
verified, fullHash bool
|
|
canonical digest.Digest
|
|
)
|
|
|
|
if desc.Digest == "" {
|
|
// if no descriptors are provided, we have nothing to validate
|
|
// against. We don't really want to support this for the registry.
|
|
return distribution.Descriptor{}, distribution.ErrBlobInvalidDigest{
|
|
Reason: fmt.Errorf("cannot validate against empty digest"),
|
|
}
|
|
}
|
|
|
|
// Stat the on disk file
|
|
if fi, err := bw.bufferedFileWriter.driver.Stat(ctx, bw.path); err != nil {
|
|
switch err := err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
// NOTE(stevvooe): We really don't care if the file is
|
|
// not actually present for the reader. We now assume
|
|
// that the desc length is zero.
|
|
desc.Size = 0
|
|
default:
|
|
// Any other error we want propagated up the stack.
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
} else {
|
|
if fi.IsDir() {
|
|
return distribution.Descriptor{}, fmt.Errorf("unexpected directory at upload location %q", bw.path)
|
|
}
|
|
|
|
bw.size = fi.Size()
|
|
}
|
|
|
|
if desc.Size > 0 {
|
|
if desc.Size != bw.size {
|
|
return distribution.Descriptor{}, distribution.ErrBlobInvalidLength
|
|
}
|
|
} else {
|
|
// if provided 0 or negative length, we can assume caller doesn't know or
|
|
// care about length.
|
|
desc.Size = bw.size
|
|
}
|
|
|
|
// TODO(stevvooe): This section is very meandering. Need to be broken down
|
|
// to be a lot more clear.
|
|
|
|
if err := bw.resumeDigestAt(ctx, bw.size); err == nil {
|
|
canonical = bw.digester.Digest()
|
|
|
|
if canonical.Algorithm() == desc.Digest.Algorithm() {
|
|
// Common case: client and server prefer the same canonical digest
|
|
// algorithm - currently SHA256.
|
|
verified = desc.Digest == canonical
|
|
} else {
|
|
// The client wants to use a different digest algorithm. They'll just
|
|
// have to be patient and wait for us to download and re-hash the
|
|
// uploaded content using that digest algorithm.
|
|
fullHash = true
|
|
}
|
|
} else if err == errResumableDigestNotAvailable {
|
|
// Not using resumable digests, so we need to hash the entire layer.
|
|
fullHash = true
|
|
} else {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
if fullHash {
|
|
// a fantastic optimization: if the the written data and the size are
|
|
// the same, we don't need to read the data from the backend. This is
|
|
// because we've written the entire file in the lifecycle of the
|
|
// current instance.
|
|
if bw.written == bw.size && digest.Canonical == desc.Digest.Algorithm() {
|
|
canonical = bw.digester.Digest()
|
|
verified = desc.Digest == canonical
|
|
}
|
|
|
|
// If the check based on size fails, we fall back to the slowest of
|
|
// paths. We may be able to make the size-based check a stronger
|
|
// guarantee, so this may be defensive.
|
|
if !verified {
|
|
digester := digest.Canonical.New()
|
|
|
|
digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
// Read the file from the backend driver and validate it.
|
|
fr, err := newFileReader(ctx, bw.bufferedFileWriter.driver, bw.path, desc.Size)
|
|
if err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
tr := io.TeeReader(fr, digester.Hash())
|
|
|
|
if _, err := io.Copy(digestVerifier, tr); err != nil {
|
|
return distribution.Descriptor{}, err
|
|
}
|
|
|
|
canonical = digester.Digest()
|
|
verified = digestVerifier.Verified()
|
|
}
|
|
}
|
|
|
|
if !verified {
|
|
context.GetLoggerWithFields(ctx,
|
|
map[string]interface{}{
|
|
"canonical": canonical,
|
|
"provided": desc.Digest,
|
|
}, "canonical", "provided").
|
|
Errorf("canonical digest does match provided digest")
|
|
return distribution.Descriptor{}, distribution.ErrBlobInvalidDigest{
|
|
Digest: desc.Digest,
|
|
Reason: fmt.Errorf("content does not match digest"),
|
|
}
|
|
}
|
|
|
|
// update desc with canonical hash
|
|
desc.Digest = canonical
|
|
|
|
if desc.MediaType == "" {
|
|
desc.MediaType = "application/octet-stream"
|
|
}
|
|
|
|
return desc, nil
|
|
}
|
|
|
|
// moveBlob moves the data into its final, hash-qualified destination,
|
|
// identified by dgst. The layer should be validated before commencing the
|
|
// move.
|
|
func (bw *blobWriter) moveBlob(ctx context.Context, desc distribution.Descriptor) error {
|
|
blobPath, err := pathFor(blobDataPathSpec{
|
|
digest: desc.Digest,
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check for existence
|
|
if _, err := bw.blobStore.driver.Stat(ctx, blobPath); err != nil {
|
|
switch err := err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
break // ensure that it doesn't exist.
|
|
default:
|
|
return err
|
|
}
|
|
} else {
|
|
// If the path exists, we can assume that the content has already
|
|
// been uploaded, since the blob storage is content-addressable.
|
|
// While it may be corrupted, detection of such corruption belongs
|
|
// elsewhere.
|
|
return nil
|
|
}
|
|
|
|
// If no data was received, we may not actually have a file on disk. Check
|
|
// the size here and write a zero-length file to blobPath if this is the
|
|
// case. For the most part, this should only ever happen with zero-length
|
|
// tars.
|
|
if _, err := bw.blobStore.driver.Stat(ctx, bw.path); err != nil {
|
|
switch err := err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
// HACK(stevvooe): This is slightly dangerous: if we verify above,
|
|
// get a hash, then the underlying file is deleted, we risk moving
|
|
// a zero-length blob into a nonzero-length blob location. To
|
|
// prevent this horrid thing, we employ the hack of only allowing
|
|
// to this happen for the zero tarsum.
|
|
if desc.Digest == digest.DigestSha256EmptyTar {
|
|
return bw.blobStore.driver.PutContent(ctx, blobPath, []byte{})
|
|
}
|
|
|
|
// We let this fail during the move below.
|
|
logrus.
|
|
WithField("upload.id", bw.ID()).
|
|
WithField("digest", desc.Digest).Warnf("attempted to move zero-length content with non-zero digest")
|
|
default:
|
|
return err // unrelated error
|
|
}
|
|
}
|
|
|
|
// TODO(stevvooe): We should also write the mediatype when executing this move.
|
|
|
|
return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
|
|
}
|
|
|
|
// removeResources should clean up all resources associated with the upload
|
|
// instance. An error will be returned if the clean up cannot proceed. If the
|
|
// resources are already not present, no error will be returned.
|
|
func (bw *blobWriter) removeResources(ctx context.Context) error {
|
|
dataPath, err := pathFor(uploadDataPathSpec{
|
|
name: bw.blobStore.repository.Name(),
|
|
id: bw.id,
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Resolve and delete the containing directory, which should include any
|
|
// upload related files.
|
|
dirPath := path.Dir(dataPath)
|
|
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
|
|
switch err := err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
break // already gone!
|
|
default:
|
|
// This should be uncommon enough such that returning an error
|
|
// should be okay. At this point, the upload should be mostly
|
|
// complete, but perhaps the backend became unaccessible.
|
|
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (bw *blobWriter) Reader() (io.ReadCloser, error) {
|
|
// todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4
|
|
try := 1
|
|
for try <= 5 {
|
|
_, err := bw.bufferedFileWriter.driver.Stat(bw.ctx, bw.path)
|
|
if err == nil {
|
|
break
|
|
}
|
|
switch err.(type) {
|
|
case storagedriver.PathNotFoundError:
|
|
context.GetLogger(bw.ctx).Debugf("Nothing found on try %d, sleeping...", try)
|
|
time.Sleep(1 * time.Second)
|
|
try++
|
|
default:
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
readCloser, err := bw.bufferedFileWriter.driver.ReadStream(bw.ctx, bw.path, 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return readCloser, nil
|
|
}
|