Merge pull request #546 from stevvooe/resumable-digest-refactor

Remove digest package's dependency on external sha implementation
This commit is contained in:
Stephen Day 2015-05-22 18:15:37 -07:00
commit 601960573d
7 changed files with 277 additions and 233 deletions

View file

@ -322,8 +322,8 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
if err != nil {
return distribution.Descriptor{}, err
}
dgstr := digest.NewCanonicalDigester()
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr))
dgstr := digest.Canonical.New()
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr.Hash()))
if err != nil {
return distribution.Descriptor{}, err
}

View file

@ -213,8 +213,8 @@ func TestBlobAPI(t *testing.T) {
// Now, push just a chunk
layerFile.Seek(0, 0)
canonicalDigester := digest.NewCanonicalDigester()
if _, err := io.Copy(canonicalDigester, layerFile); err != nil {
canonicalDigester := digest.Canonical.New()
if _, err := io.Copy(canonicalDigester.Hash(), layerFile); err != nil {
t.Fatalf("error copying to digest: %v", err)
}
canonicalDigest := canonicalDigester.Digest()
@ -637,9 +637,9 @@ func doPushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Diges
// pushLayer pushes the layer content returning the url on success.
func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest, uploadURLBase string, body io.Reader) string {
digester := digest.NewCanonicalDigester()
digester := digest.Canonical.New()
resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, io.TeeReader(body, &digester))
resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, io.TeeReader(body, digester.Hash()))
if err != nil {
t.Fatalf("unexpected error doing push layer request: %v", err)
}
@ -702,9 +702,9 @@ func doPushChunk(t *testing.T, uploadURLBase string, body io.Reader) (*http.Resp
uploadURL := u.String()
digester := digest.NewCanonicalDigester()
digester := digest.Canonical.New()
req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester))
req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester.Hash()))
if err != nil {
t.Fatalf("unexpected error creating new request: %v", err)
}

View file

@ -1,11 +1,9 @@
package storage
import (
"errors"
"fmt"
"io"
"os"
"path"
"strconv"
"time"
"github.com/Sirupsen/logrus"
@ -15,14 +13,19 @@ import (
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
var (
errResumableDigestNotAvailable = errors.New("resumable digest not available")
)
// layerWriter is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface.
type blobWriter struct {
blobStore *linkedBlobStore
id string
startedAt time.Time
resumableDigester digest.ResumableDigester
id string
startedAt time.Time
digester digest.Digester
written int64 // track the contiguous write
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
// LayerUpload Interface
@ -82,33 +85,31 @@ func (bw *blobWriter) Cancel(ctx context.Context) error {
}
func (bw *blobWriter) Write(p []byte) (int, error) {
if bw.resumableDigester == nil {
return bw.bufferedFileWriter.Write(p)
}
// Ensure that the current write offset matches how many bytes have been
// written to the digester. If not, we need to update the digest state to
// match the current write position.
if err := bw.resumeHashAt(bw.blobStore.ctx, bw.offset); err != nil {
if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
return 0, err
}
return io.MultiWriter(&bw.bufferedFileWriter, bw.resumableDigester).Write(p)
n, err := io.MultiWriter(&bw.bufferedFileWriter, bw.digester.Hash()).Write(p)
bw.written += int64(n)
return n, err
}
func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
if bw.resumableDigester == nil {
return bw.bufferedFileWriter.ReadFrom(r)
}
// Ensure that the current write offset matches how many bytes have been
// written to the digester. If not, we need to update the digest state to
// match the current write position.
if err := bw.resumeHashAt(bw.blobStore.ctx, bw.offset); err != nil {
if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
return 0, err
}
return bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.resumableDigester))
nn, err := bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash()))
bw.written += nn
return nn, err
}
func (bw *blobWriter) Close() error {
@ -116,10 +117,8 @@ func (bw *blobWriter) Close() error {
return bw.err
}
if bw.resumableDigester != nil {
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
return err
}
if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
return err
}
return bw.bufferedFileWriter.Close()
@ -171,13 +170,11 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
desc.Length = bw.size
}
if bw.resumableDigester != nil {
// Restore the hasher state to the end of the upload.
if err := bw.resumeHashAt(ctx, bw.size); err != nil {
return distribution.Descriptor{}, err
}
// TODO(stevvooe): This section is very meandering. Need to be broken down
// to be a lot more clear.
canonical = bw.resumableDigester.Digest()
if err := bw.resumeDigestAt(ctx, bw.size); err == nil {
canonical = bw.digester.Digest()
if canonical.Algorithm() == desc.Digest.Algorithm() {
// Common case: client and server prefer the same canonical digest
@ -189,33 +186,49 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
// uploaded content using that digest algorithm.
fullHash = true
}
} else {
} else if err == errResumableDigestNotAvailable {
// Not using resumable digests, so we need to hash the entire layer.
fullHash = true
} else {
return distribution.Descriptor{}, err
}
if fullHash {
digester := digest.NewCanonicalDigester()
digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
if err != nil {
return distribution.Descriptor{}, err
// a fantastic optimization: if the the written data and the size are
// the same, we don't need to read the data from the backend. This is
// because we've written the entire file in the lifecycle of the
// current instance.
if bw.written == bw.size && digest.Canonical == desc.Digest.Algorithm() {
canonical = bw.digester.Digest()
verified = desc.Digest == canonical
}
// Read the file from the backend driver and validate it.
fr, err := newFileReader(ctx, bw.bufferedFileWriter.driver, bw.path, desc.Length)
if err != nil {
return distribution.Descriptor{}, err
// If the check based on size fails, we fall back to the slowest of
// paths. We may be able to make the size-based check a stronger
// guarantee, so this may be defensive.
if !verified {
digester := digest.Canonical.New()
digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
if err != nil {
return distribution.Descriptor{}, err
}
// Read the file from the backend driver and validate it.
fr, err := newFileReader(ctx, bw.bufferedFileWriter.driver, bw.path, desc.Length)
if err != nil {
return distribution.Descriptor{}, err
}
tr := io.TeeReader(fr, digester.Hash())
if _, err := io.Copy(digestVerifier, tr); err != nil {
return distribution.Descriptor{}, err
}
canonical = digester.Digest()
verified = digestVerifier.Verified()
}
tr := io.TeeReader(fr, digester)
if _, err := io.Copy(digestVerifier, tr); err != nil {
return distribution.Descriptor{}, err
}
canonical = digester.Digest()
verified = digestVerifier.Verified()
}
if !verified {
@ -298,172 +311,3 @@ func (bw *blobWriter) moveBlob(ctx context.Context, desc distribution.Descriptor
return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
}
type hashStateEntry struct {
offset int64
path string
}
// getStoredHashStates returns a slice of hashStateEntries for this upload.
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
uploadHashStatePathPrefix, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.resumableDigester.Digest().Algorithm(),
list: true,
})
if err != nil {
return nil, err
}
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
if err != nil {
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
return nil, err
}
// Treat PathNotFoundError as no entries.
paths = nil
}
hashStateEntries := make([]hashStateEntry, 0, len(paths))
for _, p := range paths {
pathSuffix := path.Base(p)
// The suffix should be the offset.
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
if err != nil {
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
}
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
}
return hashStateEntries, nil
}
// resumeHashAt attempts to restore the state of the internal hash function
// by loading the most recent saved hash state less than or equal to the given
// offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far.
func (bw *blobWriter) resumeHashAt(ctx context.Context, offset int64) error {
if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
}
if offset == int64(bw.resumableDigester.Len()) {
// State of digester is already at the requested offset.
return nil
}
// List hash states from storage backend.
var hashStateMatch hashStateEntry
hashStates, err := bw.getStoredHashStates(ctx)
if err != nil {
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
}
// Find the highest stored hashState with offset less than or equal to
// the requested offset.
for _, hashState := range hashStates {
if hashState.offset == offset {
hashStateMatch = hashState
break // Found an exact offset match.
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
// This offset is closer to the requested offset.
hashStateMatch = hashState
} else if hashState.offset > offset {
// Remove any stored hash state with offsets higher than this one
// as writes to this resumed hasher will make those invalid. This
// is probably okay to skip for now since we don't expect anyone to
// use the API in this way. For that reason, we don't treat an
// an error here as a fatal error, but only log it.
if err := bw.driver.Delete(ctx, hashState.path); err != nil {
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
}
}
}
if hashStateMatch.offset == 0 {
// No need to load any state, just reset the hasher.
bw.resumableDigester.Reset()
} else {
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
if err != nil {
return err
}
if err = bw.resumableDigester.Restore(storedState); err != nil {
return err
}
}
// Mind the gap.
if gapLen := offset - int64(bw.resumableDigester.Len()); gapLen > 0 {
// Need to read content from the upload to catch up to the desired offset.
fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size)
if err != nil {
return err
}
if _, err = fr.Seek(int64(bw.resumableDigester.Len()), os.SEEK_SET); err != nil {
return fmt.Errorf("unable to seek to layer reader offset %d: %s", bw.resumableDigester.Len(), err)
}
if _, err := io.CopyN(bw.resumableDigester, fr, gapLen); err != nil {
return err
}
}
return nil
}
func (bw *blobWriter) storeHashState(ctx context.Context) error {
uploadHashStatePath, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.resumableDigester.Digest().Algorithm(),
offset: int64(bw.resumableDigester.Len()),
})
if err != nil {
return err
}
hashState, err := bw.resumableDigester.State()
if err != nil {
return err
}
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (bw *blobWriter) removeResources(ctx context.Context) error {
dataPath, err := bw.blobStore.pm.path(uploadDataPathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
})
if err != nil {
return err
}
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!
default:
// This should be uncommon enough such that returning an error
// should be okay. At this point, the upload should be mostly
// complete, but perhaps the backend became unaccessible.
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
return err
}
}
return nil
}

View file

@ -2,5 +2,16 @@
package storage
func (bw *blobWriter) setupResumableDigester() {
import (
"github.com/docker/distribution/context"
)
// resumeHashAt is a noop when resumable digest support is disabled.
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
return errResumableDigestNotAvailable
}
// storeHashState is a noop when resumable digest support is disabled.
func (bw *blobWriter) storeHashState(ctx context.Context) error {
return errResumableDigestNotAvailable
}

View file

@ -2,8 +2,198 @@
package storage
import "github.com/docker/distribution/digest"
import (
"fmt"
"io"
"os"
"path"
"strconv"
func (bw *blobWriter) setupResumableDigester() {
bw.resumableDigester = digest.NewCanonicalResumableDigester()
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/stevvooe/resumable"
// register resumable hashes with import
_ "github.com/stevvooe/resumable/sha256"
_ "github.com/stevvooe/resumable/sha512"
)
// resumeDigestAt attempts to restore the state of the internal hash function
// by loading the most recent saved hash state less than or equal to the given
// offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far.
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
}
h, ok := bw.digester.Hash().(resumable.Hash)
if !ok {
return errResumableDigestNotAvailable
}
if offset == int64(h.Len()) {
// State of digester is already at the requested offset.
return nil
}
// List hash states from storage backend.
var hashStateMatch hashStateEntry
hashStates, err := bw.getStoredHashStates(ctx)
if err != nil {
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
}
// Find the highest stored hashState with offset less than or equal to
// the requested offset.
for _, hashState := range hashStates {
if hashState.offset == offset {
hashStateMatch = hashState
break // Found an exact offset match.
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
// This offset is closer to the requested offset.
hashStateMatch = hashState
} else if hashState.offset > offset {
// Remove any stored hash state with offsets higher than this one
// as writes to this resumed hasher will make those invalid. This
// is probably okay to skip for now since we don't expect anyone to
// use the API in this way. For that reason, we don't treat an
// an error here as a fatal error, but only log it.
if err := bw.driver.Delete(ctx, hashState.path); err != nil {
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
}
}
}
if hashStateMatch.offset == 0 {
// No need to load any state, just reset the hasher.
h.Reset()
} else {
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
if err != nil {
return err
}
if err = h.Restore(storedState); err != nil {
return err
}
}
// Mind the gap.
if gapLen := offset - int64(h.Len()); gapLen > 0 {
// Need to read content from the upload to catch up to the desired offset.
fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size)
if err != nil {
return err
}
if _, err = fr.Seek(int64(h.Len()), os.SEEK_SET); err != nil {
return fmt.Errorf("unable to seek to layer reader offset %d: %s", h.Len(), err)
}
if _, err := io.CopyN(h, fr, gapLen); err != nil {
return err
}
}
return nil
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (bw *blobWriter) removeResources(ctx context.Context) error {
dataPath, err := bw.blobStore.pm.path(uploadDataPathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
})
if err != nil {
return err
}
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!
default:
// This should be uncommon enough such that returning an error
// should be okay. At this point, the upload should be mostly
// complete, but perhaps the backend became unaccessible.
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
return err
}
}
return nil
}
type hashStateEntry struct {
offset int64
path string
}
// getStoredHashStates returns a slice of hashStateEntries for this upload.
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
uploadHashStatePathPrefix, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.digester.Digest().Algorithm(),
list: true,
})
if err != nil {
return nil, err
}
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
if err != nil {
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
return nil, err
}
// Treat PathNotFoundError as no entries.
paths = nil
}
hashStateEntries := make([]hashStateEntry, 0, len(paths))
for _, p := range paths {
pathSuffix := path.Base(p)
// The suffix should be the offset.
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
if err != nil {
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
}
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
}
return hashStateEntries, nil
}
func (bw *blobWriter) storeHashState(ctx context.Context) error {
h, ok := bw.digester.Hash().(resumable.Hash)
if !ok {
return errResumableDigestNotAvailable
}
uploadHashStatePath, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.digester.Digest().Algorithm(),
offset: int64(h.Len()),
})
if err != nil {
return err
}
hashState, err := h.State()
if err != nil {
return err
}
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
}

View file

@ -164,11 +164,10 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
blobStore: lbs,
id: uuid,
startedAt: startedAt,
digester: digest.Canonical.New(),
bufferedFileWriter: *fw,
}
bw.setupResumableDigester()
return bw, nil
}

View file

@ -262,7 +262,7 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) {
if v.list {
offset = "" // Limit to the prefix for listing offsets.
}
return path.Join(append(repoPrefix, v.name, "_uploads", v.id, "hashstates", v.alg, offset)...), nil
return path.Join(append(repoPrefix, v.name, "_uploads", v.id, "hashstates", string(v.alg), offset)...), nil
case repositoriesRootPathSpec:
return path.Join(repoPrefix...), nil
default:
@ -447,7 +447,7 @@ func (uploadStartedAtPathSpec) pathSpec() {}
type uploadHashStatePathSpec struct {
name string
id string
alg string
alg digest.Algorithm
offset int64
list bool
}
@ -479,7 +479,7 @@ func digestPathComponents(dgst digest.Digest, multilevel bool) ([]string, error)
return nil, err
}
algorithm := blobAlgorithmReplacer.Replace(dgst.Algorithm())
algorithm := blobAlgorithmReplacer.Replace(string(dgst.Algorithm()))
hex := dgst.Hex()
prefix := []string{algorithm}