forked from TrueCloudLab/distribution
Remove digest package's dependency on external sha implementation
The change relies on a refactor of the upstream resumable sha256/sha512 package that opts to register implementations with the standard library. This allows the resumable support to be detected where it matters, avoiding unnecessary and complex code. It also ensures that consumers of the digest package don't need to depend on the forked sha implementations. We also get an optimization with this change. If the size of data written to a digester is the same as the file size, we check to see if the digest has been verified. This works if the blob is written and committed in a single request. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
fd982a8bd5
commit
a0d242d9df
6 changed files with 269 additions and 226 deletions
|
@ -322,7 +322,7 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
|
||||||
return distribution.Descriptor{}, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
dgstr := digest.NewCanonicalDigester()
|
dgstr := digest.NewCanonicalDigester()
|
||||||
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr))
|
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr.Hash()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return distribution.Descriptor{}, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,7 +214,7 @@ func TestBlobAPI(t *testing.T) {
|
||||||
layerFile.Seek(0, 0)
|
layerFile.Seek(0, 0)
|
||||||
|
|
||||||
canonicalDigester := digest.NewCanonicalDigester()
|
canonicalDigester := digest.NewCanonicalDigester()
|
||||||
if _, err := io.Copy(canonicalDigester, layerFile); err != nil {
|
if _, err := io.Copy(canonicalDigester.Hash(), layerFile); err != nil {
|
||||||
t.Fatalf("error copying to digest: %v", err)
|
t.Fatalf("error copying to digest: %v", err)
|
||||||
}
|
}
|
||||||
canonicalDigest := canonicalDigester.Digest()
|
canonicalDigest := canonicalDigester.Digest()
|
||||||
|
@ -639,7 +639,7 @@ func doPushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Diges
|
||||||
func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest, uploadURLBase string, body io.Reader) string {
|
func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest, uploadURLBase string, body io.Reader) string {
|
||||||
digester := digest.NewCanonicalDigester()
|
digester := digest.NewCanonicalDigester()
|
||||||
|
|
||||||
resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, io.TeeReader(body, &digester))
|
resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, io.TeeReader(body, digester.Hash()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error doing push layer request: %v", err)
|
t.Fatalf("unexpected error doing push layer request: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -704,7 +704,7 @@ func doPushChunk(t *testing.T, uploadURLBase string, body io.Reader) (*http.Resp
|
||||||
|
|
||||||
digester := digest.NewCanonicalDigester()
|
digester := digest.NewCanonicalDigester()
|
||||||
|
|
||||||
req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester))
|
req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester.Hash()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error creating new request: %v", err)
|
t.Fatalf("unexpected error creating new request: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,9 +3,6 @@ package storage
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Sirupsen/logrus"
|
"github.com/Sirupsen/logrus"
|
||||||
|
@ -15,6 +12,10 @@ import (
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errResumableDigestNotAvailable = fmt.Errorf("resumable digest not available")
|
||||||
|
)
|
||||||
|
|
||||||
// layerWriter is used to control the various aspects of resumable
|
// layerWriter is used to control the various aspects of resumable
|
||||||
// layer upload. It implements the LayerUpload interface.
|
// layer upload. It implements the LayerUpload interface.
|
||||||
type blobWriter struct {
|
type blobWriter struct {
|
||||||
|
@ -22,7 +23,8 @@ type blobWriter struct {
|
||||||
|
|
||||||
id string
|
id string
|
||||||
startedAt time.Time
|
startedAt time.Time
|
||||||
resumableDigester digest.ResumableDigester
|
digester digest.Digester
|
||||||
|
written int64 // track the contiguous write
|
||||||
|
|
||||||
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
|
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
|
||||||
// LayerUpload Interface
|
// LayerUpload Interface
|
||||||
|
@ -82,33 +84,31 @@ func (bw *blobWriter) Cancel(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bw *blobWriter) Write(p []byte) (int, error) {
|
func (bw *blobWriter) Write(p []byte) (int, error) {
|
||||||
if bw.resumableDigester == nil {
|
|
||||||
return bw.bufferedFileWriter.Write(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure that the current write offset matches how many bytes have been
|
// Ensure that the current write offset matches how many bytes have been
|
||||||
// written to the digester. If not, we need to update the digest state to
|
// written to the digester. If not, we need to update the digest state to
|
||||||
// match the current write position.
|
// match the current write position.
|
||||||
if err := bw.resumeHashAt(bw.blobStore.ctx, bw.offset); err != nil {
|
if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return io.MultiWriter(&bw.bufferedFileWriter, bw.resumableDigester).Write(p)
|
n, err := io.MultiWriter(&bw.bufferedFileWriter, bw.digester.Hash()).Write(p)
|
||||||
|
bw.written += int64(n)
|
||||||
|
|
||||||
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
|
func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
|
||||||
if bw.resumableDigester == nil {
|
|
||||||
return bw.bufferedFileWriter.ReadFrom(r)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure that the current write offset matches how many bytes have been
|
// Ensure that the current write offset matches how many bytes have been
|
||||||
// written to the digester. If not, we need to update the digest state to
|
// written to the digester. If not, we need to update the digest state to
|
||||||
// match the current write position.
|
// match the current write position.
|
||||||
if err := bw.resumeHashAt(bw.blobStore.ctx, bw.offset); err != nil {
|
if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.resumableDigester))
|
nn, err := bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash()))
|
||||||
|
bw.written += nn
|
||||||
|
|
||||||
|
return nn, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bw *blobWriter) Close() error {
|
func (bw *blobWriter) Close() error {
|
||||||
|
@ -116,11 +116,9 @@ func (bw *blobWriter) Close() error {
|
||||||
return bw.err
|
return bw.err
|
||||||
}
|
}
|
||||||
|
|
||||||
if bw.resumableDigester != nil {
|
|
||||||
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
|
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return bw.bufferedFileWriter.Close()
|
return bw.bufferedFileWriter.Close()
|
||||||
}
|
}
|
||||||
|
@ -171,13 +169,11 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
|
||||||
desc.Length = bw.size
|
desc.Length = bw.size
|
||||||
}
|
}
|
||||||
|
|
||||||
if bw.resumableDigester != nil {
|
// TODO(stevvooe): This section is very meandering. Need to be broken down
|
||||||
// Restore the hasher state to the end of the upload.
|
// to be a lot more clear.
|
||||||
if err := bw.resumeHashAt(ctx, bw.size); err != nil {
|
|
||||||
return distribution.Descriptor{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
canonical = bw.resumableDigester.Digest()
|
if err := bw.resumeDigestAt(ctx, bw.size); err == nil {
|
||||||
|
canonical = bw.digester.Digest()
|
||||||
|
|
||||||
if canonical.Algorithm() == desc.Digest.Algorithm() {
|
if canonical.Algorithm() == desc.Digest.Algorithm() {
|
||||||
// Common case: client and server prefer the same canonical digest
|
// Common case: client and server prefer the same canonical digest
|
||||||
|
@ -189,12 +185,27 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
|
||||||
// uploaded content using that digest algorithm.
|
// uploaded content using that digest algorithm.
|
||||||
fullHash = true
|
fullHash = true
|
||||||
}
|
}
|
||||||
} else {
|
} else if err == errResumableDigestNotAvailable {
|
||||||
// Not using resumable digests, so we need to hash the entire layer.
|
// Not using resumable digests, so we need to hash the entire layer.
|
||||||
fullHash = true
|
fullHash = true
|
||||||
|
} else {
|
||||||
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if fullHash {
|
if fullHash {
|
||||||
|
// a fantastic optimization: if the the written data and the size are
|
||||||
|
// the same, we don't need to read the data from the backend. This is
|
||||||
|
// because we've written the entire file in the lifecycle of the
|
||||||
|
// current instance.
|
||||||
|
if bw.written == bw.size && digest.CanonicalAlgorithm == desc.Digest.Algorithm() {
|
||||||
|
canonical = bw.digester.Digest()
|
||||||
|
verified = desc.Digest == canonical
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the check based on size fails, we fall back to the slowest of
|
||||||
|
// paths. We may be able to make the size-based check a stronger
|
||||||
|
// guarantee, so this may be defensive.
|
||||||
|
if !verified {
|
||||||
digester := digest.NewCanonicalDigester()
|
digester := digest.NewCanonicalDigester()
|
||||||
|
|
||||||
digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
|
digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
|
||||||
|
@ -208,7 +219,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
|
||||||
return distribution.Descriptor{}, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tr := io.TeeReader(fr, digester)
|
tr := io.TeeReader(fr, digester.Hash())
|
||||||
|
|
||||||
if _, err := io.Copy(digestVerifier, tr); err != nil {
|
if _, err := io.Copy(digestVerifier, tr); err != nil {
|
||||||
return distribution.Descriptor{}, err
|
return distribution.Descriptor{}, err
|
||||||
|
@ -217,6 +228,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
|
||||||
canonical = digester.Digest()
|
canonical = digester.Digest()
|
||||||
verified = digestVerifier.Verified()
|
verified = digestVerifier.Verified()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !verified {
|
if !verified {
|
||||||
context.GetLoggerWithFields(ctx,
|
context.GetLoggerWithFields(ctx,
|
||||||
|
@ -298,172 +310,3 @@ func (bw *blobWriter) moveBlob(ctx context.Context, desc distribution.Descriptor
|
||||||
|
|
||||||
return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
|
return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
type hashStateEntry struct {
|
|
||||||
offset int64
|
|
||||||
path string
|
|
||||||
}
|
|
||||||
|
|
||||||
// getStoredHashStates returns a slice of hashStateEntries for this upload.
|
|
||||||
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
|
|
||||||
uploadHashStatePathPrefix, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
|
|
||||||
name: bw.blobStore.repository.Name(),
|
|
||||||
id: bw.id,
|
|
||||||
alg: bw.resumableDigester.Digest().Algorithm(),
|
|
||||||
list: true,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
|
|
||||||
if err != nil {
|
|
||||||
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// Treat PathNotFoundError as no entries.
|
|
||||||
paths = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
hashStateEntries := make([]hashStateEntry, 0, len(paths))
|
|
||||||
|
|
||||||
for _, p := range paths {
|
|
||||||
pathSuffix := path.Base(p)
|
|
||||||
// The suffix should be the offset.
|
|
||||||
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
|
|
||||||
}
|
|
||||||
|
|
||||||
return hashStateEntries, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// resumeHashAt attempts to restore the state of the internal hash function
|
|
||||||
// by loading the most recent saved hash state less than or equal to the given
|
|
||||||
// offset. Any unhashed bytes remaining less than the given offset are hashed
|
|
||||||
// from the content uploaded so far.
|
|
||||||
func (bw *blobWriter) resumeHashAt(ctx context.Context, offset int64) error {
|
|
||||||
if offset < 0 {
|
|
||||||
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
if offset == int64(bw.resumableDigester.Len()) {
|
|
||||||
// State of digester is already at the requested offset.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// List hash states from storage backend.
|
|
||||||
var hashStateMatch hashStateEntry
|
|
||||||
hashStates, err := bw.getStoredHashStates(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the highest stored hashState with offset less than or equal to
|
|
||||||
// the requested offset.
|
|
||||||
for _, hashState := range hashStates {
|
|
||||||
if hashState.offset == offset {
|
|
||||||
hashStateMatch = hashState
|
|
||||||
break // Found an exact offset match.
|
|
||||||
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
|
|
||||||
// This offset is closer to the requested offset.
|
|
||||||
hashStateMatch = hashState
|
|
||||||
} else if hashState.offset > offset {
|
|
||||||
// Remove any stored hash state with offsets higher than this one
|
|
||||||
// as writes to this resumed hasher will make those invalid. This
|
|
||||||
// is probably okay to skip for now since we don't expect anyone to
|
|
||||||
// use the API in this way. For that reason, we don't treat an
|
|
||||||
// an error here as a fatal error, but only log it.
|
|
||||||
if err := bw.driver.Delete(ctx, hashState.path); err != nil {
|
|
||||||
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if hashStateMatch.offset == 0 {
|
|
||||||
// No need to load any state, just reset the hasher.
|
|
||||||
bw.resumableDigester.Reset()
|
|
||||||
} else {
|
|
||||||
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = bw.resumableDigester.Restore(storedState); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mind the gap.
|
|
||||||
if gapLen := offset - int64(bw.resumableDigester.Len()); gapLen > 0 {
|
|
||||||
// Need to read content from the upload to catch up to the desired offset.
|
|
||||||
fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err = fr.Seek(int64(bw.resumableDigester.Len()), os.SEEK_SET); err != nil {
|
|
||||||
return fmt.Errorf("unable to seek to layer reader offset %d: %s", bw.resumableDigester.Len(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := io.CopyN(bw.resumableDigester, fr, gapLen); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bw *blobWriter) storeHashState(ctx context.Context) error {
|
|
||||||
uploadHashStatePath, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
|
|
||||||
name: bw.blobStore.repository.Name(),
|
|
||||||
id: bw.id,
|
|
||||||
alg: bw.resumableDigester.Digest().Algorithm(),
|
|
||||||
offset: int64(bw.resumableDigester.Len()),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
hashState, err := bw.resumableDigester.State()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
|
|
||||||
}
|
|
||||||
|
|
||||||
// removeResources should clean up all resources associated with the upload
|
|
||||||
// instance. An error will be returned if the clean up cannot proceed. If the
|
|
||||||
// resources are already not present, no error will be returned.
|
|
||||||
func (bw *blobWriter) removeResources(ctx context.Context) error {
|
|
||||||
dataPath, err := bw.blobStore.pm.path(uploadDataPathSpec{
|
|
||||||
name: bw.blobStore.repository.Name(),
|
|
||||||
id: bw.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resolve and delete the containing directory, which should include any
|
|
||||||
// upload related files.
|
|
||||||
dirPath := path.Dir(dataPath)
|
|
||||||
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
|
|
||||||
switch err := err.(type) {
|
|
||||||
case storagedriver.PathNotFoundError:
|
|
||||||
break // already gone!
|
|
||||||
default:
|
|
||||||
// This should be uncommon enough such that returning an error
|
|
||||||
// should be okay. At this point, the upload should be mostly
|
|
||||||
// complete, but perhaps the backend became unaccessible.
|
|
||||||
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,5 +2,16 @@
|
||||||
|
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
func (bw *blobWriter) setupResumableDigester() {
|
import (
|
||||||
|
"github.com/docker/distribution/context"
|
||||||
|
)
|
||||||
|
|
||||||
|
// resumeHashAt is a noop when resumable digest support is disabled.
|
||||||
|
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
|
||||||
|
return errResumableDigestNotAvailable
|
||||||
|
}
|
||||||
|
|
||||||
|
// storeHashState is a noop when resumable digest support is disabled.
|
||||||
|
func (bw *blobWriter) storeHashState(ctx context.Context) error {
|
||||||
|
return errResumableDigestNotAvailable
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,198 @@
|
||||||
|
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import "github.com/docker/distribution/digest"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
func (bw *blobWriter) setupResumableDigester() {
|
"github.com/Sirupsen/logrus"
|
||||||
bw.resumableDigester = digest.NewCanonicalResumableDigester()
|
"github.com/docker/distribution/context"
|
||||||
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
|
"github.com/stevvooe/resumable"
|
||||||
|
|
||||||
|
// register resumable hashes with import
|
||||||
|
_ "github.com/stevvooe/resumable/sha256"
|
||||||
|
_ "github.com/stevvooe/resumable/sha512"
|
||||||
|
)
|
||||||
|
|
||||||
|
// resumeDigestAt attempts to restore the state of the internal hash function
|
||||||
|
// by loading the most recent saved hash state less than or equal to the given
|
||||||
|
// offset. Any unhashed bytes remaining less than the given offset are hashed
|
||||||
|
// from the content uploaded so far.
|
||||||
|
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
|
||||||
|
if offset < 0 {
|
||||||
|
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
h, ok := bw.digester.Hash().(resumable.Hash)
|
||||||
|
if !ok {
|
||||||
|
return errResumableDigestNotAvailable
|
||||||
|
}
|
||||||
|
|
||||||
|
if offset == int64(h.Len()) {
|
||||||
|
// State of digester is already at the requested offset.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List hash states from storage backend.
|
||||||
|
var hashStateMatch hashStateEntry
|
||||||
|
hashStates, err := bw.getStoredHashStates(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the highest stored hashState with offset less than or equal to
|
||||||
|
// the requested offset.
|
||||||
|
for _, hashState := range hashStates {
|
||||||
|
if hashState.offset == offset {
|
||||||
|
hashStateMatch = hashState
|
||||||
|
break // Found an exact offset match.
|
||||||
|
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
|
||||||
|
// This offset is closer to the requested offset.
|
||||||
|
hashStateMatch = hashState
|
||||||
|
} else if hashState.offset > offset {
|
||||||
|
// Remove any stored hash state with offsets higher than this one
|
||||||
|
// as writes to this resumed hasher will make those invalid. This
|
||||||
|
// is probably okay to skip for now since we don't expect anyone to
|
||||||
|
// use the API in this way. For that reason, we don't treat an
|
||||||
|
// an error here as a fatal error, but only log it.
|
||||||
|
if err := bw.driver.Delete(ctx, hashState.path); err != nil {
|
||||||
|
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if hashStateMatch.offset == 0 {
|
||||||
|
// No need to load any state, just reset the hasher.
|
||||||
|
h.Reset()
|
||||||
|
} else {
|
||||||
|
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = h.Restore(storedState); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mind the gap.
|
||||||
|
if gapLen := offset - int64(h.Len()); gapLen > 0 {
|
||||||
|
// Need to read content from the upload to catch up to the desired offset.
|
||||||
|
fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = fr.Seek(int64(h.Len()), os.SEEK_SET); err != nil {
|
||||||
|
return fmt.Errorf("unable to seek to layer reader offset %d: %s", h.Len(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := io.CopyN(h, fr, gapLen); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeResources should clean up all resources associated with the upload
|
||||||
|
// instance. An error will be returned if the clean up cannot proceed. If the
|
||||||
|
// resources are already not present, no error will be returned.
|
||||||
|
func (bw *blobWriter) removeResources(ctx context.Context) error {
|
||||||
|
dataPath, err := bw.blobStore.pm.path(uploadDataPathSpec{
|
||||||
|
name: bw.blobStore.repository.Name(),
|
||||||
|
id: bw.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve and delete the containing directory, which should include any
|
||||||
|
// upload related files.
|
||||||
|
dirPath := path.Dir(dataPath)
|
||||||
|
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
|
||||||
|
switch err := err.(type) {
|
||||||
|
case storagedriver.PathNotFoundError:
|
||||||
|
break // already gone!
|
||||||
|
default:
|
||||||
|
// This should be uncommon enough such that returning an error
|
||||||
|
// should be okay. At this point, the upload should be mostly
|
||||||
|
// complete, but perhaps the backend became unaccessible.
|
||||||
|
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type hashStateEntry struct {
|
||||||
|
offset int64
|
||||||
|
path string
|
||||||
|
}
|
||||||
|
|
||||||
|
// getStoredHashStates returns a slice of hashStateEntries for this upload.
|
||||||
|
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
|
||||||
|
uploadHashStatePathPrefix, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
|
||||||
|
name: bw.blobStore.repository.Name(),
|
||||||
|
id: bw.id,
|
||||||
|
alg: bw.digester.Digest().Algorithm(),
|
||||||
|
list: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
|
||||||
|
if err != nil {
|
||||||
|
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Treat PathNotFoundError as no entries.
|
||||||
|
paths = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
hashStateEntries := make([]hashStateEntry, 0, len(paths))
|
||||||
|
|
||||||
|
for _, p := range paths {
|
||||||
|
pathSuffix := path.Base(p)
|
||||||
|
// The suffix should be the offset.
|
||||||
|
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
|
||||||
|
}
|
||||||
|
|
||||||
|
return hashStateEntries, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bw *blobWriter) storeHashState(ctx context.Context) error {
|
||||||
|
h, ok := bw.digester.Hash().(resumable.Hash)
|
||||||
|
if !ok {
|
||||||
|
return errResumableDigestNotAvailable
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadHashStatePath, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
|
||||||
|
name: bw.blobStore.repository.Name(),
|
||||||
|
id: bw.id,
|
||||||
|
alg: bw.digester.Digest().Algorithm(),
|
||||||
|
offset: int64(h.Len()),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
hashState, err := h.State()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,11 +164,10 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
|
||||||
blobStore: lbs,
|
blobStore: lbs,
|
||||||
id: uuid,
|
id: uuid,
|
||||||
startedAt: startedAt,
|
startedAt: startedAt,
|
||||||
|
digester: digest.NewCanonicalDigester(),
|
||||||
bufferedFileWriter: *fw,
|
bufferedFileWriter: *fw,
|
||||||
}
|
}
|
||||||
|
|
||||||
bw.setupResumableDigester()
|
|
||||||
|
|
||||||
return bw, nil
|
return bw, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue