Merge pull request #218 from endophage/DIST-148

registry/storage: buffered wrapper for fileWriter
This commit is contained in:
Stephen Day 2015-03-03 18:08:28 -08:00
commit bfef2046e8
4 changed files with 163 additions and 9 deletions

View file

@ -1,6 +1,7 @@
package storage package storage
import ( import (
"bufio"
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
@ -9,6 +10,10 @@ import (
storagedriver "github.com/docker/distribution/registry/storage/driver" storagedriver "github.com/docker/distribution/registry/storage/driver"
) )
const (
fileWriterBufferSize = 5 << 20
)
// fileWriter implements a remote file writer backed by a storage driver. // fileWriter implements a remote file writer backed by a storage driver.
type fileWriter struct { type fileWriter struct {
driver storagedriver.StorageDriver driver storagedriver.StorageDriver
@ -22,6 +27,11 @@ type fileWriter struct {
err error // terminal error, if set, reader is closed err error // terminal error, if set, reader is closed
} }
type bufferedFileWriter struct {
fileWriter
bw *bufio.Writer
}
// fileWriterInterface makes the desired io compliant interface that the // fileWriterInterface makes the desired io compliant interface that the
// filewriter should implement. // filewriter should implement.
type fileWriterInterface interface { type fileWriterInterface interface {
@ -35,7 +45,7 @@ var _ fileWriterInterface = &fileWriter{}
// newFileWriter returns a prepared fileWriter for the driver and path. This // newFileWriter returns a prepared fileWriter for the driver and path. This
// could be considered similar to an "open" call on a regular filesystem. // could be considered similar to an "open" call on a regular filesystem.
func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) { func newFileWriter(driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) {
fw := fileWriter{ fw := fileWriter{
driver: driver, driver: driver,
path: path, path: path,
@ -56,7 +66,42 @@ func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter
fw.size = fi.Size() fw.size = fi.Size()
} }
return &fw, nil buffered := bufferedFileWriter{
fileWriter: fw,
}
buffered.bw = bufio.NewWriterSize(&buffered.fileWriter, fileWriterBufferSize)
return &buffered, nil
}
// wraps the fileWriter.Write method to buffer small writes
func (bfw *bufferedFileWriter) Write(p []byte) (int, error) {
return bfw.bw.Write(p)
}
// wraps fileWriter.Close to ensure the buffer is flushed
// before we close the writer.
func (bfw *bufferedFileWriter) Close() (err error) {
if err = bfw.Flush(); err != nil {
return err
}
err = bfw.fileWriter.Close()
return err
}
// wraps fileWriter.Seek to ensure offset is handled
// correctly in respect to pending data in the buffer
func (bfw *bufferedFileWriter) Seek(offset int64, whence int) (int64, error) {
if err := bfw.Flush(); err != nil {
return 0, err
}
return bfw.fileWriter.Seek(offset, whence)
}
// wraps bufio.Writer.Flush to allow intermediate flushes
// of the bufferedFileWriter
func (bfw *bufferedFileWriter) Flush() error {
return bfw.bw.Flush()
} }
// Write writes the buffer p at the current write offset. // Write writes the buffer p at the current write offset.
@ -108,6 +153,9 @@ func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
} }
// Close closes the fileWriter for writing. // Close closes the fileWriter for writing.
// Calling it once is valid and correct and it will
// return a nil error. Calling it subsequent times will
// detect that fw.err has been set and will return the error.
func (fw *fileWriter) Close() error { func (fw *fileWriter) Close() error {
if fw.err != nil { if fw.err != nil {
return fw.err return fw.err
@ -115,7 +163,7 @@ func (fw *fileWriter) Close() error {
fw.err = fmt.Errorf("filewriter@%v: closed", fw.path) fw.err = fmt.Errorf("filewriter@%v: closed", fw.path)
return fw.err return nil
} }
// readFromAt writes to fw from r at the specified offset. If offset is less // readFromAt writes to fw from r at the specified offset. If offset is less

View file

@ -8,6 +8,7 @@ import (
"testing" "testing"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/registry/storage/driver/inmemory" "github.com/docker/distribution/registry/storage/driver/inmemory"
) )
@ -42,6 +43,7 @@ func TestSimpleWrite(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error writing content: %v", err) t.Fatalf("unexpected error writing content: %v", err)
} }
fw.Flush()
if n != len(content) { if n != len(content) {
t.Fatalf("unexpected write length: %d != %d", n, len(content)) t.Fatalf("unexpected write length: %d != %d", n, len(content))
@ -146,3 +148,99 @@ func TestSimpleWrite(t *testing.T) {
t.Fatalf("unable to verify write data") t.Fatalf("unable to verify write data")
} }
} }
func TestBufferedFileWriter(t *testing.T) {
writer, err := newFileWriter(inmemory.New(), "/random")
if err != nil {
t.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error())
}
// write one byte and ensure the offset hasn't been incremented.
// offset will only get incremented when the buffer gets flushed
short := []byte{byte(1)}
writer.Write(short)
if writer.offset > 0 {
t.Fatalf("WriteStream called prematurely")
}
// write enough data to cause the buffer to flush and confirm
// the offset has been incremented
long := make([]byte, fileWriterBufferSize)
_, err = rand.Read(long)
if err != nil {
t.Fatalf("unexpected error building random data: %v", err)
}
for i := range long {
long[i] = byte(i)
}
writer.Write(long)
writer.Close()
if writer.offset != (fileWriterBufferSize + 1) {
t.Fatalf("WriteStream not called when buffer capacity reached")
}
}
func BenchmarkFileWriter(b *testing.B) {
b.StopTimer() // not sure how long setup above will take
for i := 0; i < b.N; i++ {
// Start basic fileWriter initialization
fw := fileWriter{
driver: inmemory.New(),
path: "/random",
}
if fi, err := fw.driver.Stat(fw.path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// ignore, offset is zero
default:
b.Fatalf("Failed to initialize fileWriter: %v", err.Error())
}
} else {
if fi.IsDir() {
b.Fatalf("Cannot write to a directory")
}
fw.size = fi.Size()
}
randomBytes := make([]byte, 1<<20)
_, err := rand.Read(randomBytes)
if err != nil {
b.Fatalf("unexpected error building random data: %v", err)
}
// End basic file writer initialization
b.StartTimer()
for j := 0; j < 100; j++ {
fw.Write(randomBytes)
}
b.StopTimer()
}
}
func BenchmarkBufferedFileWriter(b *testing.B) {
b.StopTimer() // not sure how long setup above will take
for i := 0; i < b.N; i++ {
bfw, err := newFileWriter(inmemory.New(), "/random")
if err != nil {
b.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error())
}
randomBytes := make([]byte, 1<<20)
_, err = rand.Read(randomBytes)
if err != nil {
b.Fatalf("unexpected error building random data: %v", err)
}
b.StartTimer()
for j := 0; j < 100; j++ {
bfw.Write(randomBytes)
}
b.StopTimer()
}
}

