Remove digest package's dependency on external sha implementation

The change relies on a refactor of the upstream resumable sha256/sha512 package
that opts to register implementations with the standard library. This allows
the resumable support to be detected where it matters, avoiding unnecessary and
complex code. It also ensures that consumers of the digest package don't need
to depend on the forked sha implementations.

We also get an optimization with this change. If the size of data written to a
digester is the same as the file size, we check to see if the digest has been
verified. This works if the blob is written and committed in a single request.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-05-20 23:44:08 -07:00
parent 07ff029506
commit eee6cad2cf
32 changed files with 431 additions and 459 deletions

4
Godeps/Godeps.json generated
View file

@ -85,8 +85,8 @@
"Rev": "e444e69cbd2e2e3e0749a2f3c717cec491552bbf" "Rev": "e444e69cbd2e2e3e0749a2f3c717cec491552bbf"
}, },
{ {
"ImportPath": "github.com/jlhawn/go-crypto", "ImportPath": "github.com/stevvooe/resumable",
"Rev": "cd738dde20f0b3782516181b0866c9bb9db47401" "Rev": "cf61dd331ceba0ab845444fdb626b9a465704e49"
}, },
{ {
"ImportPath": "github.com/yvasiyarov/go-metrics", "ImportPath": "github.com/yvasiyarov/go-metrics",

View file

@ -1,87 +0,0 @@
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package crypto is a Subset of the Go `crypto` Package with a Resumable Hash
package crypto
import (
"hash"
"strconv"
)
// Hash identifies a cryptographic hash function that is implemented in another
// package.
type Hash uint
// HashFunc simply returns the value of h so that Hash implements SignerOpts.
func (h Hash) HashFunc() Hash {
return h
}
const (
SHA224 Hash = 1 + iota // import crypto/sha256
SHA256 // import crypto/sha256
SHA384 // import crypto/sha512
SHA512 // import crypto/sha512
maxHash
)
var digestSizes = []uint8{
SHA224: 28,
SHA256: 32,
SHA384: 48,
SHA512: 64,
}
// Size returns the length, in bytes, of a digest resulting from the given hash
// function. It doesn't require that the hash function in question be linked
// into the program.
func (h Hash) Size() int {
if h > 0 && h < maxHash {
return int(digestSizes[h])
}
panic("crypto: Size of unknown hash function")
}
// ResumableHash is the common interface implemented by all resumable hash
// functions.
type ResumableHash interface {
// ResumableHash is a superset of hash.Hash
hash.Hash
// Len returns the number of bytes written to the Hash so far.
Len() uint64
// State returns a snapshot of the state of the Hash.
State() ([]byte, error)
// Restore resets the Hash to the given state.
Restore(state []byte) error
}
var hashes = make([]func() ResumableHash, maxHash)
// New returns a new ResumableHash calculating the given hash function. New panics
// if the hash function is not linked into the binary.
func (h Hash) New() ResumableHash {
if h > 0 && h < maxHash {
f := hashes[h]
if f != nil {
return f()
}
}
panic("crypto: requested hash function #" + strconv.Itoa(int(h)) + " is unavailable")
}
// Available reports whether the given hash function is linked into the binary.
func (h Hash) Available() bool {
return h < maxHash && hashes[h] != nil
}
// RegisterHash registers a function that returns a new instance of the given
// hash function. This is intended to be called from the init function in
// packages that implement hash functions.
func RegisterHash(h Hash, f func() ResumableHash) {
if h >= maxHash {
panic("crypto: RegisterHash of unknown hash function")
}
hashes[h] = f
}

View file

@ -1,23 +0,0 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// This file defines flags attached to various functions
// and data objects. The compilers, assemblers, and linker must
// all agree on these values.
// Don't profile the marked routine. This flag is deprecated.
#define NOPROF 1
// It is ok for the linker to get multiple of these symbols. It will
// pick one of the duplicates to use.
#define DUPOK 2
// Don't insert stack check preamble.
#define NOSPLIT 4
// Put this data in a read-only section.
#define RODATA 8
// This data contains no pointers.
#define NOPTR 16
// This is a wrapper function and should not count as disabling 'recover'.
#define WRAPPER 32
// This function uses its incoming context register.
#define NEEDCTXT 64

View file

@ -3,4 +3,4 @@ A Subset of the Go `crypto` Package with a Resumable Hash Interface
### Documentation ### Documentation
GoDocs: http://godoc.org/github.com/jlhawn/go-crypto GoDocs: http://godoc.org/github.com/stevvooe/resumable

View file

@ -0,0 +1,43 @@
// Package resumable registers resumable versions of hash functions. Resumable
// varieties of hash functions are available via the standard crypto package.
// Support can be checked by type assertion against the resumable.Hash
// interface.
//
// While one can use these sub-packages directly, it makes more sense to
// register them using side-effect imports:
//
// import _ "github.com/stevvooe/resumable/sha256"
//
// This will make the resumable hashes available to the application through
// the standard crypto package. For example, if a new sha256 is required, one
// should use the following:
//
// h := crypto.SHA256.New()
//
// Such a features allows one to control the inclusion of resumable hash
// support in a single file. Applications that require the resumable hash
// implementation can type switch to detect support, while other parts of the
// application can be completely oblivious to the presence of the alternative
// hash functions.
//
// Also note that the implementations available in this package are completely
// untouched from their Go counterparts in the standard library. Only an extra
// file is added to each package to implement the extra resumable hash
// functions.
package resumable
import "hash"
// Hash is the common interface implemented by all resumable hash functions.
type Hash interface {
hash.Hash
// Len returns the number of bytes written to the Hash so far.
Len() int64
// State returns a snapshot of the state of the Hash.
State() ([]byte, error)
// Restore resets the Hash to the given state.
Restore(state []byte) error
}

View file

@ -6,8 +6,8 @@ import (
) )
// Len returns the number of bytes which have been written to the digest. // Len returns the number of bytes which have been written to the digest.
func (d *digest) Len() uint64 { func (d *digest) Len() int64 {
return d.len return int64(d.len)
} }
// State returns a snapshot of the state of the digest. // State returns a snapshot of the state of the digest.

View file

@ -7,7 +7,8 @@
package sha256 package sha256
import ( import (
"github.com/jlhawn/go-crypto" "crypto"
"hash"
) )
func init() { func init() {
@ -77,15 +78,15 @@ func (d *digest) Reset() {
d.len = 0 d.len = 0
} }
// New returns a new crypto.ResumableHash computing the SHA256 checksum. // New returns a new hash.Hash computing the SHA256 checksum.
func New() crypto.ResumableHash { func New() hash.Hash {
d := new(digest) d := new(digest)
d.Reset() d.Reset()
return d return d
} }
// New224 returns a new crypto.ResumableHash computing the SHA224 checksum. // New224 returns a new hash.Hash computing the SHA224 checksum.
func New224() crypto.ResumableHash { func New224() hash.Hash {
d := new(digest) d := new(digest)
d.is224 = true d.is224 = true
d.Reset() d.Reset()

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
#include "../textflag.h" #include "textflag.h"
// SHA256 block routine. See sha256block.go for Go equivalent. // SHA256 block routine. See sha256block.go for Go equivalent.
// //

View file

@ -2,15 +2,17 @@ package sha256
import ( import (
"bytes" "bytes"
stdlib "crypto" "crypto"
"crypto/rand" "crypto/rand"
_ "crypto/sha256" // To register the stdlib sha224 and sha256 algs. "crypto/sha256" // To register the stdlib sha224 and sha256 algs.
resumable "github.com/jlhawn/go-crypto" "hash"
"io" "io"
"testing" "testing"
"github.com/stevvooe/resumable"
) )
func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) { func compareResumableHash(t *testing.T, newResumable func() hash.Hash, newStdlib func() hash.Hash) {
// Read 3 Kilobytes of random data into a buffer. // Read 3 Kilobytes of random data into a buffer.
buf := make([]byte, 3*1024) buf := make([]byte, 3*1024)
if _, err := io.ReadFull(rand.Reader, buf); err != nil { if _, err := io.ReadFull(rand.Reader, buf); err != nil {
@ -20,8 +22,8 @@ func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
// Use two Hash objects to consume prefixes of the data. One will be // Use two Hash objects to consume prefixes of the data. One will be
// snapshotted and resumed with each additional byte, then both will write // snapshotted and resumed with each additional byte, then both will write
// that byte. The digests should be equal after each byte is digested. // that byte. The digests should be equal after each byte is digested.
resumableHasher := r.New() resumableHasher := newResumable().(resumable.Hash)
stdlibHasher := h.New() stdlibHasher := newStdlib()
// First, assert that the initial distest is the same. // First, assert that the initial distest is the same.
if !bytes.Equal(resumableHasher.Sum(nil), stdlibHasher.Sum(nil)) { if !bytes.Equal(resumableHasher.Sum(nil), stdlibHasher.Sum(nil)) {
@ -52,6 +54,21 @@ func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
} }
func TestResumable(t *testing.T) { func TestResumable(t *testing.T) {
compareResumableHash(t, resumable.SHA224, stdlib.SHA224) compareResumableHash(t, New224, sha256.New224)
compareResumableHash(t, resumable.SHA256, stdlib.SHA256) compareResumableHash(t, New, sha256.New)
}
func TestResumableRegistered(t *testing.T) {
for _, hf := range []crypto.Hash{crypto.SHA224, crypto.SHA256} {
// make sure that the hash gets the resumable version from the global
// registry in crypto library.
h := hf.New()
if rh, ok := h.(resumable.Hash); !ok {
t.Fatalf("non-resumable hash function registered: %#v %#v", rh, crypto.SHA256)
}
}
} }

View file

@ -6,8 +6,8 @@ import (
) )
// Len returns the number of bytes which have been written to the digest. // Len returns the number of bytes which have been written to the digest.
func (d *digest) Len() uint64 { func (d *digest) Len() int64 {
return d.len return int64(d.len)
} }
// State returns a snapshot of the state of the digest. // State returns a snapshot of the state of the digest.

View file

@ -7,7 +7,8 @@
package sha512 package sha512
import ( import (
"github.com/jlhawn/go-crypto" "crypto"
"hash"
) )
func init() { func init() {
@ -77,15 +78,15 @@ func (d *digest) Reset() {
d.len = 0 d.len = 0
} }
// New returns a new crypto.ResumableHash computing the SHA512 checksum. // New returns a new hash.Hash computing the SHA512 checksum.
func New() crypto.ResumableHash { func New() hash.Hash {
d := new(digest) d := new(digest)
d.Reset() d.Reset()
return d return d
} }
// New384 returns a new crypto.ResumableHash computing the SHA384 checksum. // New384 returns a new hash.Hash computing the SHA384 checksum.
func New384() crypto.ResumableHash { func New384() hash.Hash {
d := new(digest) d := new(digest)
d.is384 = true d.is384 = true
d.Reset() d.Reset()

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style // Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
#include "../textflag.h" #include "textflag.h"
// SHA512 block routine. See sha512block.go for Go equivalent. // SHA512 block routine. See sha512block.go for Go equivalent.
// //

View file

@ -2,15 +2,17 @@ package sha512
import ( import (
"bytes" "bytes"
stdlib "crypto" "crypto"
"crypto/rand" "crypto/rand" // To register the stdlib sha224 and sha256 algs.
_ "crypto/sha512" // To register the stdlib sha224 and sha256 algs. "crypto/sha512"
resumable "github.com/jlhawn/go-crypto" "hash"
"io" "io"
"testing" "testing"
"github.com/stevvooe/resumable"
) )
func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) { func compareResumableHash(t *testing.T, newResumable func() hash.Hash, newStdlib func() hash.Hash) {
// Read 3 Kilobytes of random data into a buffer. // Read 3 Kilobytes of random data into a buffer.
buf := make([]byte, 3*1024) buf := make([]byte, 3*1024)
if _, err := io.ReadFull(rand.Reader, buf); err != nil { if _, err := io.ReadFull(rand.Reader, buf); err != nil {
@ -20,8 +22,8 @@ func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
// Use two Hash objects to consume prefixes of the data. One will be // Use two Hash objects to consume prefixes of the data. One will be
// snapshotted and resumed with each additional byte, then both will write // snapshotted and resumed with each additional byte, then both will write
// that byte. The digests should be equal after each byte is digested. // that byte. The digests should be equal after each byte is digested.
resumableHasher := r.New() resumableHasher := newResumable().(resumable.Hash)
stdlibHasher := h.New() stdlibHasher := newStdlib()
// First, assert that the initial distest is the same. // First, assert that the initial distest is the same.
if !bytes.Equal(resumableHasher.Sum(nil), stdlibHasher.Sum(nil)) { if !bytes.Equal(resumableHasher.Sum(nil), stdlibHasher.Sum(nil)) {
@ -52,6 +54,21 @@ func compareResumableHash(t *testing.T, r resumable.Hash, h stdlib.Hash) {
} }
func TestResumable(t *testing.T) { func TestResumable(t *testing.T) {
compareResumableHash(t, resumable.SHA384, stdlib.SHA384) compareResumableHash(t, New384, sha512.New384)
compareResumableHash(t, resumable.SHA512, stdlib.SHA512) compareResumableHash(t, New, sha512.New)
}
func TestResumableRegistered(t *testing.T) {
for _, hf := range []crypto.Hash{crypto.SHA384, crypto.SHA512} {
// make sure that the hash gets the resumable version from the global
// registry in crypto library.
h := hf.New()
if rh, ok := h.(resumable.Hash); !ok {
t.Fatalf("non-resumable hash function registered: %#v %#v", rh, crypto.SHA256)
}
}
} }

View file

@ -2,6 +2,7 @@ package digest
import ( import (
"bytes" "bytes"
"crypto"
"fmt" "fmt"
"hash" "hash"
"io" "io"
@ -15,8 +16,12 @@ import (
const ( const (
// DigestTarSumV1EmptyTar is the digest for the empty tar file. // DigestTarSumV1EmptyTar is the digest for the empty tar file.
DigestTarSumV1EmptyTar = "tarsum.v1+sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" DigestTarSumV1EmptyTar = "tarsum.v1+sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
// DigestSha256EmptyTar is the canonical sha256 digest of empty data // DigestSha256EmptyTar is the canonical sha256 digest of empty data
DigestSha256EmptyTar = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" DigestSha256EmptyTar = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
CanonicalAlgorithm = "sha256"
CanonicalHash = crypto.SHA256 // main digest algorithm used through distribution
) )
// Digest allows simple protection of hex formatted digest strings, prefixed // Digest allows simple protection of hex formatted digest strings, prefixed
@ -73,7 +78,7 @@ func ParseDigest(s string) (Digest, error) {
func FromReader(rd io.Reader) (Digest, error) { func FromReader(rd io.Reader) (Digest, error) {
digester := NewCanonicalDigester() digester := NewCanonicalDigester()
if _, err := io.Copy(digester, rd); err != nil { if _, err := io.Copy(digester.Hash(), rd); err != nil {
return "", err return "", err
} }

View file

@ -1,54 +1,40 @@
package digest package digest
import ( import "hash"
"crypto/sha256"
"hash"
)
// Digester calculates the digest of written data. It is functionally // Digester calculates the digest of written data. Writes should go directly
// equivalent to hash.Hash but provides methods for returning the Digest type // to the return value of Hash, while calling Digest will return the current
// rather than raw bytes. // value of the digest.
type Digester struct { type Digester interface {
alg string Hash() hash.Hash // provides direct access to underlying hash instance.
hash.Hash Digest() Digest
} }
// NewDigester create a new Digester with the given hashing algorithm and instance // NewDigester create a new Digester with the given hashing algorithm and
// of that algo's hasher. // instance of that algo's hasher.
func NewDigester(alg string, h hash.Hash) Digester { func NewDigester(alg string, h hash.Hash) Digester {
return Digester{ return &digester{
alg: alg, alg: alg,
Hash: h, hash: h,
} }
} }
// NewCanonicalDigester is a convenience function to create a new Digester with // NewCanonicalDigester is a convenience function to create a new Digester with
// our default settings. // our default settings.
func NewCanonicalDigester() Digester { func NewCanonicalDigester() Digester {
return NewDigester("sha256", sha256.New()) return NewDigester(CanonicalAlgorithm, CanonicalHash.New())
} }
// Digest returns the current digest for this digester. // digester provides a simple digester definition that embeds a hasher.
func (d *Digester) Digest() Digest { type digester struct {
return NewDigest(d.alg, d.Hash) alg string
hash hash.Hash
} }
// ResumableHash is the common interface implemented by all resumable hash func (d *digester) Hash() hash.Hash {
// functions. return d.hash
type ResumableHash interface {
// ResumableHash is a superset of hash.Hash
hash.Hash
// Len returns the number of bytes written to the Hash so far.
Len() uint64
// State returns a snapshot of the state of the Hash.
State() ([]byte, error)
// Restore resets the Hash to the given state.
Restore(state []byte) error
} }
// ResumableDigester is a digester that can export its internal state and be func (d *digester) Digest() Digest {
// restored from saved state. return NewDigest(d.alg, d.Hash())
type ResumableDigester interface {
ResumableHash
Digest() Digest
} }

View file

@ -1,52 +0,0 @@
// +build !noresumabledigest
package digest
import (
"fmt"
"github.com/jlhawn/go-crypto"
// For ResumableHash
_ "github.com/jlhawn/go-crypto/sha256" // For Resumable SHA256
_ "github.com/jlhawn/go-crypto/sha512" // For Resumable SHA384, SHA512
)
// resumableDigester implements ResumableDigester.
type resumableDigester struct {
alg string
crypto.ResumableHash
}
var resumableHashAlgs = map[string]crypto.Hash{
"sha256": crypto.SHA256,
"sha384": crypto.SHA384,
"sha512": crypto.SHA512,
}
// NewResumableDigester creates a new ResumableDigester with the given hashing
// algorithm.
func NewResumableDigester(alg string) (ResumableDigester, error) {
hash, supported := resumableHashAlgs[alg]
if !supported {
return resumableDigester{}, fmt.Errorf("unsupported resumable hash algorithm: %s", alg)
}
return resumableDigester{
alg: alg,
ResumableHash: hash.New(),
}, nil
}
// NewCanonicalResumableDigester creates a ResumableDigester using the default
// digest algorithm.
func NewCanonicalResumableDigester() ResumableDigester {
return resumableDigester{
alg: "sha256",
ResumableHash: crypto.SHA256.New(),
}
}
// Digest returns the current digest for this resumable digester.
func (d resumableDigester) Digest() Digest {
return NewDigest(d.alg, d.ResumableHash)
}

View file

@ -0,0 +1,21 @@
// +build !noresumabledigest
package digest
import (
"testing"
"github.com/stevvooe/resumable"
_ "github.com/stevvooe/resumable/sha256"
)
// TestResumableDetection just ensures that the resumable capability of a hash
// is exposed through the digester type, which is just a hash plus a Digest
// method.
func TestResumableDetection(t *testing.T) {
d := NewCanonicalDigester()
if _, ok := d.Hash().(resumable.Hash); !ok {
t.Fatalf("expected digester to implement resumable: %#v, %v", d, d.Hash())
}
}

View file

@ -322,7 +322,7 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
dgstr := digest.NewCanonicalDigester() dgstr := digest.NewCanonicalDigester()
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr)) n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr.Hash()))
if err != nil { if err != nil {
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }

View file

@ -214,7 +214,7 @@ func TestBlobAPI(t *testing.T) {
layerFile.Seek(0, 0) layerFile.Seek(0, 0)
canonicalDigester := digest.NewCanonicalDigester() canonicalDigester := digest.NewCanonicalDigester()
if _, err := io.Copy(canonicalDigester, layerFile); err != nil { if _, err := io.Copy(canonicalDigester.Hash(), layerFile); err != nil {
t.Fatalf("error copying to digest: %v", err) t.Fatalf("error copying to digest: %v", err)
} }
canonicalDigest := canonicalDigester.Digest() canonicalDigest := canonicalDigester.Digest()
@ -639,7 +639,7 @@ func doPushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Diges
func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest, uploadURLBase string, body io.Reader) string { func pushLayer(t *testing.T, ub *v2.URLBuilder, name string, dgst digest.Digest, uploadURLBase string, body io.Reader) string {
digester := digest.NewCanonicalDigester() digester := digest.NewCanonicalDigester()
resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, io.TeeReader(body, &digester)) resp, err := doPushLayer(t, ub, name, dgst, uploadURLBase, io.TeeReader(body, digester.Hash()))
if err != nil { if err != nil {
t.Fatalf("unexpected error doing push layer request: %v", err) t.Fatalf("unexpected error doing push layer request: %v", err)
} }
@ -704,7 +704,7 @@ func doPushChunk(t *testing.T, uploadURLBase string, body io.Reader) (*http.Resp
digester := digest.NewCanonicalDigester() digester := digest.NewCanonicalDigester()
req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester)) req, err := http.NewRequest("PATCH", uploadURL, io.TeeReader(body, digester.Hash()))
if err != nil { if err != nil {
t.Fatalf("unexpected error creating new request: %v", err) t.Fatalf("unexpected error creating new request: %v", err)
} }

View file

@ -3,9 +3,6 @@ package storage
import ( import (
"fmt" "fmt"
"io" "io"
"os"
"path"
"strconv"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
@ -15,6 +12,10 @@ import (
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
) )
var (
errResumableDigestNotAvailable = fmt.Errorf("resumable digest not available")
)
// layerWriter is used to control the various aspects of resumable // layerWriter is used to control the various aspects of resumable
// layer upload. It implements the LayerUpload interface. // layer upload. It implements the LayerUpload interface.
type blobWriter struct { type blobWriter struct {
@ -22,7 +23,8 @@ type blobWriter struct {
id string id string
startedAt time.Time startedAt time.Time
resumableDigester digest.ResumableDigester digester digest.Digester
written int64 // track the contiguous write
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
// LayerUpload Interface // LayerUpload Interface
@ -82,33 +84,31 @@ func (bw *blobWriter) Cancel(ctx context.Context) error {
} }
func (bw *blobWriter) Write(p []byte) (int, error) { func (bw *blobWriter) Write(p []byte) (int, error) {
if bw.resumableDigester == nil {
return bw.bufferedFileWriter.Write(p)
}
// Ensure that the current write offset matches how many bytes have been // Ensure that the current write offset matches how many bytes have been
// written to the digester. If not, we need to update the digest state to // written to the digester. If not, we need to update the digest state to
// match the current write position. // match the current write position.
if err := bw.resumeHashAt(bw.blobStore.ctx, bw.offset); err != nil { if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
return 0, err return 0, err
} }
return io.MultiWriter(&bw.bufferedFileWriter, bw.resumableDigester).Write(p) n, err := io.MultiWriter(&bw.bufferedFileWriter, bw.digester.Hash()).Write(p)
bw.written += int64(n)
return n, err
} }
func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) { func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
if bw.resumableDigester == nil {
return bw.bufferedFileWriter.ReadFrom(r)
}
// Ensure that the current write offset matches how many bytes have been // Ensure that the current write offset matches how many bytes have been
// written to the digester. If not, we need to update the digest state to // written to the digester. If not, we need to update the digest state to
// match the current write position. // match the current write position.
if err := bw.resumeHashAt(bw.blobStore.ctx, bw.offset); err != nil { if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable {
return 0, err return 0, err
} }
return bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.resumableDigester)) nn, err := bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash()))
bw.written += nn
return nn, err
} }
func (bw *blobWriter) Close() error { func (bw *blobWriter) Close() error {
@ -116,11 +116,9 @@ func (bw *blobWriter) Close() error {
return bw.err return bw.err
} }
if bw.resumableDigester != nil {
if err := bw.storeHashState(bw.blobStore.ctx); err != nil { if err := bw.storeHashState(bw.blobStore.ctx); err != nil {
return err return err
} }
}
return bw.bufferedFileWriter.Close() return bw.bufferedFileWriter.Close()
} }
@ -171,13 +169,11 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
desc.Length = bw.size desc.Length = bw.size
} }
if bw.resumableDigester != nil { // TODO(stevvooe): This section is very meandering. Need to be broken down
// Restore the hasher state to the end of the upload. // to be a lot more clear.
if err := bw.resumeHashAt(ctx, bw.size); err != nil {
return distribution.Descriptor{}, err
}
canonical = bw.resumableDigester.Digest() if err := bw.resumeDigestAt(ctx, bw.size); err == nil {
canonical = bw.digester.Digest()
if canonical.Algorithm() == desc.Digest.Algorithm() { if canonical.Algorithm() == desc.Digest.Algorithm() {
// Common case: client and server prefer the same canonical digest // Common case: client and server prefer the same canonical digest
@ -189,12 +185,27 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
// uploaded content using that digest algorithm. // uploaded content using that digest algorithm.
fullHash = true fullHash = true
} }
} else { } else if err == errResumableDigestNotAvailable {
// Not using resumable digests, so we need to hash the entire layer. // Not using resumable digests, so we need to hash the entire layer.
fullHash = true fullHash = true
} else {
return distribution.Descriptor{}, err
} }
if fullHash { if fullHash {
// a fantastic optimization: if the the written data and the size are
// the same, we don't need to read the data from the backend. This is
// because we've written the entire file in the lifecycle of the
// current instance.
if bw.written == bw.size && digest.CanonicalAlgorithm == desc.Digest.Algorithm() {
canonical = bw.digester.Digest()
verified = desc.Digest == canonical
}
// If the check based on size fails, we fall back to the slowest of
// paths. We may be able to make the size-based check a stronger
// guarantee, so this may be defensive.
if !verified {
digester := digest.NewCanonicalDigester() digester := digest.NewCanonicalDigester()
digestVerifier, err := digest.NewDigestVerifier(desc.Digest) digestVerifier, err := digest.NewDigestVerifier(desc.Digest)
@ -208,7 +219,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
} }
tr := io.TeeReader(fr, digester) tr := io.TeeReader(fr, digester.Hash())
if _, err := io.Copy(digestVerifier, tr); err != nil { if _, err := io.Copy(digestVerifier, tr); err != nil {
return distribution.Descriptor{}, err return distribution.Descriptor{}, err
@ -217,6 +228,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
canonical = digester.Digest() canonical = digester.Digest()
verified = digestVerifier.Verified() verified = digestVerifier.Verified()
} }
}
if !verified { if !verified {
context.GetLoggerWithFields(ctx, context.GetLoggerWithFields(ctx,
@ -298,172 +310,3 @@ func (bw *blobWriter) moveBlob(ctx context.Context, desc distribution.Descriptor
return bw.blobStore.driver.Move(ctx, bw.path, blobPath) return bw.blobStore.driver.Move(ctx, bw.path, blobPath)
} }
type hashStateEntry struct {
offset int64
path string
}
// getStoredHashStates returns a slice of hashStateEntries for this upload.
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
uploadHashStatePathPrefix, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.resumableDigester.Digest().Algorithm(),
list: true,
})
if err != nil {
return nil, err
}
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
if err != nil {
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
return nil, err
}
// Treat PathNotFoundError as no entries.
paths = nil
}
hashStateEntries := make([]hashStateEntry, 0, len(paths))
for _, p := range paths {
pathSuffix := path.Base(p)
// The suffix should be the offset.
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
if err != nil {
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
}
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
}
return hashStateEntries, nil
}
// resumeHashAt attempts to restore the state of the internal hash function
// by loading the most recent saved hash state less than or equal to the given
// offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far.
func (bw *blobWriter) resumeHashAt(ctx context.Context, offset int64) error {
if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
}
if offset == int64(bw.resumableDigester.Len()) {
// State of digester is already at the requested offset.
return nil
}
// List hash states from storage backend.
var hashStateMatch hashStateEntry
hashStates, err := bw.getStoredHashStates(ctx)
if err != nil {
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
}
// Find the highest stored hashState with offset less than or equal to
// the requested offset.
for _, hashState := range hashStates {
if hashState.offset == offset {
hashStateMatch = hashState
break // Found an exact offset match.
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
// This offset is closer to the requested offset.
hashStateMatch = hashState
} else if hashState.offset > offset {
// Remove any stored hash state with offsets higher than this one
// as writes to this resumed hasher will make those invalid. This
// is probably okay to skip for now since we don't expect anyone to
// use the API in this way. For that reason, we don't treat an
// an error here as a fatal error, but only log it.
if err := bw.driver.Delete(ctx, hashState.path); err != nil {
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
}
}
}
if hashStateMatch.offset == 0 {
// No need to load any state, just reset the hasher.
bw.resumableDigester.Reset()
} else {
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
if err != nil {
return err
}
if err = bw.resumableDigester.Restore(storedState); err != nil {
return err
}
}
// Mind the gap.
if gapLen := offset - int64(bw.resumableDigester.Len()); gapLen > 0 {
// Need to read content from the upload to catch up to the desired offset.
fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size)
if err != nil {
return err
}
if _, err = fr.Seek(int64(bw.resumableDigester.Len()), os.SEEK_SET); err != nil {
return fmt.Errorf("unable to seek to layer reader offset %d: %s", bw.resumableDigester.Len(), err)
}
if _, err := io.CopyN(bw.resumableDigester, fr, gapLen); err != nil {
return err
}
}
return nil
}
func (bw *blobWriter) storeHashState(ctx context.Context) error {
uploadHashStatePath, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.resumableDigester.Digest().Algorithm(),
offset: int64(bw.resumableDigester.Len()),
})
if err != nil {
return err
}
hashState, err := bw.resumableDigester.State()
if err != nil {
return err
}
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (bw *blobWriter) removeResources(ctx context.Context) error {
dataPath, err := bw.blobStore.pm.path(uploadDataPathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
})
if err != nil {
return err
}
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!
default:
// This should be uncommon enough such that returning an error
// should be okay. At this point, the upload should be mostly
// complete, but perhaps the backend became unaccessible.
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
return err
}
}
return nil
}

