Use resumable digest for efficient upload finish

By using a resumable digester and storing the state of upload digests between
subsequent upload chunks, finalizing an upload no longer requires reading back
all of the uploaded data to verify the client's expected digest.

Docker-DCO-1.1-Signed-off-by: Josh Hawn <josh.hawn@docker.com> (github: jlhawn)
This commit is contained in:
Josh Hawn 2015-03-24 10:35:01 -07:00
parent 60b6748c95
commit 18c9a1cdd8
3 changed files with 226 additions and 25 deletions

View file

@ -142,6 +142,7 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di
layerStore: ls,
uuid: uuid,
startedAt: startedAt,
resumableDigester: digest.NewCanonicalResumableDigester(),
bufferedFileWriter: *fw,
}, nil
}

View file

@ -3,7 +3,9 @@ package storage
import (
"fmt"
"io"
"os"
"path"
"strconv"
"time"
"github.com/Sirupsen/logrus"
@ -20,10 +22,11 @@ var _ distribution.LayerUpload = &layerWriter{}
type layerWriter struct {
layerStore *layerStore
uuid string
startedAt time.Time
uuid string
startedAt time.Time
resumableDigester digest.ResumableDigester
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisy
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
// LayerUpload Interface
bufferedFileWriter
}
@ -83,37 +86,212 @@ func (lw *layerWriter) Cancel() error {
return nil
}
func (lw *layerWriter) Write(p []byte) (int, error) {
// Ensure that the current write offset matches how many bytes have been
// written to the digester. If not, we need to update the digest state to
// match the current write position.
if err := lw.resumeHashAt(lw.offset); err != nil {
return 0, err
}
return io.MultiWriter(&lw.bufferedFileWriter, lw.resumableDigester).Write(p)
}
func (lw *layerWriter) ReadFrom(r io.Reader) (n int64, err error) {
// Ensure that the current write offset matches how many bytes have been
// written to the digester. If not, we need to update the digest state to
// match the current write position.
if err := lw.resumeHashAt(lw.offset); err != nil {
return 0, err
}
return lw.bufferedFileWriter.ReadFrom(io.TeeReader(r, lw.resumableDigester))
}
func (lw *layerWriter) Close() error {
if err := lw.storeHashState(); err != nil {
return err
}
return lw.bufferedFileWriter.Close()
}
type hashStateEntry struct {
offset int64
path string
}
// getStoredHashStates returns a slice of hashStateEntries for this upload.
func (lw *layerWriter) getStoredHashStates() ([]hashStateEntry, error) {
uploadHashStatePathPrefix, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{
name: lw.layerStore.repository.Name(),
uuid: lw.uuid,
alg: lw.resumableDigester.Digest().Algorithm(),
list: true,
})
if err != nil {
return nil, err
}
paths, err := lw.driver.List(uploadHashStatePathPrefix)
if err != nil {
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
return nil, err
}
// Treat PathNotFoundError as no entries.
paths = nil
}
hashStateEntries := make([]hashStateEntry, 0, len(paths))
for _, p := range paths {
pathSuffix := path.Base(p)
// The suffix should be the offset.
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
if err != nil {
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
}
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
}
return hashStateEntries, nil
}
// resumeHashAt attempts to restore the state of the internal hash function
// by loading the most recent saved hash state less than or equal to the given
// offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far.
func (lw *layerWriter) resumeHashAt(offset int64) error {
if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
}
if offset == int64(lw.resumableDigester.Len()) {
// State of digester is already at the requseted offset.
return nil
}
// List hash states from storage backend.
var hashStateMatch hashStateEntry
hashStates, err := lw.getStoredHashStates()
if err != nil {
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
}
// Find the highest stored hashState with offset less than or equal to
// the requested offset.
for _, hashState := range hashStates {
if hashState.offset == offset {
hashStateMatch = hashState
break // Found an exact offset match.
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
// This offset is closer to the requested offset.
hashStateMatch = hashState
} else if hashState.offset > offset {
// Remove any stored hash state with offsets higher than this one
// as writes to this resumed hasher will make those invalid. This
// is probably okay to skip for now since we don't expect anyone to
// use the API in this way. For that reason, we don't treat an
// an error here as a fatal error, but only log it.
if err := lw.driver.Delete(hashState.path); err != nil {
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
}
}
}
if hashStateMatch.offset == 0 {
// No need to load any state, just reset the hasher.
lw.resumableDigester.Reset()
} else {
storedState, err := lw.driver.GetContent(hashStateMatch.path)
if err != nil {
return err
}
if err = lw.resumableDigester.Restore(storedState); err != nil {
return err
}
}
// Mind the gap.
if gapLen := offset - int64(lw.resumableDigester.Len()); gapLen > 0 {
// Need to read content from the upload to catch up to the desired
// offset.
fr, err := newFileReader(lw.driver, lw.path)
if err != nil {
return err
}
if _, err = fr.Seek(int64(lw.resumableDigester.Len()), os.SEEK_SET); err != nil {
return fmt.Errorf("unable to seek to layer reader offset %d: %s", lw.resumableDigester.Len(), err)
}
if _, err := io.CopyN(lw.resumableDigester, fr, gapLen); err != nil {
return err
}
}
return nil
}
func (lw *layerWriter) storeHashState() error {
uploadHashStatePath, err := lw.layerStore.repository.registry.pm.path(uploadHashStatePathSpec{
name: lw.layerStore.repository.Name(),
uuid: lw.uuid,
alg: lw.resumableDigester.Digest().Algorithm(),
offset: int64(lw.resumableDigester.Len()),
})
if err != nil {
return err
}
hashState, err := lw.resumableDigester.State()
if err != nil {
return err
}
return lw.driver.PutContent(uploadHashStatePath, hashState)
}
// validateLayer checks the layer data against the digest, returning an error
// if it does not match. The canonical digest is returned.
func (lw *layerWriter) validateLayer(dgst digest.Digest) (digest.Digest, error) {
digestVerifier, err := digest.NewDigestVerifier(dgst)
if err != nil {
// Restore the hasher state to the end of the upload.
if err := lw.resumeHashAt(lw.size); err != nil {
return "", err
}
// TODO(stevvooe): Store resumable hash calculations in upload directory
// in driver. Something like a file at path <uuid>/resumablehash/<offest>
// with the hash state up to that point would be perfect. The hasher would
// then only have to fetch the difference.
var verified bool
canonical := lw.resumableDigester.Digest()
// Read the file from the backend driver and validate it.
fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path)
if err != nil {
return "", err
if canonical.Algorithm() == dgst.Algorithm() {
// Common case: client and server prefer the same canonical digest
// algorithm - currently SHA256.
verified = dgst == canonical
} else {
// The client wants to use a different digest algorithm. They'll just
// have to be patient and wait for us to download and re-hash the
// uploaded content using that digest algorithm.
digestVerifier, err := digest.NewDigestVerifier(dgst)
if err != nil {
return "", err
}
// Read the file from the backend driver and validate it.
fr, err := newFileReader(lw.bufferedFileWriter.driver, lw.path)
if err != nil {
return "", err
}
if _, err = io.Copy(digestVerifier, fr); err != nil {
return "", err
}
verified = digestVerifier.Verified()
}
tr := io.TeeReader(fr, digestVerifier)
// TODO(stevvooe): This is one of the places we need a Digester write
// sink. Instead, its read driven. This might be okay.
// Calculate an updated digest with the latest version.
canonical, err := digest.FromReader(tr)
if err != nil {
return "", err
}
if !digestVerifier.Verified() {
if !verified {
return "", distribution.ErrLayerInvalidDigest{
Digest: dgst,
Reason: fmt.Errorf("content does not match digest"),

View file

@ -33,6 +33,7 @@ const storagePathVersion = "v2"
// -> _uploads/<uuid>
// data
// startedat
// hashstates/<algorithm>/<offset>
// -> blob/<algorithm>
// <split directory content addressable storage>
//
@ -87,6 +88,7 @@ const storagePathVersion = "v2"
//
// uploadDataPathSpec: <root>/v2/repositories/<name>/_uploads/<uuid>/data
// uploadStartedAtPathSpec: <root>/v2/repositories/<name>/_uploads/<uuid>/startedat
// uploadHashStatePathSpec: <root>/v2/repositories/<name>/_uploads/<uuid>/hashstates/<algorithm>/<offset>
//
// Blob Store:
//
@ -249,6 +251,12 @@ func (pm *pathMapper) path(spec pathSpec) (string, error) {
return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "data")...), nil
case uploadStartedAtPathSpec:
return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "startedat")...), nil
case uploadHashStatePathSpec:
offset := fmt.Sprintf("%d", v.offset)
if v.list {
offset = "" // Limit to the prefix for listing offsets.
}
return path.Join(append(repoPrefix, v.name, "_uploads", v.uuid, "hashstates", v.alg, offset)...), nil
default:
// TODO(sday): This is an internal error. Ensure it doesn't escape (panic?).
return "", fmt.Errorf("unknown path spec: %#v", v)
@ -424,6 +432,20 @@ type uploadStartedAtPathSpec struct {
func (uploadStartedAtPathSpec) pathSpec() {}
// uploadHashStatePathSpec defines the path parameters for the file that stores
// the hash function state of an upload at a specific byte offset. If `list` is
// set, then the path mapper will generate a list prefix for all hash state
// offsets for the upload identified by the name, uuid, and alg.
type uploadHashStatePathSpec struct {
name string
uuid string
alg string
offset int64
list bool
}
func (uploadHashStatePathSpec) pathSpec() {}
// digestPathComponents provides a consistent path breakdown for a given
// digest. For a generic digest, it will be as follows:
//