View file

@ -139,10 +139,10 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di
} }
return &layerUploadController{ return &layerUploadController{
layerStore: ls, layerStore: ls,
uuid: uuid, uuid: uuid,
startedAt: startedAt, startedAt: startedAt,
fileWriter: *fw, bufferedFileWriter: *fw,
}, nil }, nil
} }

View file

@ -22,7 +22,9 @@ type layerUploadController struct {
uuid string uuid string
startedAt time.Time startedAt time.Time
fileWriter // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisy
// LayerUpload Interface
bufferedFileWriter
} }
var _ distribution.LayerUpload = &layerUploadController{} var _ distribution.LayerUpload = &layerUploadController{}
@ -42,6 +44,12 @@ func (luc *layerUploadController) StartedAt() time.Time {
// format <algorithm>:<hex digest>. // format <algorithm>:<hex digest>.
func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) { func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) {
ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish") ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish")
err := luc.bufferedFileWriter.Close()
if err != nil {
return nil, err
}
canonical, err := luc.validateLayer(digest) canonical, err := luc.validateLayer(digest)
if err != nil { if err != nil {
return nil, err return nil, err
@ -103,7 +111,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige
// then only have to fetch the difference. // then only have to fetch the difference.
// Read the file from the backend driver and validate it. // Read the file from the backend driver and validate it.
fr, err := newFileReader(luc.fileWriter.driver, luc.path) fr, err := newFileReader(luc.bufferedFileWriter.driver, luc.path)
if err != nil { if err != nil {
return "", err return "", err
} }