View file

@ -2,5 +2,16 @@
package storage package storage
func (bw *blobWriter) setupResumableDigester() { import (
"github.com/docker/distribution/context"
)
// resumeHashAt is a noop when resumable digest support is disabled.
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
return errResumableDigestNotAvailable
}
// storeHashState is a noop when resumable digest support is disabled.
func (bw *blobWriter) storeHashState(ctx context.Context) error {
return errResumableDigestNotAvailable
} }

View file

@ -2,8 +2,198 @@
package storage package storage
import "github.com/docker/distribution/digest" import (
"fmt"
"io"
"os"
"path"
"strconv"
func (bw *blobWriter) setupResumableDigester() { "github.com/Sirupsen/logrus"
bw.resumableDigester = digest.NewCanonicalResumableDigester() "github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/stevvooe/resumable"
// register resumable hashes with import
_ "github.com/stevvooe/resumable/sha256"
_ "github.com/stevvooe/resumable/sha512"
)
// resumeDigestAt attempts to restore the state of the internal hash function
// by loading the most recent saved hash state less than or equal to the given
// offset. Any unhashed bytes remaining less than the given offset are hashed
// from the content uploaded so far.
func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error {
if offset < 0 {
return fmt.Errorf("cannot resume hash at negative offset: %d", offset)
}
h, ok := bw.digester.Hash().(resumable.Hash)
if !ok {
return errResumableDigestNotAvailable
}
if offset == int64(h.Len()) {
// State of digester is already at the requested offset.
return nil
}
// List hash states from storage backend.
var hashStateMatch hashStateEntry
hashStates, err := bw.getStoredHashStates(ctx)
if err != nil {
return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err)
}
// Find the highest stored hashState with offset less than or equal to
// the requested offset.
for _, hashState := range hashStates {
if hashState.offset == offset {
hashStateMatch = hashState
break // Found an exact offset match.
} else if hashState.offset < offset && hashState.offset > hashStateMatch.offset {
// This offset is closer to the requested offset.
hashStateMatch = hashState
} else if hashState.offset > offset {
// Remove any stored hash state with offsets higher than this one
// as writes to this resumed hasher will make those invalid. This
// is probably okay to skip for now since we don't expect anyone to
// use the API in this way. For that reason, we don't treat an
// an error here as a fatal error, but only log it.
if err := bw.driver.Delete(ctx, hashState.path); err != nil {
logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err)
}
}
}
if hashStateMatch.offset == 0 {
// No need to load any state, just reset the hasher.
h.Reset()
} else {
storedState, err := bw.driver.GetContent(ctx, hashStateMatch.path)
if err != nil {
return err
}
if err = h.Restore(storedState); err != nil {
return err
}
}
// Mind the gap.
if gapLen := offset - int64(h.Len()); gapLen > 0 {
// Need to read content from the upload to catch up to the desired offset.
fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size)
if err != nil {
return err
}
if _, err = fr.Seek(int64(h.Len()), os.SEEK_SET); err != nil {
return fmt.Errorf("unable to seek to layer reader offset %d: %s", h.Len(), err)
}
if _, err := io.CopyN(h, fr, gapLen); err != nil {
return err
}
}
return nil
}
// removeResources should clean up all resources associated with the upload
// instance. An error will be returned if the clean up cannot proceed. If the
// resources are already not present, no error will be returned.
func (bw *blobWriter) removeResources(ctx context.Context) error {
dataPath, err := bw.blobStore.pm.path(uploadDataPathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
})
if err != nil {
return err
}
// Resolve and delete the containing directory, which should include any
// upload related files.
dirPath := path.Dir(dataPath)
if err := bw.blobStore.driver.Delete(ctx, dirPath); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
break // already gone!
default:
// This should be uncommon enough such that returning an error
// should be okay. At this point, the upload should be mostly
// complete, but perhaps the backend became unaccessible.
context.GetLogger(ctx).Errorf("unable to delete layer upload resources %q: %v", dirPath, err)
return err
}
}
return nil
}
type hashStateEntry struct {
offset int64
path string
}
// getStoredHashStates returns a slice of hashStateEntries for this upload.
func (bw *blobWriter) getStoredHashStates(ctx context.Context) ([]hashStateEntry, error) {
uploadHashStatePathPrefix, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.digester.Digest().Algorithm(),
list: true,
})
if err != nil {
return nil, err
}
paths, err := bw.blobStore.driver.List(ctx, uploadHashStatePathPrefix)
if err != nil {
if _, ok := err.(storagedriver.PathNotFoundError); !ok {
return nil, err
}
// Treat PathNotFoundError as no entries.
paths = nil
}
hashStateEntries := make([]hashStateEntry, 0, len(paths))
for _, p := range paths {
pathSuffix := path.Base(p)
// The suffix should be the offset.
offset, err := strconv.ParseInt(pathSuffix, 0, 64)
if err != nil {
logrus.Errorf("unable to parse offset from upload state path %q: %s", p, err)
}
hashStateEntries = append(hashStateEntries, hashStateEntry{offset: offset, path: p})
}
return hashStateEntries, nil
}
func (bw *blobWriter) storeHashState(ctx context.Context) error {
h, ok := bw.digester.Hash().(resumable.Hash)
if !ok {
return errResumableDigestNotAvailable
}
uploadHashStatePath, err := bw.blobStore.pm.path(uploadHashStatePathSpec{
name: bw.blobStore.repository.Name(),
id: bw.id,
alg: bw.digester.Digest().Algorithm(),
offset: int64(h.Len()),
})
if err != nil {
return err
}
hashState, err := h.State()
if err != nil {
return err
}
return bw.driver.PutContent(ctx, uploadHashStatePath, hashState)
} }

View file

@ -164,11 +164,10 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
blobStore: lbs, blobStore: lbs,
id: uuid, id: uuid,
startedAt: startedAt, startedAt: startedAt,
digester: digest.NewCanonicalDigester(),
bufferedFileWriter: *fw, bufferedFileWriter: *fw,
} }
bw.setupResumableDigester()
return bw, nil return bw, nil
} }