From c69c8a3286c98d9f072c4c8a4e2eb2fffffaf2ab Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 8 Feb 2016 14:29:21 -0800 Subject: [PATCH] Adds new storagedriver.FileWriter interface Updates registry storage code to use this for better resumable writes. Implements this interface for the following drivers: + Inmemory + Filesystem + S3 + Azure Signed-off-by: Brian Bland --- docs/client/blob_writer.go | 18 +- docs/handlers/blobupload.go | 49 +- docs/storage/blob_test.go | 11 +- docs/storage/blobwriter.go | 49 +- docs/storage/blobwriter_resumable.go | 45 +- docs/storage/driver/azure/azure.go | 164 ++++- docs/storage/driver/azure/blockblob.go | 24 - docs/storage/driver/azure/blockblob_test.go | 155 ---- docs/storage/driver/azure/blockid.go | 60 -- docs/storage/driver/azure/blockid_test.go | 74 -- docs/storage/driver/azure/randomwriter.go | 208 ------ .../storage/driver/azure/randomwriter_test.go | 339 --------- docs/storage/driver/azure/zerofillwriter.go | 49 -- .../driver/azure/zerofillwriter_test.go | 126 ---- docs/storage/driver/base/base.go | 24 +- docs/storage/driver/filesystem/driver.go | 146 +++- docs/storage/driver/inmemory/driver.go | 126 +++- docs/storage/driver/s3-aws/s3.go | 670 ++++++++---------- docs/storage/driver/s3-goamz/s3.go | 549 ++++++-------- docs/storage/driver/storagedriver.go | 30 +- docs/storage/driver/testsuites/testsuites.go | 247 +++---- docs/storage/filereader.go | 2 +- docs/storage/filewriter.go | 135 ---- docs/storage/filewriter_test.go | 226 ------ docs/storage/linkedblobstore.go | 21 +- 25 files changed, 1059 insertions(+), 2488 deletions(-) delete mode 100644 docs/storage/driver/azure/blockblob.go delete mode 100644 docs/storage/driver/azure/blockblob_test.go delete mode 100644 docs/storage/driver/azure/blockid.go delete mode 100644 docs/storage/driver/azure/blockid_test.go delete mode 100644 docs/storage/driver/azure/randomwriter.go delete mode 100644 docs/storage/driver/azure/randomwriter_test.go delete mode 100644 docs/storage/driver/azure/zerofillwriter.go delete mode 100644 docs/storage/driver/azure/zerofillwriter_test.go delete mode 100644 docs/storage/filewriter.go delete mode 100644 docs/storage/filewriter_test.go diff --git a/docs/client/blob_writer.go b/docs/client/blob_writer.go index 21a018dc3..e3ffcb00f 100644 --- a/docs/client/blob_writer.go +++ b/docs/client/blob_writer.go @@ -6,7 +6,6 @@ import ( "io" "io/ioutil" "net/http" - "os" "time" "github.com/docker/distribution" @@ -104,21 +103,8 @@ func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) { } -func (hbu *httpBlobUpload) Seek(offset int64, whence int) (int64, error) { - newOffset := hbu.offset - - switch whence { - case os.SEEK_CUR: - newOffset += int64(offset) - case os.SEEK_END: - newOffset += int64(offset) - case os.SEEK_SET: - newOffset = int64(offset) - } - - hbu.offset = newOffset - - return hbu.offset, nil +func (hbu *httpBlobUpload) Size() int64 { + return hbu.offset } func (hbu *httpBlobUpload) ID() string { diff --git a/docs/handlers/blobupload.go b/docs/handlers/blobupload.go index e9f0f5133..892393aaf 100644 --- a/docs/handlers/blobupload.go +++ b/docs/handlers/blobupload.go @@ -4,7 +4,6 @@ import ( "fmt" "net/http" "net/url" - "os" "github.com/docker/distribution" ctxu "github.com/docker/distribution/context" @@ -76,28 +75,14 @@ func blobUploadDispatcher(ctx *Context, r *http.Request) http.Handler { } buh.Upload = upload - if state.Offset > 0 { - // Seek the blob upload to the correct spot if it's non-zero. - // These error conditions should be rare and demonstrate really - // problems. We basically cancel the upload and tell the client to - // start over. - if nn, err := upload.Seek(buh.State.Offset, os.SEEK_SET); err != nil { - defer upload.Close() - ctxu.GetLogger(ctx).Infof("error seeking blob upload: %v", err) - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadInvalid.WithDetail(err)) - upload.Cancel(buh) - }) - } else if nn != buh.State.Offset { - defer upload.Close() - ctxu.GetLogger(ctx).Infof("seek to wrong offest: %d != %d", nn, buh.State.Offset) - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadInvalid.WithDetail(err)) - upload.Cancel(buh) - }) - } + if size := upload.Size(); size != buh.State.Offset { + defer upload.Close() + ctxu.GetLogger(ctx).Infof("upload resumed at wrong offest: %d != %d", size, buh.State.Offset) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buh.Errors = append(buh.Errors, v2.ErrorCodeBlobUploadInvalid.WithDetail(err)) + upload.Cancel(buh) + }) } - return closeResources(handler, buh.Upload) } @@ -239,10 +224,7 @@ func (buh *blobUploadHandler) PutBlobUploadComplete(w http.ResponseWriter, r *ht return } - size := buh.State.Offset - if offset, err := buh.Upload.Seek(0, os.SEEK_CUR); err == nil { - size = offset - } + size := buh.Upload.Size() desc, err := buh.Upload.Commit(buh, distribution.Descriptor{ Digest: dgst, @@ -308,21 +290,10 @@ func (buh *blobUploadHandler) CancelBlobUpload(w http.ResponseWriter, r *http.Re // uploads always start at a 0 offset. This allows disabling resumable push by // always returning a 0 offset on check status. func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http.Request, fresh bool) error { - - var offset int64 - if !fresh { - var err error - offset, err = buh.Upload.Seek(0, os.SEEK_CUR) - if err != nil { - ctxu.GetLogger(buh).Errorf("unable get current offset of blob upload: %v", err) - return err - } - } - // TODO(stevvooe): Need a better way to manage the upload state automatically. buh.State.Name = buh.Repository.Named().Name() buh.State.UUID = buh.Upload.ID() - buh.State.Offset = offset + buh.State.Offset = buh.Upload.Size() buh.State.StartedAt = buh.Upload.StartedAt() token, err := hmacKey(buh.Config.HTTP.Secret).packUploadState(buh.State) @@ -341,7 +312,7 @@ func (buh *blobUploadHandler) blobUploadResponse(w http.ResponseWriter, r *http. return err } - endRange := offset + endRange := buh.Upload.Size() if endRange > 0 { endRange = endRange - 1 } diff --git a/docs/storage/blob_test.go b/docs/storage/blob_test.go index 1e5b408c9..3698a415d 100644 --- a/docs/storage/blob_test.go +++ b/docs/storage/blob_test.go @@ -41,10 +41,7 @@ func TestWriteSeek(t *testing.T) { } contents := []byte{1, 2, 3} blobUpload.Write(contents) - offset, err := blobUpload.Seek(0, os.SEEK_CUR) - if err != nil { - t.Fatalf("unexpected error in blobUpload.Seek: %s", err) - } + offset := blobUpload.Size() if offset != int64(len(contents)) { t.Fatalf("unexpected value for blobUpload offset: %v != %v", offset, len(contents)) } @@ -113,11 +110,7 @@ func TestSimpleBlobUpload(t *testing.T) { t.Fatalf("layer data write incomplete") } - offset, err := blobUpload.Seek(0, os.SEEK_CUR) - if err != nil { - t.Fatalf("unexpected error seeking layer upload: %v", err) - } - + offset := blobUpload.Size() if offset != nn { t.Fatalf("blobUpload not updated with correct offset: %v != %v", offset, nn) } diff --git a/docs/storage/blobwriter.go b/docs/storage/blobwriter.go index f2ca7388d..7f280d366 100644 --- a/docs/storage/blobwriter.go +++ b/docs/storage/blobwriter.go @@ -21,6 +21,7 @@ var ( // layerWriter is used to control the various aspects of resumable // layer upload. It implements the LayerUpload interface. type blobWriter struct { + ctx context.Context blobStore *linkedBlobStore id string @@ -28,9 +29,9 @@ type blobWriter struct { digester digest.Digester written int64 // track the contiguous write - // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy - // LayerUpload Interface - fileWriter + fileWriter storagedriver.FileWriter + driver storagedriver.StorageDriver + path string resumableDigestEnabled bool } @@ -51,7 +52,7 @@ func (bw *blobWriter) StartedAt() time.Time { func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { context.GetLogger(ctx).Debug("(*blobWriter).Commit") - if err := bw.fileWriter.Close(); err != nil { + if err := bw.fileWriter.Commit(); err != nil { return distribution.Descriptor{}, err } @@ -84,6 +85,10 @@ func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) // the writer and canceling the operation. func (bw *blobWriter) Cancel(ctx context.Context) error { context.GetLogger(ctx).Debug("(*blobWriter).Rollback") + if err := bw.fileWriter.Cancel(); err != nil { + return err + } + if err := bw.removeResources(ctx); err != nil { return err } @@ -92,15 +97,19 @@ func (bw *blobWriter) Cancel(ctx context.Context) error { return nil } +func (bw *blobWriter) Size() int64 { + return bw.fileWriter.Size() +} + func (bw *blobWriter) Write(p []byte) (int, error) { // Ensure that the current write offset matches how many bytes have been // written to the digester. If not, we need to update the digest state to // match the current write position. - if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable { + if err := bw.resumeDigest(bw.blobStore.ctx); err != nil && err != errResumableDigestNotAvailable { return 0, err } - n, err := io.MultiWriter(&bw.fileWriter, bw.digester.Hash()).Write(p) + n, err := io.MultiWriter(bw.fileWriter, bw.digester.Hash()).Write(p) bw.written += int64(n) return n, err @@ -110,21 +119,17 @@ func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) { // Ensure that the current write offset matches how many bytes have been // written to the digester. If not, we need to update the digest state to // match the current write position. - if err := bw.resumeDigestAt(bw.blobStore.ctx, bw.offset); err != nil && err != errResumableDigestNotAvailable { + if err := bw.resumeDigest(bw.blobStore.ctx); err != nil && err != errResumableDigestNotAvailable { return 0, err } - nn, err := bw.fileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash())) + nn, err := io.Copy(io.MultiWriter(bw.fileWriter, bw.digester.Hash()), r) bw.written += nn return nn, err } func (bw *blobWriter) Close() error { - if bw.err != nil { - return bw.err - } - if err := bw.storeHashState(bw.blobStore.ctx); err != nil { return err } @@ -148,8 +153,10 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri } } + var size int64 + // Stat the on disk file - if fi, err := bw.fileWriter.driver.Stat(ctx, bw.path); err != nil { + if fi, err := bw.driver.Stat(ctx, bw.path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: // NOTE(stevvooe): We really don't care if the file is @@ -165,23 +172,23 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri return distribution.Descriptor{}, fmt.Errorf("unexpected directory at upload location %q", bw.path) } - bw.size = fi.Size() + size = fi.Size() } if desc.Size > 0 { - if desc.Size != bw.size { + if desc.Size != size { return distribution.Descriptor{}, distribution.ErrBlobInvalidLength } } else { // if provided 0 or negative length, we can assume caller doesn't know or // care about length. - desc.Size = bw.size + desc.Size = size } // TODO(stevvooe): This section is very meandering. Need to be broken down // to be a lot more clear. - if err := bw.resumeDigestAt(ctx, bw.size); err == nil { + if err := bw.resumeDigest(ctx); err == nil { canonical = bw.digester.Digest() if canonical.Algorithm() == desc.Digest.Algorithm() { @@ -206,7 +213,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri // the same, we don't need to read the data from the backend. This is // because we've written the entire file in the lifecycle of the // current instance. - if bw.written == bw.size && digest.Canonical == desc.Digest.Algorithm() { + if bw.written == size && digest.Canonical == desc.Digest.Algorithm() { canonical = bw.digester.Digest() verified = desc.Digest == canonical } @@ -223,7 +230,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri } // Read the file from the backend driver and validate it. - fr, err := newFileReader(ctx, bw.fileWriter.driver, bw.path, desc.Size) + fr, err := newFileReader(ctx, bw.driver, bw.path, desc.Size) if err != nil { return distribution.Descriptor{}, err } @@ -357,7 +364,7 @@ func (bw *blobWriter) Reader() (io.ReadCloser, error) { // todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4 try := 1 for try <= 5 { - _, err := bw.fileWriter.driver.Stat(bw.ctx, bw.path) + _, err := bw.driver.Stat(bw.ctx, bw.path) if err == nil { break } @@ -371,7 +378,7 @@ func (bw *blobWriter) Reader() (io.ReadCloser, error) { } } - readCloser, err := bw.fileWriter.driver.ReadStream(bw.ctx, bw.path, 0) + readCloser, err := bw.driver.Reader(bw.ctx, bw.path, 0) if err != nil { return nil, err } diff --git a/docs/storage/blobwriter_resumable.go b/docs/storage/blobwriter_resumable.go index 5ae29c54e..ff5482c3f 100644 --- a/docs/storage/blobwriter_resumable.go +++ b/docs/storage/blobwriter_resumable.go @@ -4,8 +4,6 @@ package storage import ( "fmt" - "io" - "os" "path" "strconv" @@ -19,24 +17,18 @@ import ( _ "github.com/stevvooe/resumable/sha512" ) -// resumeDigestAt attempts to restore the state of the internal hash function -// by loading the most recent saved hash state less than or equal to the given -// offset. Any unhashed bytes remaining less than the given offset are hashed -// from the content uploaded so far. -func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error { +// resumeDigest attempts to restore the state of the internal hash function +// by loading the most recent saved hash state equal to the current size of the blob. +func (bw *blobWriter) resumeDigest(ctx context.Context) error { if !bw.resumableDigestEnabled { return errResumableDigestNotAvailable } - if offset < 0 { - return fmt.Errorf("cannot resume hash at negative offset: %d", offset) - } - h, ok := bw.digester.Hash().(resumable.Hash) if !ok { return errResumableDigestNotAvailable } - + offset := bw.fileWriter.Size() if offset == int64(h.Len()) { // State of digester is already at the requested offset. return nil @@ -49,24 +41,12 @@ func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error { return fmt.Errorf("unable to get stored hash states with offset %d: %s", offset, err) } - // Find the highest stored hashState with offset less than or equal to + // Find the highest stored hashState with offset equal to // the requested offset. for _, hashState := range hashStates { if hashState.offset == offset { hashStateMatch = hashState break // Found an exact offset match. - } else if hashState.offset < offset && hashState.offset > hashStateMatch.offset { - // This offset is closer to the requested offset. - hashStateMatch = hashState - } else if hashState.offset > offset { - // Remove any stored hash state with offsets higher than this one - // as writes to this resumed hasher will make those invalid. This - // is probably okay to skip for now since we don't expect anyone to - // use the API in this way. For that reason, we don't treat an - // an error here as a fatal error, but only log it. - if err := bw.driver.Delete(ctx, hashState.path); err != nil { - logrus.Errorf("unable to delete stale hash state %q: %s", hashState.path, err) - } } } @@ -86,20 +66,7 @@ func (bw *blobWriter) resumeDigestAt(ctx context.Context, offset int64) error { // Mind the gap. if gapLen := offset - int64(h.Len()); gapLen > 0 { - // Need to read content from the upload to catch up to the desired offset. - fr, err := newFileReader(ctx, bw.driver, bw.path, bw.size) - if err != nil { - return err - } - defer fr.Close() - - if _, err = fr.Seek(int64(h.Len()), os.SEEK_SET); err != nil { - return fmt.Errorf("unable to seek to layer reader offset %d: %s", h.Len(), err) - } - - if _, err := io.CopyN(h, fr, gapLen); err != nil { - return err - } + return errResumableDigestNotAvailable } return nil diff --git a/docs/storage/driver/azure/azure.go b/docs/storage/driver/azure/azure.go index cbb959812..70771375a 100644 --- a/docs/storage/driver/azure/azure.go +++ b/docs/storage/driver/azure/azure.go @@ -3,6 +3,7 @@ package azure import ( + "bufio" "bytes" "fmt" "io" @@ -26,6 +27,7 @@ const ( paramAccountKey = "accountkey" paramContainer = "container" paramRealm = "realm" + maxChunkSize = 4 * 1024 * 1024 ) type driver struct { @@ -117,18 +119,21 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e if _, err := d.client.DeleteBlobIfExists(d.container, path); err != nil { return err } - if err := d.client.CreateBlockBlob(d.container, path); err != nil { + writer, err := d.Writer(ctx, path, false) + if err != nil { return err } - bs := newAzureBlockStorage(d.client) - bw := newRandomBlobWriter(&bs, azure.MaxBlobBlockSize) - _, err := bw.WriteBlobAt(d.container, path, 0, bytes.NewReader(contents)) - return err + defer writer.Close() + _, err = writer.Write(contents) + if err != nil { + return err + } + return writer.Commit() } -// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// Reader retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { if ok, err := d.client.BlobExists(d.container, path); err != nil { return nil, err } else if !ok { @@ -153,25 +158,38 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io. return resp, nil } -// WriteStream stores the contents of the provided io.ReadCloser at a location -// designated by the given path. -func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (int64, error) { - if blobExists, err := d.client.BlobExists(d.container, path); err != nil { - return 0, err - } else if !blobExists { - err := d.client.CreateBlockBlob(d.container, path) +// Writer returns a FileWriter which will store the content written to it +// at the location designated by "path" after the call to Commit. +func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { + blobExists, err := d.client.BlobExists(d.container, path) + if err != nil { + return nil, err + } + var size int64 + if blobExists { + if append { + blobProperties, err := d.client.GetBlobProperties(d.container, path) + if err != nil { + return nil, err + } + size = blobProperties.ContentLength + } else { + err := d.client.DeleteBlob(d.container, path) + if err != nil { + return nil, err + } + } + } else { + if append { + return nil, storagedriver.PathNotFoundError{Path: path} + } + err := d.client.PutAppendBlob(d.container, path, nil) if err != nil { - return 0, err + return nil, err } } - if offset < 0 { - return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} - } - bs := newAzureBlockStorage(d.client) - bw := newRandomBlobWriter(&bs, azure.MaxBlobBlockSize) - zw := newZeroFillWriter(&bw) - return zw.Write(d.container, path, offset, reader) + return d.newWriter(path, size), nil } // Stat retrieves the FileInfo for the given path, including the current size @@ -236,6 +254,9 @@ func (d *driver) List(ctx context.Context, path string) ([]string, error) { } list := directDescendants(blobs, path) + if path != "" && len(list) == 0 { + return nil, storagedriver.PathNotFoundError{Path: path} + } return list, nil } @@ -361,6 +382,101 @@ func (d *driver) listBlobs(container, virtPath string) ([]string, error) { } func is404(err error) bool { - e, ok := err.(azure.AzureStorageServiceError) - return ok && e.StatusCode == http.StatusNotFound + statusCodeErr, ok := err.(azure.UnexpectedStatusCodeError) + return ok && statusCodeErr.Got() == http.StatusNotFound +} + +type writer struct { + driver *driver + path string + size int64 + bw *bufio.Writer + closed bool + committed bool + cancelled bool +} + +func (d *driver) newWriter(path string, size int64) storagedriver.FileWriter { + return &writer{ + driver: d, + path: path, + size: size, + bw: bufio.NewWriterSize(&blockWriter{ + client: d.client, + container: d.container, + path: path, + }, maxChunkSize), + } +} + +func (w *writer) Write(p []byte) (int, error) { + if w.closed { + return 0, fmt.Errorf("already closed") + } else if w.committed { + return 0, fmt.Errorf("already committed") + } else if w.cancelled { + return 0, fmt.Errorf("already cancelled") + } + + n, err := w.bw.Write(p) + w.size += int64(n) + return n, err +} + +func (w *writer) Size() int64 { + return w.size +} + +func (w *writer) Close() error { + if w.closed { + return fmt.Errorf("already closed") + } + w.closed = true + return w.bw.Flush() +} + +func (w *writer) Cancel() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } + w.cancelled = true + return w.driver.client.DeleteBlob(w.driver.container, w.path) +} + +func (w *writer) Commit() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } else if w.cancelled { + return fmt.Errorf("already cancelled") + } + w.committed = true + return w.bw.Flush() +} + +type blockWriter struct { + client azure.BlobStorageClient + container string + path string +} + +func (bw *blockWriter) Write(p []byte) (int, error) { + n := 0 + for offset := 0; offset < len(p); offset += maxChunkSize { + chunkSize := maxChunkSize + if offset+chunkSize > len(p) { + chunkSize = len(p) - offset + } + err := bw.client.AppendBlock(bw.container, bw.path, p[offset:offset+chunkSize]) + if err != nil { + return n, err + } + + n += chunkSize + } + + return n, nil } diff --git a/docs/storage/driver/azure/blockblob.go b/docs/storage/driver/azure/blockblob.go deleted file mode 100644 index 1c1df899c..000000000 --- a/docs/storage/driver/azure/blockblob.go +++ /dev/null @@ -1,24 +0,0 @@ -package azure - -import ( - "fmt" - "io" - - azure "github.com/Azure/azure-sdk-for-go/storage" -) - -// azureBlockStorage is adaptor between azure.BlobStorageClient and -// blockStorage interface. -type azureBlockStorage struct { - azure.BlobStorageClient -} - -func (b *azureBlockStorage) GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error) { - return b.BlobStorageClient.GetBlobRange(container, blob, fmt.Sprintf("%v-%v", start, start+length-1)) -} - -func newAzureBlockStorage(b azure.BlobStorageClient) azureBlockStorage { - a := azureBlockStorage{} - a.BlobStorageClient = b - return a -} diff --git a/docs/storage/driver/azure/blockblob_test.go b/docs/storage/driver/azure/blockblob_test.go deleted file mode 100644 index 7ce471957..000000000 --- a/docs/storage/driver/azure/blockblob_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package azure - -import ( - "bytes" - "fmt" - "io" - "io/ioutil" - - azure "github.com/Azure/azure-sdk-for-go/storage" -) - -type StorageSimulator struct { - blobs map[string]*BlockBlob -} - -type BlockBlob struct { - blocks map[string]*DataBlock - blockList []string -} - -type DataBlock struct { - data []byte - committed bool -} - -func (s *StorageSimulator) path(container, blob string) string { - return fmt.Sprintf("%s/%s", container, blob) -} - -func (s *StorageSimulator) BlobExists(container, blob string) (bool, error) { - _, ok := s.blobs[s.path(container, blob)] - return ok, nil -} - -func (s *StorageSimulator) GetBlob(container, blob string) (io.ReadCloser, error) { - bb, ok := s.blobs[s.path(container, blob)] - if !ok { - return nil, fmt.Errorf("blob not found") - } - - var readers []io.Reader - for _, bID := range bb.blockList { - readers = append(readers, bytes.NewReader(bb.blocks[bID].data)) - } - return ioutil.NopCloser(io.MultiReader(readers...)), nil -} - -func (s *StorageSimulator) GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error) { - r, err := s.GetBlob(container, blob) - if err != nil { - return nil, err - } - b, err := ioutil.ReadAll(r) - if err != nil { - return nil, err - } - return ioutil.NopCloser(bytes.NewReader(b[start : start+length])), nil -} - -func (s *StorageSimulator) CreateBlockBlob(container, blob string) error { - path := s.path(container, blob) - bb := &BlockBlob{ - blocks: make(map[string]*DataBlock), - blockList: []string{}, - } - s.blobs[path] = bb - return nil -} - -func (s *StorageSimulator) PutBlock(container, blob, blockID string, chunk []byte) error { - path := s.path(container, blob) - bb, ok := s.blobs[path] - if !ok { - return fmt.Errorf("blob not found") - } - data := make([]byte, len(chunk)) - copy(data, chunk) - bb.blocks[blockID] = &DataBlock{data: data, committed: false} // add block to blob - return nil -} - -func (s *StorageSimulator) GetBlockList(container, blob string, blockType azure.BlockListType) (azure.BlockListResponse, error) { - resp := azure.BlockListResponse{} - bb, ok := s.blobs[s.path(container, blob)] - if !ok { - return resp, fmt.Errorf("blob not found") - } - - // Iterate committed blocks (in order) - if blockType == azure.BlockListTypeAll || blockType == azure.BlockListTypeCommitted { - for _, blockID := range bb.blockList { - b := bb.blocks[blockID] - block := azure.BlockResponse{ - Name: blockID, - Size: int64(len(b.data)), - } - resp.CommittedBlocks = append(resp.CommittedBlocks, block) - } - - } - - // Iterate uncommitted blocks (in no order) - if blockType == azure.BlockListTypeAll || blockType == azure.BlockListTypeCommitted { - for blockID, b := range bb.blocks { - block := azure.BlockResponse{ - Name: blockID, - Size: int64(len(b.data)), - } - if !b.committed { - resp.UncommittedBlocks = append(resp.UncommittedBlocks, block) - } - } - } - return resp, nil -} - -func (s *StorageSimulator) PutBlockList(container, blob string, blocks []azure.Block) error { - bb, ok := s.blobs[s.path(container, blob)] - if !ok { - return fmt.Errorf("blob not found") - } - - var blockIDs []string - for _, v := range blocks { - bl, ok := bb.blocks[v.ID] - if !ok { // check if block ID exists - return fmt.Errorf("Block id '%s' not found", v.ID) - } - bl.committed = true - blockIDs = append(blockIDs, v.ID) - } - - // Mark all other blocks uncommitted - for k, b := range bb.blocks { - inList := false - for _, v := range blockIDs { - if k == v { - inList = true - break - } - } - if !inList { - b.committed = false - } - } - - bb.blockList = blockIDs - return nil -} - -func NewStorageSimulator() StorageSimulator { - return StorageSimulator{ - blobs: make(map[string]*BlockBlob), - } -} diff --git a/docs/storage/driver/azure/blockid.go b/docs/storage/driver/azure/blockid.go deleted file mode 100644 index 776c7cd59..000000000 --- a/docs/storage/driver/azure/blockid.go +++ /dev/null @@ -1,60 +0,0 @@ -package azure - -import ( - "encoding/base64" - "fmt" - "math/rand" - "sync" - "time" - - azure "github.com/Azure/azure-sdk-for-go/storage" -) - -type blockIDGenerator struct { - pool map[string]bool - r *rand.Rand - m sync.Mutex -} - -// Generate returns an unused random block id and adds the generated ID -// to list of used IDs so that the same block name is not used again. -func (b *blockIDGenerator) Generate() string { - b.m.Lock() - defer b.m.Unlock() - - var id string - for { - id = toBlockID(int(b.r.Int())) - if !b.exists(id) { - break - } - } - b.pool[id] = true - return id -} - -func (b *blockIDGenerator) exists(id string) bool { - _, used := b.pool[id] - return used -} - -func (b *blockIDGenerator) Feed(blocks azure.BlockListResponse) { - b.m.Lock() - defer b.m.Unlock() - - for _, bl := range append(blocks.CommittedBlocks, blocks.UncommittedBlocks...) { - b.pool[bl.Name] = true - } -} - -func newBlockIDGenerator() *blockIDGenerator { - return &blockIDGenerator{ - pool: make(map[string]bool), - r: rand.New(rand.NewSource(time.Now().UnixNano()))} -} - -// toBlockId converts given integer to base64-encoded block ID of a fixed length. -func toBlockID(i int) string { - s := fmt.Sprintf("%029d", i) // add zero padding for same length-blobs - return base64.StdEncoding.EncodeToString([]byte(s)) -} diff --git a/docs/storage/driver/azure/blockid_test.go b/docs/storage/driver/azure/blockid_test.go deleted file mode 100644 index aab70202a..000000000 --- a/docs/storage/driver/azure/blockid_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package azure - -import ( - "math" - "testing" - - azure "github.com/Azure/azure-sdk-for-go/storage" -) - -func Test_blockIdGenerator(t *testing.T) { - r := newBlockIDGenerator() - - for i := 1; i <= 10; i++ { - if expected := i - 1; len(r.pool) != expected { - t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected) - } - if id := r.Generate(); id == "" { - t.Fatal("returned empty id") - } - if expected := i; len(r.pool) != expected { - t.Fatalf("rand pool has wrong number of items: %d, expected:%d", len(r.pool), expected) - } - } -} - -func Test_blockIdGenerator_Feed(t *testing.T) { - r := newBlockIDGenerator() - if expected := 0; len(r.pool) != expected { - t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected) - } - - // feed empty list - blocks := azure.BlockListResponse{} - r.Feed(blocks) - if expected := 0; len(r.pool) != expected { - t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected) - } - - // feed blocks - blocks = azure.BlockListResponse{ - CommittedBlocks: []azure.BlockResponse{ - {"1", 1}, - {"2", 2}, - }, - UncommittedBlocks: []azure.BlockResponse{ - {"3", 3}, - }} - r.Feed(blocks) - if expected := 3; len(r.pool) != expected { - t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected) - } - - // feed same block IDs with committed/uncommitted place changed - blocks = azure.BlockListResponse{ - CommittedBlocks: []azure.BlockResponse{ - {"3", 3}, - }, - UncommittedBlocks: []azure.BlockResponse{ - {"1", 1}, - }} - r.Feed(blocks) - if expected := 3; len(r.pool) != expected { - t.Fatalf("rand pool had wrong number of items: %d, expected:%d", len(r.pool), expected) - } -} - -func Test_toBlockId(t *testing.T) { - min := 0 - max := math.MaxInt64 - - if len(toBlockID(min)) != len(toBlockID(max)) { - t.Fatalf("different-sized blockIDs are returned") - } -} diff --git a/docs/storage/driver/azure/randomwriter.go b/docs/storage/driver/azure/randomwriter.go deleted file mode 100644 index f18692d0b..000000000 --- a/docs/storage/driver/azure/randomwriter.go +++ /dev/null @@ -1,208 +0,0 @@ -package azure - -import ( - "fmt" - "io" - "io/ioutil" - - azure "github.com/Azure/azure-sdk-for-go/storage" -) - -// blockStorage is the interface required from a block storage service -// client implementation -type blockStorage interface { - CreateBlockBlob(container, blob string) error - GetBlob(container, blob string) (io.ReadCloser, error) - GetSectionReader(container, blob string, start, length int64) (io.ReadCloser, error) - PutBlock(container, blob, blockID string, chunk []byte) error - GetBlockList(container, blob string, blockType azure.BlockListType) (azure.BlockListResponse, error) - PutBlockList(container, blob string, blocks []azure.Block) error -} - -// randomBlobWriter enables random access semantics on Azure block blobs -// by enabling writing arbitrary length of chunks to arbitrary write offsets -// within the blob. Normally, Azure Blob Storage does not support random -// access semantics on block blobs; however, this writer can download, split and -// reupload the overlapping blocks and discards those being overwritten entirely. -type randomBlobWriter struct { - bs blockStorage - blockSize int -} - -func newRandomBlobWriter(bs blockStorage, blockSize int) randomBlobWriter { - return randomBlobWriter{bs: bs, blockSize: blockSize} -} - -// WriteBlobAt writes the given chunk to the specified position of an existing blob. -// The offset must be equals to size of the blob or smaller than it. -func (r *randomBlobWriter) WriteBlobAt(container, blob string, offset int64, chunk io.Reader) (int64, error) { - rand := newBlockIDGenerator() - - blocks, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeCommitted) - if err != nil { - return 0, err - } - rand.Feed(blocks) // load existing block IDs - - // Check for write offset for existing blob - size := getBlobSize(blocks) - if offset < 0 || offset > size { - return 0, fmt.Errorf("wrong offset for Write: %v", offset) - } - - // Upload the new chunk as blocks - blockList, nn, err := r.writeChunkToBlocks(container, blob, chunk, rand) - if err != nil { - return 0, err - } - - // For non-append operations, existing blocks may need to be splitted - if offset != size { - // Split the block on the left end (if any) - leftBlocks, err := r.blocksLeftSide(container, blob, offset, rand) - if err != nil { - return 0, err - } - blockList = append(leftBlocks, blockList...) - - // Split the block on the right end (if any) - rightBlocks, err := r.blocksRightSide(container, blob, offset, nn, rand) - if err != nil { - return 0, err - } - blockList = append(blockList, rightBlocks...) - } else { - // Use existing block list - var existingBlocks []azure.Block - for _, v := range blocks.CommittedBlocks { - existingBlocks = append(existingBlocks, azure.Block{ID: v.Name, Status: azure.BlockStatusCommitted}) - } - blockList = append(existingBlocks, blockList...) - } - // Put block list - return nn, r.bs.PutBlockList(container, blob, blockList) -} - -func (r *randomBlobWriter) GetSize(container, blob string) (int64, error) { - blocks, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeCommitted) - if err != nil { - return 0, err - } - return getBlobSize(blocks), nil -} - -// writeChunkToBlocks writes given chunk to one or multiple blocks within specified -// blob and returns their block representations. Those blocks are not committed, yet -func (r *randomBlobWriter) writeChunkToBlocks(container, blob string, chunk io.Reader, rand *blockIDGenerator) ([]azure.Block, int64, error) { - var newBlocks []azure.Block - var nn int64 - - // Read chunks of at most size N except the last chunk to - // maximize block size and minimize block count. - buf := make([]byte, r.blockSize) - for { - n, err := io.ReadFull(chunk, buf) - if err == io.EOF { - break - } - nn += int64(n) - data := buf[:n] - blockID := rand.Generate() - if err := r.bs.PutBlock(container, blob, blockID, data); err != nil { - return newBlocks, nn, err - } - newBlocks = append(newBlocks, azure.Block{ID: blockID, Status: azure.BlockStatusUncommitted}) - } - return newBlocks, nn, nil -} - -// blocksLeftSide returns the blocks that are going to be at the left side of -// the writeOffset: [0, writeOffset) by identifying blocks that will remain -// the same and splitting blocks and reuploading them as needed. -func (r *randomBlobWriter) blocksLeftSide(container, blob string, writeOffset int64, rand *blockIDGenerator) ([]azure.Block, error) { - var left []azure.Block - bx, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeAll) - if err != nil { - return left, err - } - - o := writeOffset - elapsed := int64(0) - for _, v := range bx.CommittedBlocks { - blkSize := int64(v.Size) - if o >= blkSize { // use existing block - left = append(left, azure.Block{ID: v.Name, Status: azure.BlockStatusCommitted}) - o -= blkSize - elapsed += blkSize - } else if o > 0 { // current block needs to be splitted - start := elapsed - size := o - part, err := r.bs.GetSectionReader(container, blob, start, size) - if err != nil { - return left, err - } - newBlockID := rand.Generate() - - data, err := ioutil.ReadAll(part) - if err != nil { - return left, err - } - if err = r.bs.PutBlock(container, blob, newBlockID, data); err != nil { - return left, err - } - left = append(left, azure.Block{ID: newBlockID, Status: azure.BlockStatusUncommitted}) - break - } - } - return left, nil -} - -// blocksRightSide returns the blocks that are going to be at the right side of -// the written chunk: [writeOffset+size, +inf) by identifying blocks that will remain -// the same and splitting blocks and reuploading them as needed. -func (r *randomBlobWriter) blocksRightSide(container, blob string, writeOffset int64, chunkSize int64, rand *blockIDGenerator) ([]azure.Block, error) { - var right []azure.Block - - bx, err := r.bs.GetBlockList(container, blob, azure.BlockListTypeAll) - if err != nil { - return nil, err - } - - re := writeOffset + chunkSize - 1 // right end of written chunk - var elapsed int64 - for _, v := range bx.CommittedBlocks { - var ( - bs = elapsed // left end of current block - be = elapsed + int64(v.Size) - 1 // right end of current block - ) - - if bs > re { // take the block as is - right = append(right, azure.Block{ID: v.Name, Status: azure.BlockStatusCommitted}) - } else if be > re { // current block needs to be splitted - part, err := r.bs.GetSectionReader(container, blob, re+1, be-(re+1)+1) - if err != nil { - return right, err - } - newBlockID := rand.Generate() - - data, err := ioutil.ReadAll(part) - if err != nil { - return right, err - } - if err = r.bs.PutBlock(container, blob, newBlockID, data); err != nil { - return right, err - } - right = append(right, azure.Block{ID: newBlockID, Status: azure.BlockStatusUncommitted}) - } - elapsed += int64(v.Size) - } - return right, nil -} - -func getBlobSize(blocks azure.BlockListResponse) int64 { - var n int64 - for _, v := range blocks.CommittedBlocks { - n += int64(v.Size) - } - return n -} diff --git a/docs/storage/driver/azure/randomwriter_test.go b/docs/storage/driver/azure/randomwriter_test.go deleted file mode 100644 index 32c2509e4..000000000 --- a/docs/storage/driver/azure/randomwriter_test.go +++ /dev/null @@ -1,339 +0,0 @@ -package azure - -import ( - "bytes" - "io" - "io/ioutil" - "math/rand" - "reflect" - "strings" - "testing" - - azure "github.com/Azure/azure-sdk-for-go/storage" -) - -func TestRandomWriter_writeChunkToBlocks(t *testing.T) { - s := NewStorageSimulator() - rw := newRandomBlobWriter(&s, 3) - rand := newBlockIDGenerator() - c := []byte("AAABBBCCCD") - - if err := rw.bs.CreateBlockBlob("a", "b"); err != nil { - t.Fatal(err) - } - bw, nn, err := rw.writeChunkToBlocks("a", "b", bytes.NewReader(c), rand) - if err != nil { - t.Fatal(err) - } - if expected := int64(len(c)); nn != expected { - t.Fatalf("wrong nn:%v, expected:%v", nn, expected) - } - if expected := 4; len(bw) != expected { - t.Fatal("unexpected written block count") - } - - bx, err := s.GetBlockList("a", "b", azure.BlockListTypeAll) - if err != nil { - t.Fatal(err) - } - if expected := 0; len(bx.CommittedBlocks) != expected { - t.Fatal("unexpected committed block count") - } - if expected := 4; len(bx.UncommittedBlocks) != expected { - t.Fatalf("unexpected uncommitted block count: %d -- %#v", len(bx.UncommittedBlocks), bx) - } - - if err := rw.bs.PutBlockList("a", "b", bw); err != nil { - t.Fatal(err) - } - - r, err := rw.bs.GetBlob("a", "b") - if err != nil { - t.Fatal(err) - } - assertBlobContents(t, r, c) -} - -func TestRandomWriter_blocksLeftSide(t *testing.T) { - blob := "AAAAABBBBBCCC" - cases := []struct { - offset int64 - expectedBlob string - expectedPattern []azure.BlockStatus - }{ - {0, "", []azure.BlockStatus{}}, // write to beginning, discard all - {13, blob, []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // write to end, no change - {1, "A", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // write at 1 - {5, "AAAAA", []azure.BlockStatus{azure.BlockStatusCommitted}}, // write just after first block - {6, "AAAAAB", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusUncommitted}}, // split the second block - {9, "AAAAABBBB", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusUncommitted}}, // write just after first block - } - - for _, c := range cases { - s := NewStorageSimulator() - rw := newRandomBlobWriter(&s, 5) - rand := newBlockIDGenerator() - - if err := rw.bs.CreateBlockBlob("a", "b"); err != nil { - t.Fatal(err) - } - bw, _, err := rw.writeChunkToBlocks("a", "b", strings.NewReader(blob), rand) - if err != nil { - t.Fatal(err) - } - if err := rw.bs.PutBlockList("a", "b", bw); err != nil { - t.Fatal(err) - } - bx, err := rw.blocksLeftSide("a", "b", c.offset, rand) - if err != nil { - t.Fatal(err) - } - - bs := []azure.BlockStatus{} - for _, v := range bx { - bs = append(bs, v.Status) - } - - if !reflect.DeepEqual(bs, c.expectedPattern) { - t.Logf("Committed blocks %v", bw) - t.Fatalf("For offset %v: Expected pattern: %v, Got: %v\n(Returned: %v)", c.offset, c.expectedPattern, bs, bx) - } - if rw.bs.PutBlockList("a", "b", bx); err != nil { - t.Fatal(err) - } - r, err := rw.bs.GetBlob("a", "b") - if err != nil { - t.Fatal(err) - } - cout, err := ioutil.ReadAll(r) - if err != nil { - t.Fatal(err) - } - outBlob := string(cout) - if outBlob != c.expectedBlob { - t.Fatalf("wrong blob contents: %v, expected: %v", outBlob, c.expectedBlob) - } - } -} - -func TestRandomWriter_blocksRightSide(t *testing.T) { - blob := "AAAAABBBBBCCC" - cases := []struct { - offset int64 - size int64 - expectedBlob string - expectedPattern []azure.BlockStatus - }{ - {0, 100, "", []azure.BlockStatus{}}, // overwrite the entire blob - {0, 3, "AABBBBBCCC", []azure.BlockStatus{azure.BlockStatusUncommitted, azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // split first block - {4, 1, "BBBBBCCC", []azure.BlockStatus{azure.BlockStatusCommitted, azure.BlockStatusCommitted}}, // write to last char of first block - {1, 6, "BBBCCC", []azure.BlockStatus{azure.BlockStatusUncommitted, azure.BlockStatusCommitted}}, // overwrite splits first and second block, last block remains - {3, 8, "CC", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // overwrite a block in middle block, split end block - {10, 1, "CC", []azure.BlockStatus{azure.BlockStatusUncommitted}}, // overwrite first byte of rightmost block - {11, 2, "", []azure.BlockStatus{}}, // overwrite the rightmost index - {13, 20, "", []azure.BlockStatus{}}, // append to the end - } - - for _, c := range cases { - s := NewStorageSimulator() - rw := newRandomBlobWriter(&s, 5) - rand := newBlockIDGenerator() - - if err := rw.bs.CreateBlockBlob("a", "b"); err != nil { - t.Fatal(err) - } - bw, _, err := rw.writeChunkToBlocks("a", "b", strings.NewReader(blob), rand) - if err != nil { - t.Fatal(err) - } - if err := rw.bs.PutBlockList("a", "b", bw); err != nil { - t.Fatal(err) - } - bx, err := rw.blocksRightSide("a", "b", c.offset, c.size, rand) - if err != nil { - t.Fatal(err) - } - - bs := []azure.BlockStatus{} - for _, v := range bx { - bs = append(bs, v.Status) - } - - if !reflect.DeepEqual(bs, c.expectedPattern) { - t.Logf("Committed blocks %v", bw) - t.Fatalf("For offset %v-size:%v: Expected pattern: %v, Got: %v\n(Returned: %v)", c.offset, c.size, c.expectedPattern, bs, bx) - } - if rw.bs.PutBlockList("a", "b", bx); err != nil { - t.Fatal(err) - } - r, err := rw.bs.GetBlob("a", "b") - if err != nil { - t.Fatal(err) - } - cout, err := ioutil.ReadAll(r) - if err != nil { - t.Fatal(err) - } - outBlob := string(cout) - if outBlob != c.expectedBlob { - t.Fatalf("For offset %v-size:%v: wrong blob contents: %v, expected: %v", c.offset, c.size, outBlob, c.expectedBlob) - } - } -} - -func TestRandomWriter_Write_NewBlob(t *testing.T) { - var ( - s = NewStorageSimulator() - rw = newRandomBlobWriter(&s, 1024*3) // 3 KB blocks - blob = randomContents(1024 * 7) // 7 KB blob - ) - if err := rw.bs.CreateBlockBlob("a", "b"); err != nil { - t.Fatal(err) - } - - if _, err := rw.WriteBlobAt("a", "b", 10, bytes.NewReader(blob)); err == nil { - t.Fatal("expected error, got nil") - } - if _, err := rw.WriteBlobAt("a", "b", 100000, bytes.NewReader(blob)); err == nil { - t.Fatal("expected error, got nil") - } - if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(blob)); err != nil { - t.Fatal(err) - } else if expected := int64(len(blob)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := rw.bs.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, blob) - } - if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil { - t.Fatal(err) - } else if len(bx.CommittedBlocks) != 3 { - t.Fatalf("got wrong number of committed blocks: %v", len(bx.CommittedBlocks)) - } - - // Replace first 512 bytes - leftChunk := randomContents(512) - blob = append(leftChunk, blob[512:]...) - if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(leftChunk)); err != nil { - t.Fatal(err) - } else if expected := int64(len(leftChunk)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := rw.bs.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, blob) - } - if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil { - t.Fatal(err) - } else if expected := 4; len(bx.CommittedBlocks) != expected { - t.Fatalf("got wrong number of committed blocks: %v, expected: %v", len(bx.CommittedBlocks), expected) - } - - // Replace last 512 bytes with 1024 bytes - rightChunk := randomContents(1024) - offset := int64(len(blob) - 512) - blob = append(blob[:offset], rightChunk...) - if nn, err := rw.WriteBlobAt("a", "b", offset, bytes.NewReader(rightChunk)); err != nil { - t.Fatal(err) - } else if expected := int64(len(rightChunk)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := rw.bs.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, blob) - } - if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil { - t.Fatal(err) - } else if expected := 5; len(bx.CommittedBlocks) != expected { - t.Fatalf("got wrong number of committed blocks: %v, expected: %v", len(bx.CommittedBlocks), expected) - } - - // Replace 2K-4K (overlaps 2 blocks from L/R) - newChunk := randomContents(1024 * 2) - offset = 1024 * 2 - blob = append(append(blob[:offset], newChunk...), blob[offset+int64(len(newChunk)):]...) - if nn, err := rw.WriteBlobAt("a", "b", offset, bytes.NewReader(newChunk)); err != nil { - t.Fatal(err) - } else if expected := int64(len(newChunk)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := rw.bs.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, blob) - } - if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil { - t.Fatal(err) - } else if expected := 6; len(bx.CommittedBlocks) != expected { - t.Fatalf("got wrong number of committed blocks: %v, expected: %v\n%v", len(bx.CommittedBlocks), expected, bx.CommittedBlocks) - } - - // Replace the entire blob - newBlob := randomContents(1024 * 30) - if nn, err := rw.WriteBlobAt("a", "b", 0, bytes.NewReader(newBlob)); err != nil { - t.Fatal(err) - } else if expected := int64(len(newBlob)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := rw.bs.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, newBlob) - } - if bx, err := rw.bs.GetBlockList("a", "b", azure.BlockListTypeCommitted); err != nil { - t.Fatal(err) - } else if expected := 10; len(bx.CommittedBlocks) != expected { - t.Fatalf("got wrong number of committed blocks: %v, expected: %v\n%v", len(bx.CommittedBlocks), expected, bx.CommittedBlocks) - } else if expected, size := int64(1024*30), getBlobSize(bx); size != expected { - t.Fatalf("committed block size does not indicate blob size") - } -} - -func Test_getBlobSize(t *testing.T) { - // with some committed blocks - if expected, size := int64(151), getBlobSize(azure.BlockListResponse{ - CommittedBlocks: []azure.BlockResponse{ - {"A", 100}, - {"B", 50}, - {"C", 1}, - }, - UncommittedBlocks: []azure.BlockResponse{ - {"D", 200}, - }}); expected != size { - t.Fatalf("wrong blob size: %v, expected: %v", size, expected) - } - - // with no committed blocks - if expected, size := int64(0), getBlobSize(azure.BlockListResponse{ - UncommittedBlocks: []azure.BlockResponse{ - {"A", 100}, - {"B", 50}, - {"C", 1}, - {"D", 200}, - }}); expected != size { - t.Fatalf("wrong blob size: %v, expected: %v", size, expected) - } -} - -func assertBlobContents(t *testing.T, r io.Reader, expected []byte) { - out, err := ioutil.ReadAll(r) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(out, expected) { - t.Fatalf("wrong blob contents. size: %v, expected: %v", len(out), len(expected)) - } -} - -func randomContents(length int64) []byte { - b := make([]byte, length) - for i := range b { - b[i] = byte(rand.Intn(2 << 8)) - } - return b -} diff --git a/docs/storage/driver/azure/zerofillwriter.go b/docs/storage/driver/azure/zerofillwriter.go deleted file mode 100644 index 095489d22..000000000 --- a/docs/storage/driver/azure/zerofillwriter.go +++ /dev/null @@ -1,49 +0,0 @@ -package azure - -import ( - "bytes" - "io" -) - -type blockBlobWriter interface { - GetSize(container, blob string) (int64, error) - WriteBlobAt(container, blob string, offset int64, chunk io.Reader) (int64, error) -} - -// zeroFillWriter enables writing to an offset outside a block blob's size -// by offering the chunk to the underlying writer as a contiguous data with -// the gap in between filled with NUL (zero) bytes. -type zeroFillWriter struct { - blockBlobWriter -} - -func newZeroFillWriter(b blockBlobWriter) zeroFillWriter { - w := zeroFillWriter{} - w.blockBlobWriter = b - return w -} - -// Write writes the given chunk to the specified existing blob even though -// offset is out of blob's size. The gaps are filled with zeros. Returned -// written number count does not include zeros written. -func (z *zeroFillWriter) Write(container, blob string, offset int64, chunk io.Reader) (int64, error) { - size, err := z.blockBlobWriter.GetSize(container, blob) - if err != nil { - return 0, err - } - - var reader io.Reader - var zeroPadding int64 - if offset <= size { - reader = chunk - } else { - zeroPadding = offset - size - offset = size // adjust offset to be the append index - zeros := bytes.NewReader(make([]byte, zeroPadding)) - reader = io.MultiReader(zeros, chunk) - } - - nn, err := z.blockBlobWriter.WriteBlobAt(container, blob, offset, reader) - nn -= zeroPadding - return nn, err -} diff --git a/docs/storage/driver/azure/zerofillwriter_test.go b/docs/storage/driver/azure/zerofillwriter_test.go deleted file mode 100644 index 49361791a..000000000 --- a/docs/storage/driver/azure/zerofillwriter_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package azure - -import ( - "bytes" - "testing" -) - -func Test_zeroFillWrite_AppendNoGap(t *testing.T) { - s := NewStorageSimulator() - bw := newRandomBlobWriter(&s, 1024*1) - zw := newZeroFillWriter(&bw) - if err := s.CreateBlockBlob("a", "b"); err != nil { - t.Fatal(err) - } - - firstChunk := randomContents(1024*3 + 512) - if nn, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil { - t.Fatal(err) - } else if expected := int64(len(firstChunk)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := s.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, firstChunk) - } - - secondChunk := randomContents(256) - if nn, err := zw.Write("a", "b", int64(len(firstChunk)), bytes.NewReader(secondChunk)); err != nil { - t.Fatal(err) - } else if expected := int64(len(secondChunk)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := s.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, append(firstChunk, secondChunk...)) - } - -} - -func Test_zeroFillWrite_StartWithGap(t *testing.T) { - s := NewStorageSimulator() - bw := newRandomBlobWriter(&s, 1024*2) - zw := newZeroFillWriter(&bw) - if err := s.CreateBlockBlob("a", "b"); err != nil { - t.Fatal(err) - } - - chunk := randomContents(1024 * 5) - padding := int64(1024*2 + 256) - if nn, err := zw.Write("a", "b", padding, bytes.NewReader(chunk)); err != nil { - t.Fatal(err) - } else if expected := int64(len(chunk)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := s.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, append(make([]byte, padding), chunk...)) - } -} - -func Test_zeroFillWrite_AppendWithGap(t *testing.T) { - s := NewStorageSimulator() - bw := newRandomBlobWriter(&s, 1024*2) - zw := newZeroFillWriter(&bw) - if err := s.CreateBlockBlob("a", "b"); err != nil { - t.Fatal(err) - } - - firstChunk := randomContents(1024*3 + 512) - if _, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil { - t.Fatal(err) - } - if out, err := s.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, firstChunk) - } - - secondChunk := randomContents(256) - padding := int64(1024 * 4) - if nn, err := zw.Write("a", "b", int64(len(firstChunk))+padding, bytes.NewReader(secondChunk)); err != nil { - t.Fatal(err) - } else if expected := int64(len(secondChunk)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := s.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, append(firstChunk, append(make([]byte, padding), secondChunk...)...)) - } -} - -func Test_zeroFillWrite_LiesWithinSize(t *testing.T) { - s := NewStorageSimulator() - bw := newRandomBlobWriter(&s, 1024*2) - zw := newZeroFillWriter(&bw) - if err := s.CreateBlockBlob("a", "b"); err != nil { - t.Fatal(err) - } - - firstChunk := randomContents(1024 * 3) - if _, err := zw.Write("a", "b", 0, bytes.NewReader(firstChunk)); err != nil { - t.Fatal(err) - } - if out, err := s.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, firstChunk) - } - - // in this case, zerofill won't be used - secondChunk := randomContents(256) - if nn, err := zw.Write("a", "b", 0, bytes.NewReader(secondChunk)); err != nil { - t.Fatal(err) - } else if expected := int64(len(secondChunk)); expected != nn { - t.Fatalf("wrong written bytes count: %v, expected: %v", nn, expected) - } - if out, err := s.GetBlob("a", "b"); err != nil { - t.Fatal(err) - } else { - assertBlobContents(t, out, append(secondChunk, firstChunk[len(secondChunk):]...)) - } -} diff --git a/docs/storage/driver/base/base.go b/docs/storage/driver/base/base.go index c816d2d6f..064bda60f 100644 --- a/docs/storage/driver/base/base.go +++ b/docs/storage/driver/base/base.go @@ -102,10 +102,10 @@ func (base *Base) PutContent(ctx context.Context, path string, content []byte) e return base.setDriverName(base.StorageDriver.PutContent(ctx, path, content)) } -// ReadStream wraps ReadStream of underlying storage driver. -func (base *Base) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +// Reader wraps Reader of underlying storage driver. +func (base *Base) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { ctx, done := context.WithTrace(ctx) - defer done("%s.ReadStream(%q, %d)", base.Name(), path, offset) + defer done("%s.Reader(%q, %d)", base.Name(), path, offset) if offset < 0 { return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset, DriverName: base.StorageDriver.Name()} @@ -115,25 +115,21 @@ func (base *Base) ReadStream(ctx context.Context, path string, offset int64) (io return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} } - rc, e := base.StorageDriver.ReadStream(ctx, path, offset) + rc, e := base.StorageDriver.Reader(ctx, path, offset) return rc, base.setDriverName(e) } -// WriteStream wraps WriteStream of underlying storage driver. -func (base *Base) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) { +// Writer wraps Writer of underlying storage driver. +func (base *Base) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { ctx, done := context.WithTrace(ctx) - defer done("%s.WriteStream(%q, %d)", base.Name(), path, offset) - - if offset < 0 { - return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset, DriverName: base.StorageDriver.Name()} - } + defer done("%s.Writer(%q, %v)", base.Name(), path, append) if !storagedriver.PathRegexp.MatchString(path) { - return 0, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} + return nil, storagedriver.InvalidPathError{Path: path, DriverName: base.StorageDriver.Name()} } - i64, e := base.StorageDriver.WriteStream(ctx, path, offset, reader) - return i64, base.setDriverName(e) + writer, e := base.StorageDriver.Writer(ctx, path, append) + return writer, base.setDriverName(e) } // Stat wraps Stat of underlying storage driver. diff --git a/docs/storage/driver/filesystem/driver.go b/docs/storage/driver/filesystem/driver.go index 5b495818b..3bbdc6379 100644 --- a/docs/storage/driver/filesystem/driver.go +++ b/docs/storage/driver/filesystem/driver.go @@ -1,6 +1,7 @@ package filesystem import ( + "bufio" "bytes" "fmt" "io" @@ -78,7 +79,7 @@ func (d *driver) Name() string { // GetContent retrieves the content stored at "path" as a []byte. func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { - rc, err := d.ReadStream(ctx, path, 0) + rc, err := d.Reader(ctx, path, 0) if err != nil { return nil, err } @@ -94,16 +95,22 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { // PutContent stores the []byte content at a location designated by "path". func (d *driver) PutContent(ctx context.Context, subPath string, contents []byte) error { - if _, err := d.WriteStream(ctx, subPath, 0, bytes.NewReader(contents)); err != nil { + writer, err := d.Writer(ctx, subPath, false) + if err != nil { return err } - - return os.Truncate(d.fullPath(subPath), int64(len(contents))) + defer writer.Close() + _, err = io.Copy(writer, bytes.NewReader(contents)) + if err != nil { + writer.Cancel() + return err + } + return writer.Commit() } -// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// Reader retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { file, err := os.OpenFile(d.fullPath(path), os.O_RDONLY, 0644) if err != nil { if os.IsNotExist(err) { @@ -125,40 +132,36 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io. return file, nil } -// WriteStream stores the contents of the provided io.Reader at a location -// designated by the given path. -func (d *driver) WriteStream(ctx context.Context, subPath string, offset int64, reader io.Reader) (nn int64, err error) { - // TODO(stevvooe): This needs to be a requirement. - // if !path.IsAbs(subPath) { - // return fmt.Errorf("absolute path required: %q", subPath) - // } - +func (d *driver) Writer(ctx context.Context, subPath string, append bool) (storagedriver.FileWriter, error) { fullPath := d.fullPath(subPath) parentDir := path.Dir(fullPath) if err := os.MkdirAll(parentDir, 0777); err != nil { - return 0, err + return nil, err } fp, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE, 0666) if err != nil { - // TODO(stevvooe): A few missing conditions in storage driver: - // 1. What if the path is already a directory? - // 2. Should number 1 be exposed explicitly in storagedriver? - // 2. Can this path not exist, even if we create above? - return 0, err - } - defer fp.Close() - - nn, err = fp.Seek(offset, os.SEEK_SET) - if err != nil { - return 0, err + return nil, err } - if nn != offset { - return 0, fmt.Errorf("bad seek to %v, expected %v in fp=%v", offset, nn, fp) + var offset int64 + + if !append { + err := fp.Truncate(0) + if err != nil { + fp.Close() + return nil, err + } + } else { + n, err := fp.Seek(0, os.SEEK_END) + if err != nil { + fp.Close() + return nil, err + } + offset = int64(n) } - return io.Copy(fp, reader) + return newFileWriter(fp, offset), nil } // Stat retrieves the FileInfo for the given path, including the current size @@ -286,3 +289,88 @@ func (fi fileInfo) ModTime() time.Time { func (fi fileInfo) IsDir() bool { return fi.FileInfo.IsDir() } + +type fileWriter struct { + file *os.File + size int64 + bw *bufio.Writer + closed bool + committed bool + cancelled bool +} + +func newFileWriter(file *os.File, size int64) *fileWriter { + return &fileWriter{ + file: file, + size: size, + bw: bufio.NewWriter(file), + } +} + +func (fw *fileWriter) Write(p []byte) (int, error) { + if fw.closed { + return 0, fmt.Errorf("already closed") + } else if fw.committed { + return 0, fmt.Errorf("already committed") + } else if fw.cancelled { + return 0, fmt.Errorf("already cancelled") + } + n, err := fw.bw.Write(p) + fw.size += int64(n) + return n, err +} + +func (fw *fileWriter) Size() int64 { + return fw.size +} + +func (fw *fileWriter) Close() error { + if fw.closed { + return fmt.Errorf("already closed") + } + + if err := fw.bw.Flush(); err != nil { + return err + } + + if err := fw.file.Sync(); err != nil { + return err + } + + if err := fw.file.Close(); err != nil { + return err + } + fw.closed = true + return nil +} + +func (fw *fileWriter) Cancel() error { + if fw.closed { + return fmt.Errorf("already closed") + } + + fw.cancelled = true + fw.file.Close() + return os.Remove(fw.file.Name()) +} + +func (fw *fileWriter) Commit() error { + if fw.closed { + return fmt.Errorf("already closed") + } else if fw.committed { + return fmt.Errorf("already committed") + } else if fw.cancelled { + return fmt.Errorf("already cancelled") + } + + if err := fw.bw.Flush(); err != nil { + return err + } + + if err := fw.file.Sync(); err != nil { + return err + } + + fw.committed = true + return nil +} diff --git a/docs/storage/driver/inmemory/driver.go b/docs/storage/driver/inmemory/driver.go index b5735c0ac..eb2fd1cf4 100644 --- a/docs/storage/driver/inmemory/driver.go +++ b/docs/storage/driver/inmemory/driver.go @@ -1,7 +1,6 @@ package inmemory import ( - "bytes" "fmt" "io" "io/ioutil" @@ -74,7 +73,7 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { d.mutex.RLock() defer d.mutex.RUnlock() - rc, err := d.ReadStream(ctx, path, 0) + rc, err := d.Reader(ctx, path, 0) if err != nil { return nil, err } @@ -88,7 +87,9 @@ func (d *driver) PutContent(ctx context.Context, p string, contents []byte) erro d.mutex.Lock() defer d.mutex.Unlock() - f, err := d.root.mkfile(p) + normalized := normalize(p) + + f, err := d.root.mkfile(normalized) if err != nil { // TODO(stevvooe): Again, we need to clarify when this is not a // directory in StorageDriver API. @@ -101,9 +102,9 @@ func (d *driver) PutContent(ctx context.Context, p string, contents []byte) erro return nil } -// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// Reader retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { d.mutex.RLock() defer d.mutex.RUnlock() @@ -111,10 +112,10 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io. return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset} } - path = normalize(path) - found := d.root.find(path) + normalized := normalize(path) + found := d.root.find(normalized) - if found.path() != path { + if found.path() != normalized { return nil, storagedriver.PathNotFoundError{Path: path} } @@ -125,46 +126,24 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io. return ioutil.NopCloser(found.(*file).sectionReader(offset)), nil } -// WriteStream stores the contents of the provided io.ReadCloser at a location -// designated by the given path. -func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) { +// Writer returns a FileWriter which will store the content written to it +// at the location designated by "path" after the call to Commit. +func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { d.mutex.Lock() defer d.mutex.Unlock() - if offset < 0 { - return 0, storagedriver.InvalidOffsetError{Path: path, Offset: offset} - } - normalized := normalize(path) f, err := d.root.mkfile(normalized) if err != nil { - return 0, fmt.Errorf("not a file") + return nil, fmt.Errorf("not a file") } - // Unlock while we are reading from the source, in case we are reading - // from the same mfs instance. This can be fixed by a more granular - // locking model. - d.mutex.Unlock() - d.mutex.RLock() // Take the readlock to block other writers. - var buf bytes.Buffer - - nn, err = buf.ReadFrom(reader) - if err != nil { - // TODO(stevvooe): This condition is odd and we may need to clarify: - // we've read nn bytes from reader but have written nothing to the - // backend. What is the correct return value? Really, the caller needs - // to know that the reader has been advanced and reattempting the - // operation is incorrect. - d.mutex.RUnlock() - d.mutex.Lock() - return nn, err + if !append { + f.truncate() } - d.mutex.RUnlock() - d.mutex.Lock() - f.WriteAt(buf.Bytes(), offset) - return nn, err + return d.newWriter(f), nil } // Stat returns info about the provided path. @@ -173,7 +152,7 @@ func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, defer d.mutex.RUnlock() normalized := normalize(path) - found := d.root.find(path) + found := d.root.find(normalized) if found.path() != normalized { return nil, storagedriver.PathNotFoundError{Path: path} @@ -260,3 +239,74 @@ func (d *driver) Delete(ctx context.Context, path string) error { func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) { return "", storagedriver.ErrUnsupportedMethod{} } + +type writer struct { + d *driver + f *file + closed bool + committed bool + cancelled bool +} + +func (d *driver) newWriter(f *file) storagedriver.FileWriter { + return &writer{ + d: d, + f: f, + } +} + +func (w *writer) Write(p []byte) (int, error) { + if w.closed { + return 0, fmt.Errorf("already closed") + } else if w.committed { + return 0, fmt.Errorf("already committed") + } else if w.cancelled { + return 0, fmt.Errorf("already cancelled") + } + + w.d.mutex.Lock() + defer w.d.mutex.Unlock() + + return w.f.WriteAt(p, int64(len(w.f.data))) +} + +func (w *writer) Size() int64 { + w.d.mutex.RLock() + defer w.d.mutex.RUnlock() + + return int64(len(w.f.data)) +} + +func (w *writer) Close() error { + if w.closed { + return fmt.Errorf("already closed") + } + w.closed = true + return nil +} + +func (w *writer) Cancel() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } + w.cancelled = true + + w.d.mutex.Lock() + defer w.d.mutex.Unlock() + + return w.d.root.delete(w.f.path()) +} + +func (w *writer) Commit() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } else if w.cancelled { + return fmt.Errorf("already cancelled") + } + w.committed = true + return nil +} diff --git a/docs/storage/driver/s3-aws/s3.go b/docs/storage/driver/s3-aws/s3.go index 0e113680f..eb617d73c 100644 --- a/docs/storage/driver/s3-aws/s3.go +++ b/docs/storage/driver/s3-aws/s3.go @@ -20,10 +20,8 @@ import ( "reflect" "strconv" "strings" - "sync" "time" - "github.com/Sirupsen/logrus" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" @@ -103,9 +101,6 @@ type driver struct { Encrypt bool RootDirectory string StorageClass string - - pool sync.Pool // pool []byte buffers used for WriteStream - zeros []byte // shared, zero-valued buffer used for WriteStream } type baseEmbed struct { @@ -302,11 +297,6 @@ func New(params DriverParameters) (*Driver, error) { Encrypt: params.Encrypt, RootDirectory: params.RootDirectory, StorageClass: params.StorageClass, - zeros: make([]byte, params.ChunkSize), - } - - d.pool.New = func() interface{} { - return make([]byte, d.ChunkSize) } return &Driver{ @@ -326,7 +316,7 @@ func (d *driver) Name() string { // GetContent retrieves the content stored at "path" as a []byte. func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) { - reader, err := d.ReadStream(ctx, path, 0) + reader, err := d.Reader(ctx, path, 0) if err != nil { return nil, err } @@ -347,9 +337,9 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e return parseError(path, err) } -// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// Reader retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { resp, err := d.S3.GetObject(&s3.GetObjectInput{ Bucket: aws.String(d.Bucket), Key: aws.String(d.s3Path(path)), @@ -366,372 +356,52 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io. return resp.Body, nil } -// WriteStream stores the contents of the provided io.Reader at a -// location designated by the given path. The driver will know it has -// received the full contents when the reader returns io.EOF. The number -// of successfully READ bytes will be returned, even if an error is -// returned. May be used to resume writing a stream by providing a nonzero -// offset. Offsets past the current size will write from the position -// beyond the end of the file. -func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { - var partNumber int64 = 1 - bytesRead := 0 - var putErrChan chan error - parts := []*s3.CompletedPart{} - done := make(chan struct{}) // stopgap to free up waiting goroutines - - resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), - ContentType: d.getContentType(), - ACL: d.getACL(), - ServerSideEncryption: d.getEncryptionMode(), - StorageClass: d.getStorageClass(), - }) - if err != nil { - return 0, err - } - - uploadID := resp.UploadId - - buf := d.getbuf() - - // We never want to leave a dangling multipart upload, our only consistent state is - // when there is a whole object at path. This is in order to remain consistent with - // the stat call. - // - // Note that if the machine dies before executing the defer, we will be left with a dangling - // multipart upload, which will eventually be cleaned up, but we will lose all of the progress - // made prior to the machine crashing. - defer func() { - if putErrChan != nil { - if putErr := <-putErrChan; putErr != nil { - err = putErr - } - } - - if len(parts) > 0 { - _, err := d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), - UploadId: uploadID, - MultipartUpload: &s3.CompletedMultipartUpload{ - Parts: parts, - }, - }) - if err != nil { - // TODO (brianbland): log errors here - d.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), - UploadId: uploadID, - }) - } - } - - d.putbuf(buf) // needs to be here to pick up new buf value - close(done) // free up any waiting goroutines - }() - - // Fills from 0 to total from current - fromSmallCurrent := func(total int64) error { - current, err := d.ReadStream(ctx, path, 0) - if err != nil { - return err - } - - bytesRead = 0 - for int64(bytesRead) < total { - //The loop should very rarely enter a second iteration - nn, err := current.Read(buf[bytesRead:total]) - bytesRead += nn - if err != nil { - if err != io.EOF { - return err - } - - break - } - - } - return nil - } - - // Fills from parameter to chunkSize from reader - fromReader := func(from int64) error { - bytesRead = 0 - for from+int64(bytesRead) < d.ChunkSize { - nn, err := reader.Read(buf[from+int64(bytesRead):]) - totalRead += int64(nn) - bytesRead += nn - - if err != nil { - if err != io.EOF { - return err - } - - break - } - } - - if putErrChan == nil { - putErrChan = make(chan error) - } else { - if putErr := <-putErrChan; putErr != nil { - putErrChan = nil - return putErr - } - } - - go func(bytesRead int, from int64, buf []byte) { - defer d.putbuf(buf) // this buffer gets dropped after this call - - // DRAGONS(stevvooe): There are few things one might want to know - // about this section. First, the putErrChan is expecting an error - // and a nil or just a nil to come through the channel. This is - // covered by the silly defer below. The other aspect is the s3 - // retry backoff to deal with RequestTimeout errors. Even though - // the underlying s3 library should handle it, it doesn't seem to - // be part of the shouldRetry function (see AdRoll/goamz/s3). - defer func() { - select { - case putErrChan <- nil: // for some reason, we do this no matter what. - case <-done: - return // ensure we don't leak the goroutine - } - }() - - if bytesRead <= 0 { - return - } - - resp, err := d.S3.UploadPart(&s3.UploadPartInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), - PartNumber: aws.Int64(partNumber), - UploadId: uploadID, - Body: bytes.NewReader(buf[0 : int64(bytesRead)+from]), - }) - if err != nil { - logrus.Errorf("error putting part, aborting: %v", err) - select { - case putErrChan <- err: - case <-done: - return // don't leak the goroutine - } - } - - // parts and partNumber are safe, because this function is the - // only one modifying them and we force it to be executed - // serially. - parts = append(parts, &s3.CompletedPart{ - ETag: resp.ETag, - PartNumber: aws.Int64(partNumber), - }) - partNumber++ - }(bytesRead, from, buf) - - buf = d.getbuf() // use a new buffer for the next call - return nil - } - - if offset > 0 { - resp, err := d.S3.HeadObject(&s3.HeadObjectInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), +// Writer returns a FileWriter which will store the content written to it +// at the location designated by "path" after the call to Commit. +func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { + key := d.s3Path(path) + if !append { + // TODO (brianbland): cancel other uploads at this path + resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(key), + ContentType: d.getContentType(), + ACL: d.getACL(), + ServerSideEncryption: d.getEncryptionMode(), + StorageClass: d.getStorageClass(), }) if err != nil { - if s3Err, ok := err.(awserr.Error); !ok || s3Err.Code() != "NoSuchKey" { - return 0, err - } - } - - currentLength := int64(0) - if err == nil && resp.ContentLength != nil { - currentLength = *resp.ContentLength - } - - if currentLength >= offset { - if offset < d.ChunkSize { - // chunkSize > currentLength >= offset - if err = fromSmallCurrent(offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset); err != nil { - return totalRead, err - } - - if totalRead+offset < d.ChunkSize { - return totalRead, nil - } - } else { - // currentLength >= offset >= chunkSize - resp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), - PartNumber: aws.Int64(partNumber), - UploadId: uploadID, - CopySource: aws.String(d.Bucket + "/" + d.s3Path(path)), - CopySourceRange: aws.String("bytes=0-" + strconv.FormatInt(offset-1, 10)), - }) - if err != nil { - return 0, err - } - - parts = append(parts, &s3.CompletedPart{ - ETag: resp.CopyPartResult.ETag, - PartNumber: aws.Int64(partNumber), - }) - partNumber++ - } - } else { - // Fills between parameters with 0s but only when to - from <= chunkSize - fromZeroFillSmall := func(from, to int64) error { - bytesRead = 0 - for from+int64(bytesRead) < to { - nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to]) - bytesRead += nn - if err != nil { - return err - } - } - - return nil - } - - // Fills between parameters with 0s, making new parts - fromZeroFillLarge := func(from, to int64) error { - bytesRead64 := int64(0) - for to-(from+bytesRead64) >= d.ChunkSize { - resp, err := d.S3.UploadPart(&s3.UploadPartInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), - PartNumber: aws.Int64(partNumber), - UploadId: uploadID, - Body: bytes.NewReader(d.zeros), - }) - if err != nil { - return err - } - bytesRead64 += d.ChunkSize - - parts = append(parts, &s3.CompletedPart{ - ETag: resp.ETag, - PartNumber: aws.Int64(partNumber), - }) - partNumber++ - } - - return fromZeroFillSmall(0, (to-from)%d.ChunkSize) - } - - // currentLength < offset - if currentLength < d.ChunkSize { - if offset < d.ChunkSize { - // chunkSize > offset > currentLength - if err = fromSmallCurrent(currentLength); err != nil { - return totalRead, err - } - - if err = fromZeroFillSmall(currentLength, offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset); err != nil { - return totalRead, err - } - - if totalRead+offset < d.ChunkSize { - return totalRead, nil - } - } else { - // offset >= chunkSize > currentLength - if err = fromSmallCurrent(currentLength); err != nil { - return totalRead, err - } - - if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil { - return totalRead, err - } - - resp, err := d.S3.UploadPart(&s3.UploadPartInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), - PartNumber: aws.Int64(partNumber), - UploadId: uploadID, - Body: bytes.NewReader(buf), - }) - if err != nil { - return totalRead, err - } - - parts = append(parts, &s3.CompletedPart{ - ETag: resp.ETag, - PartNumber: aws.Int64(partNumber), - }) - partNumber++ - - //Zero fill from chunkSize up to offset, then some reader - if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset % d.ChunkSize); err != nil { - return totalRead, err - } - - if totalRead+(offset%d.ChunkSize) < d.ChunkSize { - return totalRead, nil - } - } - } else { - // offset > currentLength >= chunkSize - resp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{ - Bucket: aws.String(d.Bucket), - Key: aws.String(d.s3Path(path)), - PartNumber: aws.Int64(partNumber), - UploadId: uploadID, - CopySource: aws.String(d.Bucket + "/" + d.s3Path(path)), - }) - if err != nil { - return 0, err - } - - parts = append(parts, &s3.CompletedPart{ - ETag: resp.CopyPartResult.ETag, - PartNumber: aws.Int64(partNumber), - }) - partNumber++ - - //Zero fill from currentLength up to offset, then some reader - if err = fromZeroFillLarge(currentLength, offset); err != nil { - return totalRead, err - } - - if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil { - return totalRead, err - } - - if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize { - return totalRead, nil - } - } - + return nil, err } + return d.newWriter(key, *resp.UploadId, nil), nil + } + resp, err := d.S3.ListMultipartUploads(&s3.ListMultipartUploadsInput{ + Bucket: aws.String(d.Bucket), + Prefix: aws.String(key), + }) + if err != nil { + return nil, parseError(path, err) } - for { - if err = fromReader(0); err != nil { - return totalRead, err + for _, multi := range resp.Uploads { + if key != *multi.Key { + continue } - - if int64(bytesRead) < d.ChunkSize { - break + resp, err := d.S3.ListParts(&s3.ListPartsInput{ + Bucket: aws.String(d.Bucket), + Key: aws.String(key), + UploadId: multi.UploadId, + }) + if err != nil { + return nil, parseError(path, err) } + var multiSize int64 + for _, part := range resp.Parts { + multiSize += *part.Size + } + return d.newWriter(key, *multi.UploadId, resp.Parts), nil } - - return totalRead, nil + return nil, storagedriver.PathNotFoundError{Path: path} } // Stat retrieves the FileInfo for the given path, including the current size @@ -971,12 +641,258 @@ func (d *driver) getStorageClass() *string { return aws.String(d.StorageClass) } -// getbuf returns a buffer from the driver's pool with length d.ChunkSize. -func (d *driver) getbuf() []byte { - return d.pool.Get().([]byte) +// writer attempts to upload parts to S3 in a buffered fashion where the last +// part is at least as large as the chunksize, so the multipart upload could be +// cleanly resumed in the future. This is violated if Close is called after less +// than a full chunk is written. +type writer struct { + driver *driver + key string + uploadID string + parts []*s3.Part + size int64 + readyPart []byte + pendingPart []byte + closed bool + committed bool + cancelled bool } -func (d *driver) putbuf(p []byte) { - copy(p, d.zeros) - d.pool.Put(p) +func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver.FileWriter { + var size int64 + for _, part := range parts { + size += *part.Size + } + return &writer{ + driver: d, + key: key, + uploadID: uploadID, + parts: parts, + size: size, + } +} + +func (w *writer) Write(p []byte) (int, error) { + if w.closed { + return 0, fmt.Errorf("already closed") + } else if w.committed { + return 0, fmt.Errorf("already committed") + } else if w.cancelled { + return 0, fmt.Errorf("already cancelled") + } + + // If the last written part is smaller than minChunkSize, we need to make a + // new multipart upload :sadface: + if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize { + var completedParts []*s3.CompletedPart + for _, part := range w.parts { + completedParts = append(completedParts, &s3.CompletedPart{ + ETag: part.ETag, + PartNumber: part.PartNumber, + }) + } + _, err := w.driver.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: completedParts, + }, + }) + if err != nil { + w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + }) + return 0, err + } + + resp, err := w.driver.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + ContentType: w.driver.getContentType(), + ACL: w.driver.getACL(), + ServerSideEncryption: w.driver.getEncryptionMode(), + StorageClass: w.driver.getStorageClass(), + }) + if err != nil { + return 0, err + } + w.uploadID = *resp.UploadId + + // If the entire written file is smaller than minChunkSize, we need to make + // a new part from scratch :double sad face: + if w.size < minChunkSize { + resp, err := w.driver.S3.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + }) + defer resp.Body.Close() + if err != nil { + return 0, err + } + w.parts = nil + w.readyPart, err = ioutil.ReadAll(resp.Body) + if err != nil { + return 0, err + } + } else { + // Otherwise we can use the old file as the new first part + copyPartResp, err := w.driver.S3.UploadPartCopy(&s3.UploadPartCopyInput{ + Bucket: aws.String(w.driver.Bucket), + CopySource: aws.String(w.driver.Bucket + "/" + w.key), + Key: aws.String(w.key), + PartNumber: aws.Int64(1), + UploadId: resp.UploadId, + }) + if err != nil { + return 0, err + } + w.parts = []*s3.Part{ + { + ETag: copyPartResp.CopyPartResult.ETag, + PartNumber: aws.Int64(1), + Size: aws.Int64(w.size), + }, + } + } + } + + var n int + + for len(p) > 0 { + // If no parts are ready to write, fill up the first part + if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 { + if len(p) >= neededBytes { + w.readyPart = append(w.readyPart, p[:neededBytes]...) + n += neededBytes + p = p[neededBytes:] + } else { + w.readyPart = append(w.readyPart, p...) + n += len(p) + p = nil + } + } + + if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 { + if len(p) >= neededBytes { + w.pendingPart = append(w.pendingPart, p[:neededBytes]...) + n += neededBytes + p = p[neededBytes:] + err := w.flushPart() + if err != nil { + w.size += int64(n) + return n, err + } + } else { + w.pendingPart = append(w.pendingPart, p...) + n += len(p) + p = nil + } + } + } + w.size += int64(n) + return n, nil +} + +func (w *writer) Size() int64 { + return w.size +} + +func (w *writer) Close() error { + if w.closed { + return fmt.Errorf("already closed") + } + w.closed = true + return w.flushPart() +} + +func (w *writer) Cancel() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } + w.cancelled = true + _, err := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + }) + return err +} + +func (w *writer) Commit() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } else if w.cancelled { + return fmt.Errorf("already cancelled") + } + err := w.flushPart() + if err != nil { + return err + } + w.committed = true + var completedParts []*s3.CompletedPart + for _, part := range w.parts { + completedParts = append(completedParts, &s3.CompletedPart{ + ETag: part.ETag, + PartNumber: part.PartNumber, + }) + } + _, err = w.driver.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: completedParts, + }, + }) + if err != nil { + w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + UploadId: aws.String(w.uploadID), + }) + return err + } + return nil +} + +// flushPart flushes buffers to write a part to S3. +// Only called by Write (with both buffers full) and Close/Commit (always) +func (w *writer) flushPart() error { + if len(w.readyPart) == 0 && len(w.pendingPart) == 0 { + // nothing to write + return nil + } + if len(w.pendingPart) < int(w.driver.ChunkSize) { + // closing with a small pending part + // combine ready and pending to avoid writing a small part + w.readyPart = append(w.readyPart, w.pendingPart...) + w.pendingPart = nil + } + + partNumber := aws.Int64(int64(len(w.parts) + 1)) + resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{ + Bucket: aws.String(w.driver.Bucket), + Key: aws.String(w.key), + PartNumber: partNumber, + UploadId: aws.String(w.uploadID), + Body: bytes.NewReader(w.readyPart), + }) + if err != nil { + return err + } + w.parts = append(w.parts, &s3.Part{ + ETag: resp.ETag, + PartNumber: partNumber, + Size: aws.Int64(int64(len(w.readyPart))), + }) + w.readyPart = w.pendingPart + w.pendingPart = nil + return nil } diff --git a/docs/storage/driver/s3-goamz/s3.go b/docs/storage/driver/s3-goamz/s3.go index 9208965b3..aa2d31b71 100644 --- a/docs/storage/driver/s3-goamz/s3.go +++ b/docs/storage/driver/s3-goamz/s3.go @@ -21,10 +21,8 @@ import ( "reflect" "strconv" "strings" - "sync" "time" - "github.com/Sirupsen/logrus" "github.com/docker/goamz/aws" "github.com/docker/goamz/s3" @@ -79,9 +77,6 @@ type driver struct { Encrypt bool RootDirectory string StorageClass s3.StorageClass - - pool sync.Pool // pool []byte buffers used for WriteStream - zeros []byte // shared, zero-valued buffer used for WriteStream } type baseEmbed struct { @@ -301,11 +296,6 @@ func New(params DriverParameters) (*Driver, error) { Encrypt: params.Encrypt, RootDirectory: params.RootDirectory, StorageClass: params.StorageClass, - zeros: make([]byte, params.ChunkSize), - } - - d.pool.New = func() interface{} { - return make([]byte, d.ChunkSize) } return &Driver{ @@ -337,9 +327,9 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e return parseError(path, d.Bucket.Put(d.s3Path(path), contents, d.getContentType(), getPermissions(), d.getOptions())) } -// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// Reader retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { headers := make(http.Header) headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") @@ -354,343 +344,37 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io. return resp.Body, nil } -// WriteStream stores the contents of the provided io.Reader at a -// location designated by the given path. The driver will know it has -// received the full contents when the reader returns io.EOF. The number -// of successfully READ bytes will be returned, even if an error is -// returned. May be used to resume writing a stream by providing a nonzero -// offset. Offsets past the current size will write from the position -// beyond the end of the file. -func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { - partNumber := 1 - bytesRead := 0 - var putErrChan chan error - parts := []s3.Part{} - var part s3.Part - done := make(chan struct{}) // stopgap to free up waiting goroutines - - multi, err := d.Bucket.InitMulti(d.s3Path(path), d.getContentType(), getPermissions(), d.getOptions()) +// Writer returns a FileWriter which will store the content written to it +// at the location designated by "path" after the call to Commit. +func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { + key := d.s3Path(path) + if !append { + // TODO (brianbland): cancel other uploads at this path + multi, err := d.Bucket.InitMulti(key, d.getContentType(), getPermissions(), d.getOptions()) + if err != nil { + return nil, err + } + return d.newWriter(key, multi, nil), nil + } + multis, _, err := d.Bucket.ListMulti(key, "") if err != nil { - return 0, err + return nil, parseError(path, err) } - - buf := d.getbuf() - - // We never want to leave a dangling multipart upload, our only consistent state is - // when there is a whole object at path. This is in order to remain consistent with - // the stat call. - // - // Note that if the machine dies before executing the defer, we will be left with a dangling - // multipart upload, which will eventually be cleaned up, but we will lose all of the progress - // made prior to the machine crashing. - defer func() { - if putErrChan != nil { - if putErr := <-putErrChan; putErr != nil { - err = putErr - } + for _, multi := range multis { + if key != multi.Key { + continue } - - if len(parts) > 0 { - if multi == nil { - // Parts should be empty if the multi is not initialized - panic("Unreachable") - } else { - if multi.Complete(parts) != nil { - multi.Abort() - } - } - } - - d.putbuf(buf) // needs to be here to pick up new buf value - close(done) // free up any waiting goroutines - }() - - // Fills from 0 to total from current - fromSmallCurrent := func(total int64) error { - current, err := d.ReadStream(ctx, path, 0) + parts, err := multi.ListParts() if err != nil { - return err + return nil, parseError(path, err) } - - bytesRead = 0 - for int64(bytesRead) < total { - //The loop should very rarely enter a second iteration - nn, err := current.Read(buf[bytesRead:total]) - bytesRead += nn - if err != nil { - if err != io.EOF { - return err - } - - break - } - + var multiSize int64 + for _, part := range parts { + multiSize += part.Size } - return nil + return d.newWriter(key, multi, parts), nil } - - // Fills from parameter to chunkSize from reader - fromReader := func(from int64) error { - bytesRead = 0 - for from+int64(bytesRead) < d.ChunkSize { - nn, err := reader.Read(buf[from+int64(bytesRead):]) - totalRead += int64(nn) - bytesRead += nn - - if err != nil { - if err != io.EOF { - return err - } - - break - } - } - - if putErrChan == nil { - putErrChan = make(chan error) - } else { - if putErr := <-putErrChan; putErr != nil { - putErrChan = nil - return putErr - } - } - - go func(bytesRead int, from int64, buf []byte) { - defer d.putbuf(buf) // this buffer gets dropped after this call - - // DRAGONS(stevvooe): There are few things one might want to know - // about this section. First, the putErrChan is expecting an error - // and a nil or just a nil to come through the channel. This is - // covered by the silly defer below. The other aspect is the s3 - // retry backoff to deal with RequestTimeout errors. Even though - // the underlying s3 library should handle it, it doesn't seem to - // be part of the shouldRetry function (see AdRoll/goamz/s3). - defer func() { - select { - case putErrChan <- nil: // for some reason, we do this no matter what. - case <-done: - return // ensure we don't leak the goroutine - } - }() - - if bytesRead <= 0 { - return - } - - var err error - var part s3.Part - - loop: - for retries := 0; retries < 5; retries++ { - part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from])) - if err == nil { - break // success! - } - - // NOTE(stevvooe): This retry code tries to only retry under - // conditions where the s3 package does not. We may add s3 - // error codes to the below if we see others bubble up in the - // application. Right now, the most troubling is - // RequestTimeout, which seems to only triggered when a tcp - // connection to s3 slows to a crawl. If the RequestTimeout - // ends up getting added to the s3 library and we don't see - // other errors, this retry loop can be removed. - switch err := err.(type) { - case *s3.Error: - switch err.Code { - case "RequestTimeout": - // allow retries on only this error. - default: - break loop - } - } - - backoff := 100 * time.Millisecond * time.Duration(retries+1) - logrus.Errorf("error putting part, retrying after %v: %v", err, backoff.String()) - time.Sleep(backoff) - } - - if err != nil { - logrus.Errorf("error putting part, aborting: %v", err) - select { - case putErrChan <- err: - case <-done: - return // don't leak the goroutine - } - } - - // parts and partNumber are safe, because this function is the - // only one modifying them and we force it to be executed - // serially. - parts = append(parts, part) - partNumber++ - }(bytesRead, from, buf) - - buf = d.getbuf() // use a new buffer for the next call - return nil - } - - if offset > 0 { - resp, err := d.Bucket.Head(d.s3Path(path), nil) - if err != nil { - if s3Err, ok := err.(*s3.Error); !ok || s3Err.Code != "NoSuchKey" { - return 0, err - } - } - - currentLength := int64(0) - if err == nil { - currentLength = resp.ContentLength - } - - if currentLength >= offset { - if offset < d.ChunkSize { - // chunkSize > currentLength >= offset - if err = fromSmallCurrent(offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset); err != nil { - return totalRead, err - } - - if totalRead+offset < d.ChunkSize { - return totalRead, nil - } - } else { - // currentLength >= offset >= chunkSize - _, part, err = multi.PutPartCopy(partNumber, - s3.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)}, - d.Bucket.Name+"/"+d.s3Path(path)) - if err != nil { - return 0, err - } - - parts = append(parts, part) - partNumber++ - } - } else { - // Fills between parameters with 0s but only when to - from <= chunkSize - fromZeroFillSmall := func(from, to int64) error { - bytesRead = 0 - for from+int64(bytesRead) < to { - nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to]) - bytesRead += nn - if err != nil { - return err - } - } - - return nil - } - - // Fills between parameters with 0s, making new parts - fromZeroFillLarge := func(from, to int64) error { - bytesRead64 := int64(0) - for to-(from+bytesRead64) >= d.ChunkSize { - part, err := multi.PutPart(int(partNumber), bytes.NewReader(d.zeros)) - if err != nil { - return err - } - bytesRead64 += d.ChunkSize - - parts = append(parts, part) - partNumber++ - } - - return fromZeroFillSmall(0, (to-from)%d.ChunkSize) - } - - // currentLength < offset - if currentLength < d.ChunkSize { - if offset < d.ChunkSize { - // chunkSize > offset > currentLength - if err = fromSmallCurrent(currentLength); err != nil { - return totalRead, err - } - - if err = fromZeroFillSmall(currentLength, offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset); err != nil { - return totalRead, err - } - - if totalRead+offset < d.ChunkSize { - return totalRead, nil - } - } else { - // offset >= chunkSize > currentLength - if err = fromSmallCurrent(currentLength); err != nil { - return totalRead, err - } - - if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil { - return totalRead, err - } - - part, err = multi.PutPart(int(partNumber), bytes.NewReader(buf)) - if err != nil { - return totalRead, err - } - - parts = append(parts, part) - partNumber++ - - //Zero fill from chunkSize up to offset, then some reader - if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset % d.ChunkSize); err != nil { - return totalRead, err - } - - if totalRead+(offset%d.ChunkSize) < d.ChunkSize { - return totalRead, nil - } - } - } else { - // offset > currentLength >= chunkSize - _, part, err = multi.PutPartCopy(partNumber, - s3.CopyOptions{}, - d.Bucket.Name+"/"+d.s3Path(path)) - if err != nil { - return 0, err - } - - parts = append(parts, part) - partNumber++ - - //Zero fill from currentLength up to offset, then some reader - if err = fromZeroFillLarge(currentLength, offset); err != nil { - return totalRead, err - } - - if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil { - return totalRead, err - } - - if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize { - return totalRead, nil - } - } - - } - } - - for { - if err = fromReader(0); err != nil { - return totalRead, err - } - - if int64(bytesRead) < d.ChunkSize { - break - } - } - - return totalRead, nil + return nil, storagedriver.PathNotFoundError{Path: path} } // Stat retrieves the FileInfo for the given path, including the current size @@ -882,12 +566,181 @@ func (d *driver) getContentType() string { return "application/octet-stream" } -// getbuf returns a buffer from the driver's pool with length d.ChunkSize. -func (d *driver) getbuf() []byte { - return d.pool.Get().([]byte) +// writer attempts to upload parts to S3 in a buffered fashion where the last +// part is at least as large as the chunksize, so the multipart upload could be +// cleanly resumed in the future. This is violated if Close is called after less +// than a full chunk is written. +type writer struct { + driver *driver + key string + multi *s3.Multi + parts []s3.Part + size int64 + readyPart []byte + pendingPart []byte + closed bool + committed bool + cancelled bool } -func (d *driver) putbuf(p []byte) { - copy(p, d.zeros) - d.pool.Put(p) +func (d *driver) newWriter(key string, multi *s3.Multi, parts []s3.Part) storagedriver.FileWriter { + var size int64 + for _, part := range parts { + size += part.Size + } + return &writer{ + driver: d, + key: key, + multi: multi, + parts: parts, + size: size, + } +} + +func (w *writer) Write(p []byte) (int, error) { + if w.closed { + return 0, fmt.Errorf("already closed") + } else if w.committed { + return 0, fmt.Errorf("already committed") + } else if w.cancelled { + return 0, fmt.Errorf("already cancelled") + } + + // If the last written part is smaller than minChunkSize, we need to make a + // new multipart upload :sadface: + if len(w.parts) > 0 && int(w.parts[len(w.parts)-1].Size) < minChunkSize { + err := w.multi.Complete(w.parts) + if err != nil { + w.multi.Abort() + return 0, err + } + + multi, err := w.driver.Bucket.InitMulti(w.key, w.driver.getContentType(), getPermissions(), w.driver.getOptions()) + if err != nil { + return 0, err + } + w.multi = multi + + // If the entire written file is smaller than minChunkSize, we need to make + // a new part from scratch :double sad face: + if w.size < minChunkSize { + contents, err := w.driver.Bucket.Get(w.key) + if err != nil { + return 0, err + } + w.parts = nil + w.readyPart = contents + } else { + // Otherwise we can use the old file as the new first part + _, part, err := multi.PutPartCopy(1, s3.CopyOptions{}, w.driver.Bucket.Name+"/"+w.key) + if err != nil { + return 0, err + } + w.parts = []s3.Part{part} + } + } + + var n int + + for len(p) > 0 { + // If no parts are ready to write, fill up the first part + if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 { + if len(p) >= neededBytes { + w.readyPart = append(w.readyPart, p[:neededBytes]...) + n += neededBytes + p = p[neededBytes:] + } else { + w.readyPart = append(w.readyPart, p...) + n += len(p) + p = nil + } + } + + if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 { + if len(p) >= neededBytes { + w.pendingPart = append(w.pendingPart, p[:neededBytes]...) + n += neededBytes + p = p[neededBytes:] + err := w.flushPart() + if err != nil { + w.size += int64(n) + return n, err + } + } else { + w.pendingPart = append(w.pendingPart, p...) + n += len(p) + p = nil + } + } + } + w.size += int64(n) + return n, nil +} + +func (w *writer) Size() int64 { + return w.size +} + +func (w *writer) Close() error { + if w.closed { + return fmt.Errorf("already closed") + } + w.closed = true + return w.flushPart() +} + +func (w *writer) Cancel() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } + w.cancelled = true + err := w.multi.Abort() + return err +} + +func (w *writer) Commit() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } else if w.cancelled { + return fmt.Errorf("already cancelled") + } + err := w.flushPart() + if err != nil { + return err + } + w.committed = true + err = w.multi.Complete(w.parts) + if err != nil { + w.multi.Abort() + return err + } + return nil +} + +// flushPart flushes buffers to write a part to S3. +// Only called by Write (with both buffers full) and Close/Commit (always) +func (w *writer) flushPart() error { + if len(w.readyPart) == 0 && len(w.pendingPart) == 0 { + // nothing to write + return nil + } + if len(w.pendingPart) < int(w.driver.ChunkSize) { + // closing with a small pending part + // combine ready and pending to avoid writing a small part + w.readyPart = append(w.readyPart, w.pendingPart...) + w.pendingPart = nil + } + + part, err := w.multi.PutPart(len(w.parts)+1, bytes.NewReader(w.readyPart)) + if err != nil { + return err + } + w.parts = append(w.parts, part) + w.readyPart = w.pendingPart + w.pendingPart = nil + return nil } diff --git a/docs/storage/driver/storagedriver.go b/docs/storage/driver/storagedriver.go index 603020f13..2ae9a67e7 100644 --- a/docs/storage/driver/storagedriver.go +++ b/docs/storage/driver/storagedriver.go @@ -49,15 +49,14 @@ type StorageDriver interface { // This should primarily be used for small objects. PutContent(ctx context.Context, path string, content []byte) error - // ReadStream retrieves an io.ReadCloser for the content stored at "path" + // Reader retrieves an io.ReadCloser for the content stored at "path" // with a given byte offset. // May be used to resume reading a stream by providing a nonzero offset. - ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) + Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) - // WriteStream stores the contents of the provided io.ReadCloser at a - // location designated by the given path. - // May be used to resume writing a stream by providing a nonzero offset. - WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (nn int64, err error) + // Writer returns a FileWriter which will store the content written to it + // at the location designated by "path" after the call to Commit. + Writer(ctx context.Context, path string, append bool) (FileWriter, error) // Stat retrieves the FileInfo for the given path, including the current // size in bytes and the creation time. @@ -83,6 +82,25 @@ type StorageDriver interface { URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) } +// FileWriter provides an abstraction for an opened writable file-like object in +// the storage backend. The FileWriter must flush all content written to it on +// the call to Close, but is only required to make its content readable on a +// call to Commit. +type FileWriter interface { + io.WriteCloser + + // Size returns the number of bytes written to this FileWriter. + Size() int64 + + // Cancel removes any written content from this FileWriter. + Cancel() error + + // Commit flushes all content written to this FileWriter and makes it + // available for future calls to StorageDriver.GetContent and + // StorageDriver.Reader. + Commit() error +} + // PathRegexp is the regular expression which each file path must match. A // file path is absolute, beginning with a slash and containing a positive // number of path components separated by slashes, where each component is diff --git a/docs/storage/driver/testsuites/testsuites.go b/docs/storage/driver/testsuites/testsuites.go index 3ff4e1e69..48d90ed8f 100644 --- a/docs/storage/driver/testsuites/testsuites.go +++ b/docs/storage/driver/testsuites/testsuites.go @@ -282,11 +282,19 @@ func (suite *DriverSuite) TestWriteReadLargeStreams(c *check.C) { var fileSize int64 = 5 * 1024 * 1024 * 1024 contents := newRandReader(fileSize) - written, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, io.TeeReader(contents, checksum)) + + writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false) + c.Assert(err, check.IsNil) + written, err := io.Copy(writer, io.TeeReader(contents, checksum)) c.Assert(err, check.IsNil) c.Assert(written, check.Equals, fileSize) - reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) + err = writer.Commit() + c.Assert(err, check.IsNil) + err = writer.Close() + c.Assert(err, check.IsNil) + + reader, err := suite.StorageDriver.Reader(suite.ctx, filename, 0) c.Assert(err, check.IsNil) defer reader.Close() @@ -296,9 +304,9 @@ func (suite *DriverSuite) TestWriteReadLargeStreams(c *check.C) { c.Assert(writtenChecksum.Sum(nil), check.DeepEquals, checksum.Sum(nil)) } -// TestReadStreamWithOffset tests that the appropriate data is streamed when +// TestReaderWithOffset tests that the appropriate data is streamed when // reading with a given offset. -func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { +func (suite *DriverSuite) TestReaderWithOffset(c *check.C) { filename := randomPath(32) defer suite.deletePath(c, firstPart(filename)) @@ -311,7 +319,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { err := suite.StorageDriver.PutContent(suite.ctx, filename, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) c.Assert(err, check.IsNil) - reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) + reader, err := suite.StorageDriver.Reader(suite.ctx, filename, 0) c.Assert(err, check.IsNil) defer reader.Close() @@ -320,7 +328,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { c.Assert(readContents, check.DeepEquals, append(append(contentsChunk1, contentsChunk2...), contentsChunk3...)) - reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize) + reader, err = suite.StorageDriver.Reader(suite.ctx, filename, chunkSize) c.Assert(err, check.IsNil) defer reader.Close() @@ -329,7 +337,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { c.Assert(readContents, check.DeepEquals, append(contentsChunk2, contentsChunk3...)) - reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*2) + reader, err = suite.StorageDriver.Reader(suite.ctx, filename, chunkSize*2) c.Assert(err, check.IsNil) defer reader.Close() @@ -338,7 +346,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { c.Assert(readContents, check.DeepEquals, contentsChunk3) // Ensure we get invalid offest for negative offsets. - reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, -1) + reader, err = suite.StorageDriver.Reader(suite.ctx, filename, -1) c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{}) c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1)) c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename) @@ -347,7 +355,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { // Read past the end of the content and make sure we get a reader that // returns 0 bytes and io.EOF - reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*3) + reader, err = suite.StorageDriver.Reader(suite.ctx, filename, chunkSize*3) c.Assert(err, check.IsNil) defer reader.Close() @@ -357,7 +365,7 @@ func (suite *DriverSuite) TestReadStreamWithOffset(c *check.C) { c.Assert(n, check.Equals, 0) // Check the N-1 boundary condition, ensuring we get 1 byte then io.EOF. - reader, err = suite.StorageDriver.ReadStream(suite.ctx, filename, chunkSize*3-1) + reader, err = suite.StorageDriver.Reader(suite.ctx, filename, chunkSize*3-1) c.Assert(err, check.IsNil) defer reader.Close() @@ -395,78 +403,51 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) contentsChunk1 := randomContents(chunkSize) contentsChunk2 := randomContents(chunkSize) contentsChunk3 := randomContents(chunkSize) - contentsChunk4 := randomContents(chunkSize) - zeroChunk := make([]byte, int64(chunkSize)) fullContents := append(append(contentsChunk1, contentsChunk2...), contentsChunk3...) - nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(contentsChunk1)) + writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false) + c.Assert(err, check.IsNil) + nn, err := io.Copy(writer, bytes.NewReader(contentsChunk1)) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, int64(len(contentsChunk1))) - fi, err := suite.StorageDriver.Stat(suite.ctx, filename) - c.Assert(err, check.IsNil) - c.Assert(fi, check.NotNil) - c.Assert(fi.Size(), check.Equals, int64(len(contentsChunk1))) + curSize := writer.Size() + c.Assert(curSize, check.Equals, int64(len(contentsChunk1))) - nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size(), bytes.NewReader(contentsChunk2)) + err = writer.Close() + c.Assert(err, check.IsNil) + + writer, err = suite.StorageDriver.Writer(suite.ctx, filename, true) + c.Assert(err, check.IsNil) + c.Assert(writer.Size(), check.Equals, curSize) + + nn, err = io.Copy(writer, bytes.NewReader(contentsChunk2)) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, int64(len(contentsChunk2))) - fi, err = suite.StorageDriver.Stat(suite.ctx, filename) - c.Assert(err, check.IsNil) - c.Assert(fi, check.NotNil) - c.Assert(fi.Size(), check.Equals, 2*chunkSize) + curSize = writer.Size() + c.Assert(curSize, check.Equals, 2*chunkSize) - // Test re-writing the last chunk - nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size()-chunkSize, bytes.NewReader(contentsChunk2)) + err = writer.Close() c.Assert(err, check.IsNil) - c.Assert(nn, check.Equals, int64(len(contentsChunk2))) - fi, err = suite.StorageDriver.Stat(suite.ctx, filename) + writer, err = suite.StorageDriver.Writer(suite.ctx, filename, true) c.Assert(err, check.IsNil) - c.Assert(fi, check.NotNil) - c.Assert(fi.Size(), check.Equals, 2*chunkSize) + c.Assert(writer.Size(), check.Equals, curSize) - nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, fi.Size(), bytes.NewReader(fullContents[fi.Size():])) + nn, err = io.Copy(writer, bytes.NewReader(fullContents[curSize:])) + c.Assert(err, check.IsNil) + c.Assert(nn, check.Equals, int64(len(fullContents[curSize:]))) + + err = writer.Commit() + c.Assert(err, check.IsNil) + err = writer.Close() c.Assert(err, check.IsNil) - c.Assert(nn, check.Equals, int64(len(fullContents[fi.Size():]))) received, err := suite.StorageDriver.GetContent(suite.ctx, filename) c.Assert(err, check.IsNil) c.Assert(received, check.DeepEquals, fullContents) - - // Writing past size of file extends file (no offset error). We would like - // to write chunk 4 one chunk length past chunk 3. It should be successful - // and the resulting file will be 5 chunks long, with a chunk of all - // zeros. - - fullContents = append(fullContents, zeroChunk...) - fullContents = append(fullContents, contentsChunk4...) - - nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, int64(len(fullContents))-chunkSize, bytes.NewReader(contentsChunk4)) - c.Assert(err, check.IsNil) - c.Assert(nn, check.Equals, chunkSize) - - fi, err = suite.StorageDriver.Stat(suite.ctx, filename) - c.Assert(err, check.IsNil) - c.Assert(fi, check.NotNil) - c.Assert(fi.Size(), check.Equals, int64(len(fullContents))) - - received, err = suite.StorageDriver.GetContent(suite.ctx, filename) - c.Assert(err, check.IsNil) - c.Assert(len(received), check.Equals, len(fullContents)) - c.Assert(received[chunkSize*3:chunkSize*4], check.DeepEquals, zeroChunk) - c.Assert(received[chunkSize*4:chunkSize*5], check.DeepEquals, contentsChunk4) - c.Assert(received, check.DeepEquals, fullContents) - - // Ensure that negative offsets return correct error. - nn, err = suite.StorageDriver.WriteStream(suite.ctx, filename, -1, bytes.NewReader(zeroChunk)) - c.Assert(err, check.NotNil) - c.Assert(err, check.FitsTypeOf, storagedriver.InvalidOffsetError{}) - c.Assert(err.(storagedriver.InvalidOffsetError).Path, check.Equals, filename) - c.Assert(err.(storagedriver.InvalidOffsetError).Offset, check.Equals, int64(-1)) - c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true) } // TestReadNonexistentStream tests that reading a stream for a nonexistent path @@ -474,12 +455,12 @@ func (suite *DriverSuite) testContinueStreamAppend(c *check.C, chunkSize int64) func (suite *DriverSuite) TestReadNonexistentStream(c *check.C) { filename := randomPath(32) - _, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) + _, err := suite.StorageDriver.Reader(suite.ctx, filename, 0) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true) - _, err = suite.StorageDriver.ReadStream(suite.ctx, filename, 64) + _, err = suite.StorageDriver.Reader(suite.ctx, filename, 64) c.Assert(err, check.NotNil) c.Assert(err, check.FitsTypeOf, storagedriver.PathNotFoundError{}) c.Assert(strings.Contains(err.Error(), suite.Name()), check.Equals, true) @@ -800,7 +781,7 @@ func (suite *DriverSuite) TestStatCall(c *check.C) { // TestPutContentMultipleTimes checks that if storage driver can overwrite the content // in the subsequent puts. Validates that PutContent does not have to work -// with an offset like WriteStream does and overwrites the file entirely +// with an offset like Writer does and overwrites the file entirely // rather than writing the data to the [0,len(data)) of the file. func (suite *DriverSuite) TestPutContentMultipleTimes(c *check.C) { filename := randomPath(32) @@ -842,7 +823,7 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) { readContents := func() { defer wg.Done() offset := rand.Int63n(int64(len(contents))) - reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, offset) + reader, err := suite.StorageDriver.Reader(suite.ctx, filename, offset) c.Assert(err, check.IsNil) readContents, err := ioutil.ReadAll(reader) @@ -858,7 +839,7 @@ func (suite *DriverSuite) TestConcurrentStreamReads(c *check.C) { } // TestConcurrentFileStreams checks that multiple *os.File objects can be passed -// in to WriteStream concurrently without hanging. +// in to Writer concurrently without hanging. func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { numStreams := 32 @@ -882,53 +863,54 @@ func (suite *DriverSuite) TestConcurrentFileStreams(c *check.C) { wg.Wait() } +// TODO (brianbland): evaluate the relevancy of this test // TestEventualConsistency checks that if stat says that a file is a certain size, then // you can freely read from the file (this is the only guarantee that the driver needs to provide) -func (suite *DriverSuite) TestEventualConsistency(c *check.C) { - if testing.Short() { - c.Skip("Skipping test in short mode") - } - - filename := randomPath(32) - defer suite.deletePath(c, firstPart(filename)) - - var offset int64 - var misswrites int - var chunkSize int64 = 32 - - for i := 0; i < 1024; i++ { - contents := randomContents(chunkSize) - read, err := suite.StorageDriver.WriteStream(suite.ctx, filename, offset, bytes.NewReader(contents)) - c.Assert(err, check.IsNil) - - fi, err := suite.StorageDriver.Stat(suite.ctx, filename) - c.Assert(err, check.IsNil) - - // We are most concerned with being able to read data as soon as Stat declares - // it is uploaded. This is the strongest guarantee that some drivers (that guarantee - // at best eventual consistency) absolutely need to provide. - if fi.Size() == offset+chunkSize { - reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, offset) - c.Assert(err, check.IsNil) - - readContents, err := ioutil.ReadAll(reader) - c.Assert(err, check.IsNil) - - c.Assert(readContents, check.DeepEquals, contents) - - reader.Close() - offset += read - } else { - misswrites++ - } - } - - if misswrites > 0 { - c.Log("There were " + string(misswrites) + " occurrences of a write not being instantly available.") - } - - c.Assert(misswrites, check.Not(check.Equals), 1024) -} +// func (suite *DriverSuite) TestEventualConsistency(c *check.C) { +// if testing.Short() { +// c.Skip("Skipping test in short mode") +// } +// +// filename := randomPath(32) +// defer suite.deletePath(c, firstPart(filename)) +// +// var offset int64 +// var misswrites int +// var chunkSize int64 = 32 +// +// for i := 0; i < 1024; i++ { +// contents := randomContents(chunkSize) +// read, err := suite.StorageDriver.Writer(suite.ctx, filename, offset, bytes.NewReader(contents)) +// c.Assert(err, check.IsNil) +// +// fi, err := suite.StorageDriver.Stat(suite.ctx, filename) +// c.Assert(err, check.IsNil) +// +// // We are most concerned with being able to read data as soon as Stat declares +// // it is uploaded. This is the strongest guarantee that some drivers (that guarantee +// // at best eventual consistency) absolutely need to provide. +// if fi.Size() == offset+chunkSize { +// reader, err := suite.StorageDriver.Reader(suite.ctx, filename, offset) +// c.Assert(err, check.IsNil) +// +// readContents, err := ioutil.ReadAll(reader) +// c.Assert(err, check.IsNil) +// +// c.Assert(readContents, check.DeepEquals, contents) +// +// reader.Close() +// offset += read +// } else { +// misswrites++ +// } +// } +// +// if misswrites > 0 { +// c.Log("There were " + string(misswrites) + " occurrences of a write not being instantly available.") +// } +// +// c.Assert(misswrites, check.Not(check.Equals), 1024) +// } // BenchmarkPutGetEmptyFiles benchmarks PutContent/GetContent for 0B files func (suite *DriverSuite) BenchmarkPutGetEmptyFiles(c *check.C) { @@ -968,22 +950,22 @@ func (suite *DriverSuite) benchmarkPutGetFiles(c *check.C, size int64) { } } -// BenchmarkStreamEmptyFiles benchmarks WriteStream/ReadStream for 0B files +// BenchmarkStreamEmptyFiles benchmarks Writer/Reader for 0B files func (suite *DriverSuite) BenchmarkStreamEmptyFiles(c *check.C) { suite.benchmarkStreamFiles(c, 0) } -// BenchmarkStream1KBFiles benchmarks WriteStream/ReadStream for 1KB files +// BenchmarkStream1KBFiles benchmarks Writer/Reader for 1KB files func (suite *DriverSuite) BenchmarkStream1KBFiles(c *check.C) { suite.benchmarkStreamFiles(c, 1024) } -// BenchmarkStream1MBFiles benchmarks WriteStream/ReadStream for 1MB files +// BenchmarkStream1MBFiles benchmarks Writer/Reader for 1MB files func (suite *DriverSuite) BenchmarkStream1MBFiles(c *check.C) { suite.benchmarkStreamFiles(c, 1024*1024) } -// BenchmarkStream1GBFiles benchmarks WriteStream/ReadStream for 1GB files +// BenchmarkStream1GBFiles benchmarks Writer/Reader for 1GB files func (suite *DriverSuite) BenchmarkStream1GBFiles(c *check.C) { suite.benchmarkStreamFiles(c, 1024*1024*1024) } @@ -998,11 +980,18 @@ func (suite *DriverSuite) benchmarkStreamFiles(c *check.C, size int64) { for i := 0; i < c.N; i++ { filename := path.Join(parentDir, randomPath(32)) - written, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(randomContents(size))) + writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false) + c.Assert(err, check.IsNil) + written, err := io.Copy(writer, bytes.NewReader(randomContents(size))) c.Assert(err, check.IsNil) c.Assert(written, check.Equals, size) - rc, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) + err = writer.Commit() + c.Assert(err, check.IsNil) + err = writer.Close() + c.Assert(err, check.IsNil) + + rc, err := suite.StorageDriver.Reader(suite.ctx, filename, 0) c.Assert(err, check.IsNil) rc.Close() } @@ -1083,11 +1072,18 @@ func (suite *DriverSuite) testFileStreams(c *check.C, size int64) { tf.Sync() tf.Seek(0, os.SEEK_SET) - nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, tf) + writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false) + c.Assert(err, check.IsNil) + nn, err := io.Copy(writer, tf) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, size) - reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) + err = writer.Commit() + c.Assert(err, check.IsNil) + err = writer.Close() + c.Assert(err, check.IsNil) + + reader, err := suite.StorageDriver.Reader(suite.ctx, filename, 0) c.Assert(err, check.IsNil) defer reader.Close() @@ -1112,11 +1108,18 @@ func (suite *DriverSuite) writeReadCompare(c *check.C, filename string, contents func (suite *DriverSuite) writeReadCompareStreams(c *check.C, filename string, contents []byte) { defer suite.deletePath(c, firstPart(filename)) - nn, err := suite.StorageDriver.WriteStream(suite.ctx, filename, 0, bytes.NewReader(contents)) + writer, err := suite.StorageDriver.Writer(suite.ctx, filename, false) + c.Assert(err, check.IsNil) + nn, err := io.Copy(writer, bytes.NewReader(contents)) c.Assert(err, check.IsNil) c.Assert(nn, check.Equals, int64(len(contents))) - reader, err := suite.StorageDriver.ReadStream(suite.ctx, filename, 0) + err = writer.Commit() + c.Assert(err, check.IsNil) + err = writer.Close() + c.Assert(err, check.IsNil) + + reader, err := suite.StorageDriver.Reader(suite.ctx, filename, 0) c.Assert(err, check.IsNil) defer reader.Close() diff --git a/docs/storage/filereader.go b/docs/storage/filereader.go index b3a5f5203..3b06c8179 100644 --- a/docs/storage/filereader.go +++ b/docs/storage/filereader.go @@ -119,7 +119,7 @@ func (fr *fileReader) reader() (io.Reader, error) { } // If we don't have a reader, open one up. - rc, err := fr.driver.ReadStream(fr.ctx, fr.path, fr.offset) + rc, err := fr.driver.Reader(fr.ctx, fr.path, fr.offset) if err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: diff --git a/docs/storage/filewriter.go b/docs/storage/filewriter.go deleted file mode 100644 index 7c68f3469..000000000 --- a/docs/storage/filewriter.go +++ /dev/null @@ -1,135 +0,0 @@ -package storage - -import ( - "bytes" - "fmt" - "io" - "os" - - "github.com/docker/distribution/context" - storagedriver "github.com/docker/distribution/registry/storage/driver" -) - -// fileWriter implements a remote file writer backed by a storage driver. -type fileWriter struct { - driver storagedriver.StorageDriver - - ctx context.Context - - // identifying fields - path string - - // mutable fields - size int64 // size of the file, aka the current end - offset int64 // offset is the current write offset - err error // terminal error, if set, reader is closed -} - -// fileWriterInterface makes the desired io compliant interface that the -// filewriter should implement. -type fileWriterInterface interface { - io.WriteSeeker - io.ReaderFrom - io.Closer -} - -var _ fileWriterInterface = &fileWriter{} - -// newFileWriter returns a prepared fileWriter for the driver and path. This -// could be considered similar to an "open" call on a regular filesystem. -func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path string) (*fileWriter, error) { - fw := fileWriter{ - driver: driver, - path: path, - ctx: ctx, - } - - if fi, err := driver.Stat(ctx, path); err != nil { - switch err := err.(type) { - case storagedriver.PathNotFoundError: - // ignore, offset is zero - default: - return nil, err - } - } else { - if fi.IsDir() { - return nil, fmt.Errorf("cannot write to a directory") - } - - fw.size = fi.Size() - } - - return &fw, nil -} - -// Write writes the buffer p at the current write offset. -func (fw *fileWriter) Write(p []byte) (n int, err error) { - nn, err := fw.ReadFrom(bytes.NewReader(p)) - return int(nn), err -} - -// ReadFrom reads reader r until io.EOF writing the contents at the current -// offset. -func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) { - if fw.err != nil { - return 0, fw.err - } - - nn, err := fw.driver.WriteStream(fw.ctx, fw.path, fw.offset, r) - - // We should forward the offset, whether or not there was an error. - // Basically, we keep the filewriter in sync with the reader's head. If an - // error is encountered, the whole thing should be retried but we proceed - // from an expected offset, even if the data didn't make it to the - // backend. - fw.offset += nn - - if fw.offset > fw.size { - fw.size = fw.offset - } - - return nn, err -} - -// Seek moves the write position do the requested offest based on the whence -// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET. -func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) { - if fw.err != nil { - return 0, fw.err - } - - var err error - newOffset := fw.offset - - switch whence { - case os.SEEK_CUR: - newOffset += int64(offset) - case os.SEEK_END: - newOffset = fw.size + int64(offset) - case os.SEEK_SET: - newOffset = int64(offset) - } - - if newOffset < 0 { - err = fmt.Errorf("cannot seek to negative position") - } else { - // No problems, set the offset. - fw.offset = newOffset - } - - return fw.offset, err -} - -// Close closes the fileWriter for writing. -// Calling it once is valid and correct and it will -// return a nil error. Calling it subsequent times will -// detect that fw.err has been set and will return the error. -func (fw *fileWriter) Close() error { - if fw.err != nil { - return fw.err - } - - fw.err = fmt.Errorf("filewriter@%v: closed", fw.path) - - return nil -} diff --git a/docs/storage/filewriter_test.go b/docs/storage/filewriter_test.go deleted file mode 100644 index d6782cd46..000000000 --- a/docs/storage/filewriter_test.go +++ /dev/null @@ -1,226 +0,0 @@ -package storage - -import ( - "bytes" - "crypto/rand" - "io" - "os" - "testing" - - "github.com/docker/distribution/context" - "github.com/docker/distribution/digest" - storagedriver "github.com/docker/distribution/registry/storage/driver" - "github.com/docker/distribution/registry/storage/driver/inmemory" -) - -// TestSimpleWrite takes the fileWriter through common write operations -// ensuring data integrity. -func TestSimpleWrite(t *testing.T) { - content := make([]byte, 1<<20) - n, err := rand.Read(content) - if err != nil { - t.Fatalf("unexpected error building random data: %v", err) - } - - if n != len(content) { - t.Fatalf("random read did't fill buffer") - } - - dgst, err := digest.FromReader(bytes.NewReader(content)) - if err != nil { - t.Fatalf("unexpected error digesting random content: %v", err) - } - - driver := inmemory.New() - path := "/random" - ctx := context.Background() - - fw, err := newFileWriter(ctx, driver, path) - if err != nil { - t.Fatalf("unexpected error creating fileWriter: %v", err) - } - defer fw.Close() - - n, err = fw.Write(content) - if err != nil { - t.Fatalf("unexpected error writing content: %v", err) - } - - if n != len(content) { - t.Fatalf("unexpected write length: %d != %d", n, len(content)) - } - - fr, err := newFileReader(ctx, driver, path, int64(len(content))) - if err != nil { - t.Fatalf("unexpected error creating fileReader: %v", err) - } - defer fr.Close() - - verifier, err := digest.NewDigestVerifier(dgst) - if err != nil { - t.Fatalf("unexpected error getting digest verifier: %s", err) - } - - io.Copy(verifier, fr) - - if !verifier.Verified() { - t.Fatalf("unable to verify write data") - } - - // Check the seek position is equal to the content length - end, err := fw.Seek(0, os.SEEK_END) - if err != nil { - t.Fatalf("unexpected error seeking: %v", err) - } - - if end != int64(len(content)) { - t.Fatalf("write did not advance offset: %d != %d", end, len(content)) - } - - // Double the content - doubled := append(content, content...) - doubledgst, err := digest.FromReader(bytes.NewReader(doubled)) - if err != nil { - t.Fatalf("unexpected error digesting doubled content: %v", err) - } - - nn, err := fw.ReadFrom(bytes.NewReader(content)) - if err != nil { - t.Fatalf("unexpected error doubling content: %v", err) - } - - if nn != int64(len(content)) { - t.Fatalf("writeat was short: %d != %d", n, len(content)) - } - - fr, err = newFileReader(ctx, driver, path, int64(len(doubled))) - if err != nil { - t.Fatalf("unexpected error creating fileReader: %v", err) - } - defer fr.Close() - - verifier, err = digest.NewDigestVerifier(doubledgst) - if err != nil { - t.Fatalf("unexpected error getting digest verifier: %s", err) - } - - io.Copy(verifier, fr) - - if !verifier.Verified() { - t.Fatalf("unable to verify write data") - } - - // Check that Write updated the offset. - end, err = fw.Seek(0, os.SEEK_END) - if err != nil { - t.Fatalf("unexpected error seeking: %v", err) - } - - if end != int64(len(doubled)) { - t.Fatalf("write did not advance offset: %d != %d", end, len(doubled)) - } - - // Now, we copy from one path to another, running the data through the - // fileReader to fileWriter, rather than the driver.Move command to ensure - // everything is working correctly. - fr, err = newFileReader(ctx, driver, path, int64(len(doubled))) - if err != nil { - t.Fatalf("unexpected error creating fileReader: %v", err) - } - defer fr.Close() - - fw, err = newFileWriter(ctx, driver, "/copied") - if err != nil { - t.Fatalf("unexpected error creating fileWriter: %v", err) - } - defer fw.Close() - - nn, err = io.Copy(fw, fr) - if err != nil { - t.Fatalf("unexpected error copying data: %v", err) - } - - if nn != int64(len(doubled)) { - t.Fatalf("unexpected copy length: %d != %d", nn, len(doubled)) - } - - fr, err = newFileReader(ctx, driver, "/copied", int64(len(doubled))) - if err != nil { - t.Fatalf("unexpected error creating fileReader: %v", err) - } - defer fr.Close() - - verifier, err = digest.NewDigestVerifier(doubledgst) - if err != nil { - t.Fatalf("unexpected error getting digest verifier: %s", err) - } - - io.Copy(verifier, fr) - - if !verifier.Verified() { - t.Fatalf("unable to verify write data") - } -} - -func BenchmarkFileWriter(b *testing.B) { - b.StopTimer() // not sure how long setup above will take - for i := 0; i < b.N; i++ { - // Start basic fileWriter initialization - fw := fileWriter{ - driver: inmemory.New(), - path: "/random", - } - ctx := context.Background() - if fi, err := fw.driver.Stat(ctx, fw.path); err != nil { - switch err := err.(type) { - case storagedriver.PathNotFoundError: - // ignore, offset is zero - default: - b.Fatalf("Failed to initialize fileWriter: %v", err.Error()) - } - } else { - if fi.IsDir() { - b.Fatalf("Cannot write to a directory") - } - - fw.size = fi.Size() - } - - randomBytes := make([]byte, 1<<20) - _, err := rand.Read(randomBytes) - if err != nil { - b.Fatalf("unexpected error building random data: %v", err) - } - // End basic file writer initialization - - b.StartTimer() - for j := 0; j < 100; j++ { - fw.Write(randomBytes) - } - b.StopTimer() - } -} - -func BenchmarkfileWriter(b *testing.B) { - b.StopTimer() // not sure how long setup above will take - ctx := context.Background() - for i := 0; i < b.N; i++ { - bfw, err := newFileWriter(ctx, inmemory.New(), "/random") - - if err != nil { - b.Fatalf("Failed to initialize fileWriter: %v", err.Error()) - } - - randomBytes := make([]byte, 1<<20) - _, err = rand.Read(randomBytes) - if err != nil { - b.Fatalf("unexpected error building random data: %v", err) - } - - b.StartTimer() - for j := 0; j < 100; j++ { - bfw.Write(randomBytes) - } - b.StopTimer() - } -} diff --git a/docs/storage/linkedblobstore.go b/docs/storage/linkedblobstore.go index 76a1c29dd..e06f95406 100644 --- a/docs/storage/linkedblobstore.go +++ b/docs/storage/linkedblobstore.go @@ -179,7 +179,7 @@ func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution. return nil, err } - return lbs.newBlobUpload(ctx, uuid, path, startedAt) + return lbs.newBlobUpload(ctx, uuid, path, startedAt, false) } func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { @@ -218,7 +218,7 @@ func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution return nil, err } - return lbs.newBlobUpload(ctx, id, path, startedAt) + return lbs.newBlobUpload(ctx, id, path, startedAt, true) } func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { @@ -312,18 +312,21 @@ func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo reference.Name } // newBlobUpload allocates a new upload controller with the given state. -func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) { - fw, err := newFileWriter(ctx, lbs.driver, path) +func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time, append bool) (distribution.BlobWriter, error) { + fw, err := lbs.driver.Writer(ctx, path, append) if err != nil { return nil, err } bw := &blobWriter{ - blobStore: lbs, - id: uuid, - startedAt: startedAt, - digester: digest.Canonical.New(), - fileWriter: *fw, + ctx: ctx, + blobStore: lbs, + id: uuid, + startedAt: startedAt, + digester: digest.Canonical.New(), + fileWriter: fw, + driver: lbs.driver, + path: path, resumableDigestEnabled: lbs.resumableDigestEnabled, }