diff --git a/registry/storage/filewriter.go b/registry/storage/filewriter.go index cbf03704c..5f22142e1 100644 --- a/registry/storage/filewriter.go +++ b/registry/storage/filewriter.go @@ -1,6 +1,7 @@ package storage import ( + "bufio" "bytes" "fmt" "io" @@ -9,6 +10,10 @@ import ( storagedriver "github.com/docker/distribution/registry/storage/driver" ) +const ( + fileWriterBufferSize = 5 << 20 +) + // fileWriter implements a remote file writer backed by a storage driver. type fileWriter struct { driver storagedriver.StorageDriver @@ -22,6 +27,11 @@ type fileWriter struct { err error // terminal error, if set, reader is closed } +type bufferedFileWriter struct { + fileWriter + bw *bufio.Writer +} + // fileWriterInterface makes the desired io compliant interface that the // filewriter should implement. type fileWriterInterface interface { @@ -35,7 +45,7 @@ var _ fileWriterInterface = &fileWriter{} // newFileWriter returns a prepared fileWriter for the driver and path. This // could be considered similar to an "open" call on a regular filesystem. -func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter, error) { +func newFileWriter(driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) { fw := fileWriter{ driver: driver, path: path, @@ -56,7 +66,42 @@ func newFileWriter(driver storagedriver.StorageDriver, path string) (*fileWriter fw.size = fi.Size() } - return &fw, nil + buffered := bufferedFileWriter{ + fileWriter: fw, + } + buffered.bw = bufio.NewWriterSize(&buffered.fileWriter, fileWriterBufferSize) + + return &buffered, nil +} + +// wraps the fileWriter.Write method to buffer small writes +func (bfw *bufferedFileWriter) Write(p []byte) (int, error) { + return bfw.bw.Write(p) +} + +// wraps fileWriter.Close to ensure the buffer is flushed +// before we close the writer. +func (bfw *bufferedFileWriter) Close() (err error) { + if err = bfw.Flush(); err != nil { + return err + } + err = bfw.fileWriter.Close() + return err +} + +// wraps fileWriter.Seek to ensure offset is handled +// correctly in respect to pending data in the buffer +func (bfw *bufferedFileWriter) Seek(offset int64, whence int) (int64, error) { + if err := bfw.Flush(); err != nil { + return 0, err + } + return bfw.fileWriter.Seek(offset, whence) +} + +// wraps bufio.Writer.Flush to allow intermediate flushes +// of the bufferedFileWriter +func (bfw *bufferedFileWriter) Flush() error { + return bfw.bw.Flush() } // Write writes the buffer p at the current write offset. @@ -108,6 +153,9 @@ func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) { } // Close closes the fileWriter for writing. +// Calling it once is valid and correct and it will +// return a nil error. Calling it subsequent times will +// detect that fw.err has been set and will return the error. func (fw *fileWriter) Close() error { if fw.err != nil { return fw.err @@ -115,7 +163,7 @@ func (fw *fileWriter) Close() error { fw.err = fmt.Errorf("filewriter@%v: closed", fw.path) - return fw.err + return nil } // readFromAt writes to fw from r at the specified offset. If offset is less diff --git a/registry/storage/filewriter_test.go b/registry/storage/filewriter_test.go index 1a38a5193..06db31f30 100644 --- a/registry/storage/filewriter_test.go +++ b/registry/storage/filewriter_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/docker/distribution/digest" + storagedriver "github.com/docker/distribution/registry/storage/driver" "github.com/docker/distribution/registry/storage/driver/inmemory" ) @@ -42,6 +43,7 @@ func TestSimpleWrite(t *testing.T) { if err != nil { t.Fatalf("unexpected error writing content: %v", err) } + fw.Flush() if n != len(content) { t.Fatalf("unexpected write length: %d != %d", n, len(content)) @@ -146,3 +148,99 @@ func TestSimpleWrite(t *testing.T) { t.Fatalf("unable to verify write data") } } + +func TestBufferedFileWriter(t *testing.T) { + writer, err := newFileWriter(inmemory.New(), "/random") + + if err != nil { + t.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error()) + } + + // write one byte and ensure the offset hasn't been incremented. + // offset will only get incremented when the buffer gets flushed + short := []byte{byte(1)} + + writer.Write(short) + + if writer.offset > 0 { + t.Fatalf("WriteStream called prematurely") + } + + // write enough data to cause the buffer to flush and confirm + // the offset has been incremented + long := make([]byte, fileWriterBufferSize) + _, err = rand.Read(long) + if err != nil { + t.Fatalf("unexpected error building random data: %v", err) + } + for i := range long { + long[i] = byte(i) + } + writer.Write(long) + writer.Close() + if writer.offset != (fileWriterBufferSize + 1) { + t.Fatalf("WriteStream not called when buffer capacity reached") + } +} + +func BenchmarkFileWriter(b *testing.B) { + b.StopTimer() // not sure how long setup above will take + for i := 0; i < b.N; i++ { + // Start basic fileWriter initialization + fw := fileWriter{ + driver: inmemory.New(), + path: "/random", + } + + if fi, err := fw.driver.Stat(fw.path); err != nil { + switch err := err.(type) { + case storagedriver.PathNotFoundError: + // ignore, offset is zero + default: + b.Fatalf("Failed to initialize fileWriter: %v", err.Error()) + } + } else { + if fi.IsDir() { + b.Fatalf("Cannot write to a directory") + } + + fw.size = fi.Size() + } + + randomBytes := make([]byte, 1<<20) + _, err := rand.Read(randomBytes) + if err != nil { + b.Fatalf("unexpected error building random data: %v", err) + } + // End basic file writer initialization + + b.StartTimer() + for j := 0; j < 100; j++ { + fw.Write(randomBytes) + } + b.StopTimer() + } +} + +func BenchmarkBufferedFileWriter(b *testing.B) { + b.StopTimer() // not sure how long setup above will take + for i := 0; i < b.N; i++ { + bfw, err := newFileWriter(inmemory.New(), "/random") + + if err != nil { + b.Fatalf("Failed to initialize bufferedFileWriter: %v", err.Error()) + } + + randomBytes := make([]byte, 1<<20) + _, err = rand.Read(randomBytes) + if err != nil { + b.Fatalf("unexpected error building random data: %v", err) + } + + b.StartTimer() + for j := 0; j < 100; j++ { + bfw.Write(randomBytes) + } + b.StopTimer() + } +} diff --git a/registry/storage/layerstore.go b/registry/storage/layerstore.go index 153e42a89..f546529ec 100644 --- a/registry/storage/layerstore.go +++ b/registry/storage/layerstore.go @@ -139,10 +139,10 @@ func (ls *layerStore) newLayerUpload(uuid, path string, startedAt time.Time) (di } return &layerUploadController{ - layerStore: ls, - uuid: uuid, - startedAt: startedAt, - fileWriter: *fw, + layerStore: ls, + uuid: uuid, + startedAt: startedAt, + bufferedFileWriter: *fw, }, nil } diff --git a/registry/storage/layerupload.go b/registry/storage/layerupload.go index 369a9bd5e..14e423388 100644 --- a/registry/storage/layerupload.go +++ b/registry/storage/layerupload.go @@ -22,7 +22,9 @@ type layerUploadController struct { uuid string startedAt time.Time - fileWriter + // implementes io.WriteSeeker, io.ReaderFrom and io.Closer to satisy + // LayerUpload Interface + bufferedFileWriter } var _ distribution.LayerUpload = &layerUploadController{} @@ -42,6 +44,12 @@ func (luc *layerUploadController) StartedAt() time.Time { // format :. func (luc *layerUploadController) Finish(digest digest.Digest) (distribution.Layer, error) { ctxu.GetLogger(luc.layerStore.repository.ctx).Debug("(*layerUploadController).Finish") + + err := luc.bufferedFileWriter.Close() + if err != nil { + return nil, err + } + canonical, err := luc.validateLayer(digest) if err != nil { return nil, err @@ -103,7 +111,7 @@ func (luc *layerUploadController) validateLayer(dgst digest.Digest) (digest.Dige // then only have to fetch the difference. // Read the file from the backend driver and validate it. - fr, err := newFileReader(luc.fileWriter.driver, luc.path) + fr, err := newFileReader(luc.bufferedFileWriter.driver, luc.path) if err != nil { return "", err }