forked from TrueCloudLab/distribution
Merge pull request #1350 from aibaars/storage-filewriter-pointer
Storage: remove bufferedFileWriter (dead code)
This commit is contained in:
commit
579981b979
5 changed files with 47 additions and 95 deletions
|
@ -18,6 +18,39 @@ import (
|
||||||
"github.com/docker/distribution/testutil"
|
"github.com/docker/distribution/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TestWriteSeek tests that the current file size can be
|
||||||
|
// obtained using Seek
|
||||||
|
func TestWriteSeek(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
imageName := "foo/bar"
|
||||||
|
driver := inmemory.New()
|
||||||
|
registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider()), EnableDelete, EnableRedirect)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error creating registry: %v", err)
|
||||||
|
}
|
||||||
|
repository, err := registry.Repository(ctx, imageName)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error getting repo: %v", err)
|
||||||
|
}
|
||||||
|
bs := repository.Blobs(ctx)
|
||||||
|
|
||||||
|
blobUpload, err := bs.Create(ctx)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error starting layer upload: %s", err)
|
||||||
|
}
|
||||||
|
contents := []byte{1, 2, 3}
|
||||||
|
blobUpload.Write(contents)
|
||||||
|
offset, err := blobUpload.Seek(0, os.SEEK_CUR)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error in blobUpload.Seek: %s", err)
|
||||||
|
}
|
||||||
|
if offset != int64(len(contents)) {
|
||||||
|
t.Fatalf("unexpected value for blobUpload offset: %v != %v", offset, len(contents))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// TestSimpleBlobUpload covers the blob upload process, exercising common
|
// TestSimpleBlobUpload covers the blob upload process, exercising common
|
||||||
// error paths that might be seen during an upload.
|
// error paths that might be seen during an upload.
|
||||||
func TestSimpleBlobUpload(t *testing.T) {
|
func TestSimpleBlobUpload(t *testing.T) {
|
||||||
|
|
|
@ -30,7 +30,7 @@ type blobWriter struct {
|
||||||
|
|
||||||
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
|
// implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisfy
|
||||||
// LayerUpload Interface
|
// LayerUpload Interface
|
||||||
bufferedFileWriter
|
fileWriter
|
||||||
|
|
||||||
resumableDigestEnabled bool
|
resumableDigestEnabled bool
|
||||||
}
|
}
|
||||||
|
@ -51,7 +51,7 @@ func (bw *blobWriter) StartedAt() time.Time {
|
||||||
func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
|
func (bw *blobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
|
||||||
context.GetLogger(ctx).Debug("(*blobWriter).Commit")
|
context.GetLogger(ctx).Debug("(*blobWriter).Commit")
|
||||||
|
|
||||||
if err := bw.bufferedFileWriter.Close(); err != nil {
|
if err := bw.fileWriter.Close(); err != nil {
|
||||||
return distribution.Descriptor{}, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,7 +100,7 @@ func (bw *blobWriter) Write(p []byte) (int, error) {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := io.MultiWriter(&bw.bufferedFileWriter, bw.digester.Hash()).Write(p)
|
n, err := io.MultiWriter(&bw.fileWriter, bw.digester.Hash()).Write(p)
|
||||||
bw.written += int64(n)
|
bw.written += int64(n)
|
||||||
|
|
||||||
return n, err
|
return n, err
|
||||||
|
@ -114,7 +114,7 @@ func (bw *blobWriter) ReadFrom(r io.Reader) (n int64, err error) {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
nn, err := bw.bufferedFileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash()))
|
nn, err := bw.fileWriter.ReadFrom(io.TeeReader(r, bw.digester.Hash()))
|
||||||
bw.written += nn
|
bw.written += nn
|
||||||
|
|
||||||
return nn, err
|
return nn, err
|
||||||
|
@ -129,7 +129,7 @@ func (bw *blobWriter) Close() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return bw.bufferedFileWriter.Close()
|
return bw.fileWriter.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// validateBlob checks the data against the digest, returning an error if it
|
// validateBlob checks the data against the digest, returning an error if it
|
||||||
|
@ -149,7 +149,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat the on disk file
|
// Stat the on disk file
|
||||||
if fi, err := bw.bufferedFileWriter.driver.Stat(ctx, bw.path); err != nil {
|
if fi, err := bw.fileWriter.driver.Stat(ctx, bw.path); err != nil {
|
||||||
switch err := err.(type) {
|
switch err := err.(type) {
|
||||||
case storagedriver.PathNotFoundError:
|
case storagedriver.PathNotFoundError:
|
||||||
// NOTE(stevvooe): We really don't care if the file is
|
// NOTE(stevvooe): We really don't care if the file is
|
||||||
|
@ -223,7 +223,7 @@ func (bw *blobWriter) validateBlob(ctx context.Context, desc distribution.Descri
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the file from the backend driver and validate it.
|
// Read the file from the backend driver and validate it.
|
||||||
fr, err := newFileReader(ctx, bw.bufferedFileWriter.driver, bw.path, desc.Size)
|
fr, err := newFileReader(ctx, bw.fileWriter.driver, bw.path, desc.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return distribution.Descriptor{}, err
|
return distribution.Descriptor{}, err
|
||||||
}
|
}
|
||||||
|
@ -357,7 +357,7 @@ func (bw *blobWriter) Reader() (io.ReadCloser, error) {
|
||||||
// todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4
|
// todo(richardscothern): Change to exponential backoff, i=0.5, e=2, n=4
|
||||||
try := 1
|
try := 1
|
||||||
for try <= 5 {
|
for try <= 5 {
|
||||||
_, err := bw.bufferedFileWriter.driver.Stat(bw.ctx, bw.path)
|
_, err := bw.fileWriter.driver.Stat(bw.ctx, bw.path)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -371,7 +371,7 @@ func (bw *blobWriter) Reader() (io.ReadCloser, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
readCloser, err := bw.bufferedFileWriter.driver.ReadStream(bw.ctx, bw.path, 0)
|
readCloser, err := bw.fileWriter.driver.ReadStream(bw.ctx, bw.path, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -11,10 +10,6 @@ import (
|
||||||
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
storagedriver "github.com/docker/distribution/registry/storage/driver"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
fileWriterBufferSize = 5 << 20
|
|
||||||
)
|
|
||||||
|
|
||||||
// fileWriter implements a remote file writer backed by a storage driver.
|
// fileWriter implements a remote file writer backed by a storage driver.
|
||||||
type fileWriter struct {
|
type fileWriter struct {
|
||||||
driver storagedriver.StorageDriver
|
driver storagedriver.StorageDriver
|
||||||
|
@ -30,11 +25,6 @@ type fileWriter struct {
|
||||||
err error // terminal error, if set, reader is closed
|
err error // terminal error, if set, reader is closed
|
||||||
}
|
}
|
||||||
|
|
||||||
type bufferedFileWriter struct {
|
|
||||||
fileWriter
|
|
||||||
bw *bufio.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// fileWriterInterface makes the desired io compliant interface that the
|
// fileWriterInterface makes the desired io compliant interface that the
|
||||||
// filewriter should implement.
|
// filewriter should implement.
|
||||||
type fileWriterInterface interface {
|
type fileWriterInterface interface {
|
||||||
|
@ -47,7 +37,7 @@ var _ fileWriterInterface = &fileWriter{}
|
||||||
|
|
||||||
// newFileWriter returns a prepared fileWriter for the driver and path. This
|
// newFileWriter returns a prepared fileWriter for the driver and path. This
|
||||||
// could be considered similar to an "open" call on a regular filesystem.
|
// could be considered similar to an "open" call on a regular filesystem.
|
||||||
func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) {
|
func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path string) (*fileWriter, error) {
|
||||||
fw := fileWriter{
|
fw := fileWriter{
|
||||||
driver: driver,
|
driver: driver,
|
||||||
path: path,
|
path: path,
|
||||||
|
@ -69,42 +59,7 @@ func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path
|
||||||
fw.size = fi.Size()
|
fw.size = fi.Size()
|
||||||
}
|
}
|
||||||
|
|
||||||
buffered := bufferedFileWriter{
|
return &fw, nil
|
||||||
fileWriter: fw,
|
|
||||||
}
|
|
||||||
buffered.bw = bufio.NewWriterSize(&buffered.fileWriter, fileWriterBufferSize)
|
|
||||||
|
|
||||||
return &buffered, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// wraps the fileWriter.Write method to buffer small writes
|
|
||||||
func (bfw *bufferedFileWriter) Write(p []byte) (int, error) {
|
|
||||||
return bfw.bw.Write(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
// wraps fileWriter.Close to ensure the buffer is flushed
|
|
||||||
// before we close the writer.
|
|
||||||
func (bfw *bufferedFileWriter) Close() (err error) {
|
|
||||||
if err = bfw.Flush(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = bfw.fileWriter.Close()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// wraps fileWriter.Seek to ensure offset is handled
|
|
||||||
// correctly in respect to pending data in the buffer
|
|
||||||
func (bfw *bufferedFileWriter) Seek(offset int64, whence int) (int64, error) {
|
|
||||||
if err := bfw.Flush(); err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return bfw.fileWriter.Seek(offset, whence)
|
|
||||||
}
|
|
||||||
|
|
||||||
// wraps bufio.Writer.Flush to allow intermediate flushes
|
|
||||||
// of the bufferedFileWriter
|
|
||||||
func (bfw *bufferedFileWriter) Flush() error {
|
|
||||||
return bfw.bw.Flush()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write writes the buffer p at the current write offset.
|
// Write writes the buffer p at the current write offset.
|
||||||
|
|
|
@ -45,7 +45,6 @@ func TestSimpleWrite(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error writing content: %v", err)
|
t.Fatalf("unexpected error writing content: %v", err)
|
||||||
}
|
}
|
||||||
fw.Flush()
|
|
||||||
|
|
||||||
if n != len(content) {
|
if n != len(content) {
|
||||||
t.Fatalf("unexpected write length: %d != %d", n, len(content))
|
t.Fatalf("unexpected write length: %d != %d", n, len(content))
|
||||||
|
@ -163,41 +162,6 @@ func TestSimpleWrite(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBufferedFileWriter(t *testing.T) {
|
|
||||||
ctx := context.Background()
|
|
||||||
writer, err := newFileWriter(ctx, inmemory.New(), "/random")
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// write one byte and ensure the offset hasn't been incremented.
|
|
||||||
// offset will only get incremented when the buffer gets flushed
|
|
||||||
short := []byte{byte(1)}
|
|
||||||
|
|
||||||
writer.Write(short)
|
|
||||||
|
|
||||||
if writer.offset > 0 {
|
|
||||||
t.Fatalf("WriteStream called prematurely")
|
|
||||||
}
|
|
||||||
|
|
||||||
// write enough data to cause the buffer to flush and confirm
|
|
||||||
// the offset has been incremented
|
|
||||||
long := make([]byte, fileWriterBufferSize)
|
|
||||||
_, err = rand.Read(long)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected error building random data: %v", err)
|
|
||||||
}
|
|
||||||
for i := range long {
|
|
||||||
long[i] = byte(i)
|
|
||||||
}
|
|
||||||
writer.Write(long)
|
|
||||||
writer.Close()
|
|
||||||
if writer.offset != (fileWriterBufferSize + 1) {
|
|
||||||
t.Fatalf("WriteStream not called when buffer capacity reached")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func BenchmarkFileWriter(b *testing.B) {
|
func BenchmarkFileWriter(b *testing.B) {
|
||||||
b.StopTimer() // not sure how long setup above will take
|
b.StopTimer() // not sure how long setup above will take
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
|
@ -237,14 +201,14 @@ func BenchmarkFileWriter(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkBufferedFileWriter(b *testing.B) {
|
func BenchmarkfileWriter(b *testing.B) {
|
||||||
b.StopTimer() // not sure how long setup above will take
|
b.StopTimer() // not sure how long setup above will take
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
bfw, err := newFileWriter(ctx, inmemory.New(), "/random")
|
bfw, err := newFileWriter(ctx, inmemory.New(), "/random")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error())
|
b.Fatalf("Failed to initialize fileWriter: %v", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
randomBytes := make([]byte, 1<<20)
|
randomBytes := make([]byte, 1<<20)
|
||||||
|
|
|
@ -270,7 +270,7 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
|
||||||
id: uuid,
|
id: uuid,
|
||||||
startedAt: startedAt,
|
startedAt: startedAt,
|
||||||
digester: digest.Canonical.New(),
|
digester: digest.Canonical.New(),
|
||||||
bufferedFileWriter: *fw,
|
fileWriter: *fw,
|
||||||
resumableDigestEnabled: lbs.resumableDigestEnabled,
|
resumableDigestEnabled: lbs.resumableDigestEnabled,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue