Handle empty blob files more appropriately

Several API tests were added to ensure correct acceptance of zero-size and
empty tar files. This led to several changes in the storage backend around the
guarantees of remote file reading, which backs the layer and layer upload type.

In support of these changes, zero-length and empty checks have been added to
the digest package. These provide a sanity check against upstream tarsum
changes. The fileReader has been modified to be more robust when reading and
seeking on zero-length or non-existent files. The file no longer needs to exist
for the reader to be created. Seeks can now move beyond the end of the file,
causing reads to issue an io.EOF. This eliminates errors during certain race
conditions for reading files which should be detected by stat calls. As a part
of this, a few error types were factored out and the read buffer size was
increased to something more reasonable.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2015-02-02 13:01:49 -08:00
parent 097fce3bb2
commit 0270bec916
12 changed files with 218 additions and 91 deletions

View file

@ -1332,9 +1332,9 @@ var errorDescriptors = []ErrorDescriptor{
{ {
Code: ErrorCodeNameInvalid, Code: ErrorCodeNameInvalid,
Value: "NAME_INVALID", Value: "NAME_INVALID",
Message: "manifest name did not match URI", Message: "invalid repository name",
Description: `During a manifest upload, if the name in the manifest Description: `Invalid repository name encountered either during
does not match the uri name, this error will be returned.`, manifest validation or any API operation.`,
HTTPStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound}, HTTPStatusCodes: []int{http.StatusBadRequest, http.StatusNotFound},
}, },
{ {

View file

@ -13,6 +13,11 @@ import (
"github.com/docker/docker/pkg/tarsum" "github.com/docker/docker/pkg/tarsum"
) )
const (
// DigestTarSumV1EmptyTar is the digest for the empty tar file.
DigestTarSumV1EmptyTar = "tarsum.v1+sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)
// Digest allows simple protection of hex formatted digest strings, prefixed // Digest allows simple protection of hex formatted digest strings, prefixed
// by their algorithm. Strings of type Digest have some guarantee of being in // by their algorithm. Strings of type Digest have some guarantee of being in
// the correct format and it provides quick access to the components of a // the correct format and it provides quick access to the components of a

View file

@ -1,6 +1,10 @@
package digest package digest
import "testing" import (
"bytes"
"io"
"testing"
)
func TestParseDigest(t *testing.T) { func TestParseDigest(t *testing.T) {
for _, testcase := range []struct { for _, testcase := range []struct {
@ -78,3 +82,25 @@ func TestParseDigest(t *testing.T) {
} }
} }
} }
// A few test cases used to fix behavior we expect in storage backend.
func TestFromTarArchiveZeroLength(t *testing.T) {
checkTarsumDigest(t, "zero-length archive", bytes.NewReader([]byte{}), DigestTarSumV1EmptyTar)
}
func TestFromTarArchiveEmptyTar(t *testing.T) {
// String of 1024 zeros is a valid, empty tar file.
checkTarsumDigest(t, "1024 zero bytes", bytes.NewReader(bytes.Repeat([]byte("\x00"), 1024)), DigestTarSumV1EmptyTar)
}
func checkTarsumDigest(t *testing.T, msg string, rd io.Reader, expected Digest) {
dgst, err := FromTarArchive(rd)
if err != nil {
t.Fatalf("unexpected error digesting %s: %v", msg, err)
}
if dgst != expected {
t.Fatalf("unexpected digest for %s: %q != %q", msg, dgst, expected)
}
}

View file

@ -144,7 +144,7 @@ func TestLayerAPI(t *testing.T) {
checkResponse(t, "status of deleted upload", resp, http.StatusNotFound) checkResponse(t, "status of deleted upload", resp, http.StatusNotFound)
// ----------------------------------------- // -----------------------------------------
// Do layer push with an empty body // Do layer push with an empty body and different digest
uploadURLBase = startPushLayer(t, builder, imageName) uploadURLBase = startPushLayer(t, builder, imageName)
resp, err = doPushLayer(t, builder, imageName, layerDigest, uploadURLBase, bytes.NewReader([]byte{})) resp, err = doPushLayer(t, builder, imageName, layerDigest, uploadURLBase, bytes.NewReader([]byte{}))
if err != nil { if err != nil {
@ -152,21 +152,30 @@ func TestLayerAPI(t *testing.T) {
} }
checkResponse(t, "bad layer push", resp, http.StatusBadRequest) checkResponse(t, "bad layer push", resp, http.StatusBadRequest)
checkBodyHasErrorCodes(t, "bad layer push", resp, v2.ErrorCodeBlobUploadInvalid) checkBodyHasErrorCodes(t, "bad layer push", resp, v2.ErrorCodeDigestInvalid)
// ----------------------------------------- // -----------------------------------------
// Do layer push with an invalid body // Do layer push with an empty body and correct digest
zeroDigest, err := digest.FromTarArchive(bytes.NewReader([]byte{}))
// This is a valid but empty tarfile!
badTar := bytes.Repeat([]byte("\x00"), 1024)
uploadURLBase = startPushLayer(t, builder, imageName)
resp, err = doPushLayer(t, builder, imageName, layerDigest, uploadURLBase, bytes.NewReader(badTar))
if err != nil { if err != nil {
t.Fatalf("unexpected error doing bad layer push: %v", err) t.Fatalf("unexpected error digesting empty buffer: %v", err)
} }
checkResponse(t, "bad layer push", resp, http.StatusBadRequest) uploadURLBase = startPushLayer(t, builder, imageName)
checkBodyHasErrorCodes(t, "bad layer push", resp, v2.ErrorCodeDigestInvalid) pushLayer(t, builder, imageName, zeroDigest, uploadURLBase, bytes.NewReader([]byte{}))
// -----------------------------------------
// Do layer push with an empty body and correct digest
// This is a valid but empty tarfile!
emptyTar := bytes.Repeat([]byte("\x00"), 1024)
emptyDigest, err := digest.FromTarArchive(bytes.NewReader(emptyTar))
if err != nil {
t.Fatalf("unexpected error digesting empty tar: %v", err)
}
uploadURLBase = startPushLayer(t, builder, imageName)
pushLayer(t, builder, imageName, emptyDigest, uploadURLBase, bytes.NewReader(emptyTar))
// ------------------------------------------ // ------------------------------------------
// Now, actually do successful upload. // Now, actually do successful upload.
@ -517,7 +526,7 @@ func checkBodyHasErrorCodes(t *testing.T, msg string, resp *http.Response, error
for _, err := range errs.Errors { for _, err := range errs.Errors {
if _, ok := expected[err.Code]; !ok { if _, ok := expected[err.Code]; !ok {
t.Fatalf("unexpected error code %v encountered: %s ", err.Code, string(p)) t.Fatalf("unexpected error code %v encountered during %s: %s ", err.Code, msg, string(p))
} }
counts[err.Code]++ counts[err.Code]++
} }
@ -525,7 +534,7 @@ func checkBodyHasErrorCodes(t *testing.T, msg string, resp *http.Response, error
// Ensure that counts of expected errors were all non-zero // Ensure that counts of expected errors were all non-zero
for code := range expected { for code := range expected {
if counts[code] == 0 { if counts[code] == 0 {
t.Fatalf("expected error code %v not encounterd: %s", code, string(p)) t.Fatalf("expected error code %v not encounterd during %s: %s", code, msg, string(p))
} }
} }

View file

@ -198,13 +198,6 @@ func (luh *layerUploadHandler) PutLayerUploadComplete(w http.ResponseWriter, r *
layer, err := luh.Upload.Finish(dgst) layer, err := luh.Upload.Finish(dgst)
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
case storage.ErrLayerUploadUnavailable:
w.WriteHeader(http.StatusBadRequest)
// TODO(stevvooe): Arguably, we may want to add an error code to
// cover this condition. It is not always a client error but it
// may be. For now, we effectively throw out the upload and have
// them start over.
luh.Errors.Push(v2.ErrorCodeBlobUploadInvalid, err.Err)
case storage.ErrLayerInvalidDigest: case storage.ErrLayerInvalidDigest:
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
luh.Errors.Push(v2.ErrorCodeDigestInvalid, err) luh.Errors.Push(v2.ErrorCodeDigestInvalid, err)

View file

@ -2,14 +2,23 @@ package storage
import ( import (
"bufio" "bufio"
"bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"time" "time"
"github.com/docker/distribution/storagedriver" "github.com/docker/distribution/storagedriver"
) )
// TODO(stevvooe): Set an optimal buffer size here. We'll have to
// understand the latency characteristics of the underlying network to
// set this correctly, so we may want to leave it to the driver. For
// out of process drivers, we'll have to optimize this buffer size for
// local communication.
const fileReaderBufferSize = 4 << 20
// remoteFileReader provides a read seeker interface to files stored in // remoteFileReader provides a read seeker interface to files stored in
// storagedriver. Used to implement part of layer interface and will be used // storagedriver. Used to implement part of layer interface and will be used
// to implement read side of LayerUpload. // to implement read side of LayerUpload.
@ -28,24 +37,40 @@ type fileReader struct {
err error // terminal error, if set, reader is closed err error // terminal error, if set, reader is closed
} }
// newFileReader initializes a file reader for the remote file. The read takes
// on the offset and size at the time the reader is created. If the underlying
// file changes, one must create a new fileReader.
func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) { func newFileReader(driver storagedriver.StorageDriver, path string) (*fileReader, error) {
rd := &fileReader{
driver: driver,
path: path,
}
// Grab the size of the layer file, ensuring existence. // Grab the size of the layer file, ensuring existence.
fi, err := driver.Stat(path) if fi, err := driver.Stat(path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// NOTE(stevvooe): We really don't care if the file is not
// actually present for the reader. If the caller needs to know
// whether or not the file exists, they should issue a stat call
// on the path. There is still no guarantee, since the file may be
// gone by the time the reader is created. The only correct
// behavior is to return a reader that immediately returns EOF.
default:
// Any other error we want propagated up the stack.
return nil, err
}
} else {
if fi.IsDir() {
return nil, fmt.Errorf("cannot read a directory")
}
if err != nil { // Fill in file information
return nil, err rd.size = fi.Size()
rd.modtime = fi.ModTime()
} }
if fi.IsDir() { return rd, nil
return nil, fmt.Errorf("cannot read a directory")
}
return &fileReader{
driver: driver,
path: path,
size: fi.Size(),
modtime: fi.ModTime(),
}, nil
} }
func (fr *fileReader) Read(p []byte) (n int, err error) { func (fr *fileReader) Read(p []byte) (n int, err error) {
@ -88,8 +113,6 @@ func (fr *fileReader) Seek(offset int64, whence int) (int64, error) {
if newOffset < 0 { if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position") err = fmt.Errorf("cannot seek to negative position")
} else if newOffset > fr.size {
err = fmt.Errorf("cannot seek passed end of file")
} else { } else {
if fr.offset != newOffset { if fr.offset != newOffset {
fr.reset() fr.reset()
@ -134,9 +157,17 @@ func (fr *fileReader) reader() (io.Reader, error) {
// If we don't have a reader, open one up. // If we don't have a reader, open one up.
rc, err := fr.driver.ReadStream(fr.path, fr.offset) rc, err := fr.driver.ReadStream(fr.path, fr.offset)
if err != nil { if err != nil {
return nil, err switch err := err.(type) {
case storagedriver.PathNotFoundError:
// NOTE(stevvooe): If the path is not found, we simply return a
// reader that returns io.EOF. However, we do not set fr.rc,
// allowing future attempts at getting a reader to possibly
// succeed if the file turns up later.
return ioutil.NopCloser(bytes.NewReader([]byte{})), nil
default:
return nil, err
}
} }
fr.rc = rc fr.rc = rc
@ -147,7 +178,7 @@ func (fr *fileReader) reader() (io.Reader, error) {
// set this correctly, so we may want to leave it to the driver. For // set this correctly, so we may want to leave it to the driver. For
// out of process drivers, we'll have to optimize this buffer size for // out of process drivers, we'll have to optimize this buffer size for
// local communication. // local communication.
fr.brd = bufio.NewReader(fr.rc) fr.brd = bufio.NewReaderSize(fr.rc, fileReaderBufferSize)
} else { } else {
fr.brd.Reset(fr.rc) fr.brd.Reset(fr.rc)
} }

View file

@ -124,7 +124,7 @@ func TestFileReaderSeek(t *testing.T) {
t.Fatalf("expected to seek to end: %v != %v", end, len(content)) t.Fatalf("expected to seek to end: %v != %v", end, len(content))
} }
// 4. Seek past end and before start, ensure error. // 4. Seek before start, ensure error.
// seek before start // seek before start
before, err := fr.Seek(-1, os.SEEK_SET) before, err := fr.Seek(-1, os.SEEK_SET)
@ -132,9 +132,44 @@ func TestFileReaderSeek(t *testing.T) {
t.Fatalf("error expected, returned offset=%v", before) t.Fatalf("error expected, returned offset=%v", before)
} }
after, err := fr.Seek(int64(len(content)+1), os.SEEK_END) // 5. Seek after end,
if err == nil { after, err := fr.Seek(1, os.SEEK_END)
t.Fatalf("error expected, returned offset=%v", after) if err != nil {
t.Fatalf("unexpected error expected, returned offset=%v", after)
}
p := make([]byte, 16)
n, err := fr.Read(p)
if n != 0 {
t.Fatalf("bytes reads %d != %d", n, 0)
}
if err != io.EOF {
t.Fatalf("expected io.EOF, got %v", err)
}
}
// TestFileReaderNonExistentFile ensures the reader behaves as expected with a
// missing or zero-length remote file. While the file may not exist, the
// reader should not error out on creation and should return 0-bytes from the
// read method, with an io.EOF error.
func TestFileReaderNonExistentFile(t *testing.T) {
driver := inmemory.New()
fr, err := newFileReader(driver, "/doesnotexist")
if err != nil {
t.Fatalf("unexpected error initializing reader: %v", err)
}
var buf [1024]byte
n, err := fr.Read(buf[:])
if n != 0 {
t.Fatalf("non-zero byte read reported: %d != 0", n)
}
if err != io.EOF {
t.Fatalf("read on missing file should return io.EOF, got %v", err)
} }
} }

View file

@ -99,9 +99,6 @@ func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
if newOffset < 0 { if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position") err = fmt.Errorf("cannot seek to negative position")
} else if newOffset > fw.size {
fw.offset = newOffset
fw.size = newOffset
} else { } else {
// No problems, set the offset. // No problems, set the offset.
fw.offset = newOffset fw.offset = newOffset

View file

@ -80,28 +80,10 @@ func (err ErrUnknownLayer) Error() string {
// ErrLayerInvalidDigest returned when tarsum check fails. // ErrLayerInvalidDigest returned when tarsum check fails.
type ErrLayerInvalidDigest struct { type ErrLayerInvalidDigest struct {
Digest digest.Digest Digest digest.Digest
Reason error
} }
func (err ErrLayerInvalidDigest) Error() string { func (err ErrLayerInvalidDigest) Error() string {
return fmt.Sprintf("invalid digest for referenced layer: %v", err.Digest) return fmt.Sprintf("invalid digest for referenced layer: %v, %v",
} err.Digest, err.Reason)
// ErrLayerInvalidSize returned when length check fails.
type ErrLayerInvalidSize struct {
Size int64
}
func (err ErrLayerInvalidSize) Error() string {
return fmt.Sprintf("invalid layer size: %d", err.Size)
}
// ErrLayerUploadUnavailable signals missing upload data, either when no data
// has been received or when the backend reports the data as missing. This is
// different from ErrLayerUploadUnknown.
type ErrLayerUploadUnavailable struct {
Err error
}
func (err ErrLayerUploadUnavailable) Error() string {
return fmt.Sprintf("layer upload unavialable: %v", err)
} }

View file

@ -235,6 +235,40 @@ func TestSimpleLayerRead(t *testing.T) {
} }
} }
// TestLayerUploadZeroLength uploads zero-length
func TestLayerUploadZeroLength(t *testing.T) {
imageName := "foo/bar"
driver := inmemory.New()
registry := NewRegistryWithDriver(driver)
ls := registry.Repository(imageName).Layers()
upload, err := ls.Upload()
if err != nil {
t.Fatalf("unexpected error starting upload: %v", err)
}
io.Copy(upload, bytes.NewReader([]byte{}))
dgst, err := digest.FromTarArchive(bytes.NewReader([]byte{}))
if err != nil {
t.Fatalf("error getting zero digest: %v", err)
}
if dgst != digest.DigestTarSumV1EmptyTar {
// sanity check on zero digest
t.Fatalf("digest not as expected: %v != %v", dgst, digest.DigestTarSumV1EmptyTar)
}
layer, err := upload.Finish(dgst)
if err != nil {
t.Fatalf("unexpected error finishing upload: %v", err)
}
if layer.Digest() != dgst {
t.Fatalf("unexpected digest: %v != %v", layer.Digest(), dgst)
}
}
// writeRandomLayer creates a random layer under name and tarSum using driver // writeRandomLayer creates a random layer under name and tarSum using driver
// and pathMapper. An io.ReadSeeker with the data is returned, along with the // and pathMapper. An io.ReadSeeker with the data is returned, along with the
// sha256 hex digest. // sha256 hex digest.

View file

@ -1,6 +1,7 @@
package storage package storage
import ( import (
"fmt"
"io" "io"
"path" "path"
"time" "time"
@ -89,7 +90,10 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
case tarsum.Version1: case tarsum.Version1:
default: default:
// version 0 and dev, for now. // version 0 and dev, for now.
return "", ErrLayerTarSumVersionUnsupported return "", ErrLayerInvalidDigest{
Digest: dgst,
Reason: ErrLayerTarSumVersionUnsupported,
}
} }
digestVerifier := digest.NewDigestVerifier(dgst) digestVerifier := digest.NewDigestVerifier(dgst)
@ -102,22 +106,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
// Read the file from the backend driver and validate it. // Read the file from the backend driver and validate it.
fr, err := newFileReader(luc.fileWriter.driver, luc.path) fr, err := newFileReader(luc.fileWriter.driver, luc.path)
if err != nil { if err != nil {
switch err := err.(type) { return "", err
case storagedriver.PathNotFoundError:
// NOTE(stevvooe): Path not found can mean several things by we
// should report the upload is not available. This can happen if
// the following happens:
//
// 1. If not data was received for the upload instance.
// 2. Backend storage driver has not convereged after receiving latest data.
//
// This *does not* mean that the upload does not exist, since we
// can't even get a LayerUpload object without having the
// directory exist.
return "", ErrLayerUploadUnavailable{Err: err}
default:
return "", err
}
} }
tr := io.TeeReader(fr, digestVerifier) tr := io.TeeReader(fr, digestVerifier)
@ -132,7 +121,10 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
} }
if !digestVerifier.Verified() { if !digestVerifier.Verified() {
return "", ErrLayerInvalidDigest{Digest: dgst} return "", ErrLayerInvalidDigest{
Digest: dgst,
Reason: fmt.Errorf("content does not match digest"),
}
} }
return canonical, nil return canonical, nil
@ -151,7 +143,7 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
} }
// Check for existence // Check for existence
if _, err := luc.layerStore.repository.registry.driver.Stat(blobPath); err != nil { if _, err := luc.driver.Stat(blobPath); err != nil {
switch err := err.(type) { switch err := err.(type) {
case storagedriver.PathNotFoundError: case storagedriver.PathNotFoundError:
break // ensure that it doesn't exist. break // ensure that it doesn't exist.
@ -166,6 +158,31 @@ func (luc *layerUploadController) moveLayer(dgst digest.Digest) error {
return nil return nil
} }
// If no data was received, we may not actually have a file on disk. Check
// the size here and write a zero-length file to blobPath if this is the
// case. For the most part, this should only ever happen with zero-length
// tars.
if _, err := luc.driver.Stat(luc.path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// HACK(stevvooe): This is slightly dangerous: if we verify above,
// get a hash, then the underlying file is deleted, we risk moving
// a zero-length blob into a nonzero-length blob location. To
// prevent this horrid thing, we employ the hack of only allowing
// to this happen for the zero tarsum.
if dgst == digest.DigestTarSumV1EmptyTar {
return luc.driver.PutContent(blobPath, []byte{})
}
// We let this fail during the move below.
logrus.
WithField("upload.uuid", luc.UUID()).
WithField("digest", dgst).Warnf("attempted to move zero-length content with non-zero digest")
default:
return err // unrelated error
}
}
return luc.driver.Move(luc.path, blobPath) return luc.driver.Move(luc.path, blobPath)
} }

View file

@ -57,8 +57,6 @@ func (rs *revisionStore) get(revision digest.Digest) (*manifest.SignedManifest,
return nil, err return nil, err
} }
logrus.Infof("retrieved signatures: %v", string(signatures[0]))
jsig, err := libtrust.NewJSONSignature(content, signatures...) jsig, err := libtrust.NewJSONSignature(content, signatures...)
if err != nil { if err != nil {
return nil, err return nil, err