diff --git a/registry/storage/blob_test.go b/registry/storage/blob_test.go index 4a56784e2..701a14ed2 100644 --- a/registry/storage/blob_test.go +++ b/registry/storage/blob_test.go @@ -18,6 +18,39 @@ import ( "github.com/docker/distribution/testutil" ) +// TestWriteSeek tests that the current file size can be +// obtained using Seek +func TestWriteSeek(t *testing.T) { + ctx := context.Background() + imageName := "foo/bar" + driver := inmemory.New() + registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect) + if err != nil { + t.Fatalf("error creating registry: %v", err) + } + repository, err := registry.Repository(ctx, imageName) + if err != nil { + t.Fatalf("unexpected error getting repo: %v", err) + } + bs := repository.Blobs(ctx) + + blobUpload, err := bs.Create(ctx) + + if err != nil { + t.Fatalf("unexpected error starting layer upload: %s", err) + } + contents := []byte{1, 2, 3} + blobUpload.Write(contents) + offset, err := blobUpload.Seek(0, os.SEEK_CUR) + if err != nil { + t.Fatalf("unexpected error in blobUpload.Seek: %s", err) + } + if offset != int64(len(contents)) { + t.Fatalf("unexpected value for blobUpload offset: %v != %v", offset, len(contents)) + } + +} + // TestSimpleBlobUpload covers the blob upload process, exercising common // error paths that might be seen during an upload. func TestSimpleBlobUpload(t *testing.T) { diff --git a/registry/storage/blobwriter.go b/registry/storage/blobwriter.go index 2406c95a9..f2ca7388d 100644 --- a/registry/storage/blobwriter.go +++ b/registry/storage/blobwriter.go @@ -30,7 +30,7 @@ type blobWriter struct { // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy // LayerUpload Interface - bufferedFileWriter + fileWriter resumableDigestEnabled bool } @@ -51,7 +51,7 @@ func (bw *blobWriter) StartedAt() time.Time { func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { context.GetLogger(ctx).Debug("(*blobWriter).Commit") - if err := bw.bufferedFileWriter.Close(); err != nil { + if err := bw.fileWriter.Close(); err != nil { return distribution.Descriptor{}, err } @@ -100,7 +100,7 @@ func (bw *blobWriter) Write(p []byte) (int, error) { return 0, err } - n, err := io.MultiWriter(&bw.bufferedFileWriter, bw.digester.Hash()).Write(p) + n, err := io.MultiWriter(&bw.fileWriter, bw.digester.Hash()).Write(p) bw.written += int64(n) return n, err @@ -114,7 +114,7 @@ func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) { return 0, err } - nn, err := bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash())) + nn, err := bw.fileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash())) bw.written += nn return nn, err @@ -129,7 +129,7 @@ func (bw *blobWriter) Close() error { return err } - return bw.bufferedFileWriter.Close() + return bw.fileWriter.Close() } // validateBlob checks the data against the digest, returning an error if it @@ -149,7 +149,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri } // Stat the on disk file - if fi, err := bw.bufferedFileWriter.driver.Stat(ctx, bw.path); err != nil { + if fi, err := bw.fileWriter.driver.Stat(ctx, bw.path); err != nil { switch err := err.(type) { case storagedriver.PathNotFoundError: // NOTE(stevvooe): We really don't care if the file is @@ -223,7 +223,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri } // Read the file from the backend driver and validate it. - fr, err := newFileReader(ctx, bw.bufferedFileWriter.driver, bw.path, desc.Size) + fr, err := newFileReader(ctx, bw.fileWriter.driver, bw.path, desc.Size) if err != nil { return distribution.Descriptor{}, err } @@ -357,7 +357,7 @@ func (bw *blobWriter) Reader() (io.ReadCloser, error) { // todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4 try := 1 for try <= 5 { - _, err := bw.bufferedFileWriter.driver.Stat(bw.ctx, bw.path) + _, err := bw.fileWriter.driver.Stat(bw.ctx, bw.path) if err == nil { break } @@ -371,7 +371,7 @@ func (bw *blobWriter) Reader() (io.ReadCloser, error) { } } - readCloser, err := bw.bufferedFileWriter.driver.ReadStream(bw.ctx, bw.path, 0) + readCloser, err := bw.fileWriter.driver.ReadStream(bw.ctx, bw.path, 0) if err != nil { return nil, err } diff --git a/registry/storage/filewriter.go b/registry/storage/filewriter.go index 529fa6736..7c68f3469 100644 --- a/registry/storage/filewriter.go +++ b/registry/storage/filewriter.go @@ -1,7 +1,6 @@ package storage import ( - "bufio" "bytes" "fmt" "io" @@ -11,10 +10,6 @@ import ( storagedriver "github.com/docker/distribution/registry/storage/driver" ) -const ( - fileWriterBufferSize = 5 << 20 -) - // fileWriter implements a remote file writer backed by a storage driver. type fileWriter struct { driver storagedriver.StorageDriver @@ -30,11 +25,6 @@ type fileWriter struct { err error // terminal error, if set, reader is closed } -type bufferedFileWriter struct { - fileWriter - bw *bufio.Writer -} - // fileWriterInterface makes the desired io compliant interface that the // filewriter should implement. type fileWriterInterface interface { @@ -47,7 +37,7 @@ var _ fileWriterInterface = &fileWriter{} // newFileWriter returns a prepared fileWriter for the driver and path. This // could be considered similar to an "open" call on a regular filesystem. -func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) { +func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path string) (*fileWriter, error) { fw := fileWriter{ driver: driver, path: path, @@ -69,42 +59,7 @@ func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path fw.size = fi.Size() } - buffered := bufferedFileWriter{ - fileWriter: fw, - } - buffered.bw = bufio.NewWriterSize(&buffered.fileWriter, fileWriterBufferSize) - - return &buffered, nil -} - -// wraps the fileWriter.Write method to buffer small writes -func (bfw *bufferedFileWriter) Write(p []byte) (int, error) { - return bfw.bw.Write(p) -} - -// wraps fileWriter.Close to ensure the buffer is flushed -// before we close the writer. -func (bfw *bufferedFileWriter) Close() (err error) { - if err = bfw.Flush(); err != nil { - return err - } - err = bfw.fileWriter.Close() - return err -} - -// wraps fileWriter.Seek to ensure offset is handled -// correctly in respect to pending data in the buffer -func (bfw *bufferedFileWriter) Seek(offset int64, whence int) (int64, error) { - if err := bfw.Flush(); err != nil { - return 0, err - } - return bfw.fileWriter.Seek(offset, whence) -} - -// wraps bufio.Writer.Flush to allow intermediate flushes -// of the bufferedFileWriter -func (bfw *bufferedFileWriter) Flush() error { - return bfw.bw.Flush() + return &fw, nil } // Write writes the buffer p at the current write offset. diff --git a/registry/storage/filewriter_test.go b/registry/storage/filewriter_test.go index 858b03272..d6782cd46 100644 --- a/registry/storage/filewriter_test.go +++ b/registry/storage/filewriter_test.go @@ -45,7 +45,6 @@ func TestSimpleWrite(t *testing.T) { if err != nil { t.Fatalf("unexpected error writing content: %v", err) } - fw.Flush() if n != len(content) { t.Fatalf("unexpected write length: %d != %d", n, len(content)) @@ -163,41 +162,6 @@ func TestSimpleWrite(t *testing.T) { } } -func TestBufferedFileWriter(t *testing.T) { - ctx := context.Background() - writer, err := newFileWriter(ctx, inmemory.New(), "/random") - - if err != nil { - t.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error()) - } - - // write one byte and ensure the offset hasn't been incremented. - // offset will only get incremented when the buffer gets flushed - short := []byte{byte(1)} - - writer.Write(short) - - if writer.offset > 0 { - t.Fatalf("WriteStream called prematurely") - } - - // write enough data to cause the buffer to flush and confirm - // the offset has been incremented - long := make([]byte, fileWriterBufferSize) - _, err = rand.Read(long) - if err != nil { - t.Fatalf("unexpected error building random data: %v", err) - } - for i := range long { - long[i] = byte(i) - } - writer.Write(long) - writer.Close() - if writer.offset != (fileWriterBufferSize + 1) { - t.Fatalf("WriteStream not called when buffer capacity reached") - } -} - func BenchmarkFileWriter(b *testing.B) { b.StopTimer() // not sure how long setup above will take for i := 0; i < b.N; i++ { @@ -237,14 +201,14 @@ func BenchmarkFileWriter(b *testing.B) { } } -func BenchmarkBufferedFileWriter(b *testing.B) { +func BenchmarkfileWriter(b *testing.B) { b.StopTimer() // not sure how long setup above will take ctx := context.Background() for i := 0; i < b.N; i++ { bfw, err := newFileWriter(ctx, inmemory.New(), "/random") if err != nil { - b.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error()) + b.Fatalf("Failed to initialize fileWriter: %v", err.Error()) } randomBytes := make([]byte, 1<<20) diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go index 963d59d58..3e6f9c2d2 100644 --- a/registry/storage/linkedblobstore.go +++ b/registry/storage/linkedblobstore.go @@ -270,7 +270,7 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string id: uuid, startedAt: startedAt, digester: digest.Canonical.New(), - bufferedFileWriter: *fw, + fileWriter: *fw, resumableDigestEnabled: lbs.resumableDigestEnabled, }