distribution/registry/storage/filewriter.go
Richard ae216e365a Make Storage Driver API calls context aware.
- Change driver interface to take a context as its first argument
     - Make newFileReader take a context as its first argument
     - Make newFileWriter take a context as its first argument
     - Make blobstore exists and delete take a context as a first argument
     - Pass the layerreader's context to the storage layer
     - Pass the app's context to purgeuploads
     - Store the app's context into the blobstore (was previously null)
     - Pass the trace'd context to the storage drivers

Signed-off-by: Richard Scothern <richard.scothern@gmail.com>
2015-04-27 15:58:58 -07:00

202 lines
4.9 KiB
Go

package storage
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
)
const (
fileWriterBufferSize = 5 << 20
)
// fileWriter implements a remote file writer backed by a storage driver.
type fileWriter struct {
driver storagedriver.StorageDriver
ctx context.Context
// identifying fields
path string
// mutable fields
size int64 // size of the file, aka the current end
offset int64 // offset is the current write offset
err error // terminal error, if set, reader is closed
}
type bufferedFileWriter struct {
fileWriter
bw *bufio.Writer
}
// fileWriterInterface makes the desired io compliant interface that the
// filewriter should implement.
type fileWriterInterface interface {
io.WriteSeeker
io.WriterAt
io.ReaderFrom
io.Closer
}
var _ fileWriterInterface = &fileWriter{}
// newFileWriter returns a prepared fileWriter for the driver and path. This
// could be considered similar to an "open" call on a regular filesystem.
func newFileWriter(ctx context.Context, driver storagedriver.StorageDriver, path string) (*bufferedFileWriter, error) {
fw := fileWriter{
driver: driver,
path: path,
ctx: ctx,
}
if fi, err := driver.Stat(ctx, path); err != nil {
switch err := err.(type) {
case storagedriver.PathNotFoundError:
// ignore, offset is zero
default:
return nil, err
}
} else {
if fi.IsDir() {
return nil, fmt.Errorf("cannot write to a directory")
}
fw.size = fi.Size()
}
buffered := bufferedFileWriter{
fileWriter: fw,
}
buffered.bw = bufio.NewWriterSize(&buffered.fileWriter, fileWriterBufferSize)
return &buffered, nil
}
// wraps the fileWriter.Write method to buffer small writes
func (bfw *bufferedFileWriter) Write(p []byte) (int, error) {
return bfw.bw.Write(p)
}
// wraps fileWriter.Close to ensure the buffer is flushed
// before we close the writer.
func (bfw *bufferedFileWriter) Close() (err error) {
if err = bfw.Flush(); err != nil {
return err
}
err = bfw.fileWriter.Close()
return err
}
// wraps fileWriter.Seek to ensure offset is handled
// correctly in respect to pending data in the buffer
func (bfw *bufferedFileWriter) Seek(offset int64, whence int) (int64, error) {
if err := bfw.Flush(); err != nil {
return 0, err
}
return bfw.fileWriter.Seek(offset, whence)
}
// wraps bufio.Writer.Flush to allow intermediate flushes
// of the bufferedFileWriter
func (bfw *bufferedFileWriter) Flush() error {
return bfw.bw.Flush()
}
// Write writes the buffer p at the current write offset.
func (fw *fileWriter) Write(p []byte) (n int, err error) {
nn, err := fw.readFromAt(bytes.NewReader(p), -1)
return int(nn), err
}
// WriteAt writes p at the specified offset. The underlying offset does not
// change.
func (fw *fileWriter) WriteAt(p []byte, offset int64) (n int, err error) {
nn, err := fw.readFromAt(bytes.NewReader(p), offset)
return int(nn), err
}
// ReadFrom reads reader r until io.EOF writing the contents at the current
// offset.
func (fw *fileWriter) ReadFrom(r io.Reader) (n int64, err error) {
return fw.readFromAt(r, -1)
}
// Seek moves the write position do the requested offest based on the whence
// argument, which can be os.SEEK_CUR, os.SEEK_END, or os.SEEK_SET.
func (fw *fileWriter) Seek(offset int64, whence int) (int64, error) {
if fw.err != nil {
return 0, fw.err
}
var err error
newOffset := fw.offset
switch whence {
case os.SEEK_CUR:
newOffset += int64(offset)
case os.SEEK_END:
newOffset = fw.size + int64(offset)
case os.SEEK_SET:
newOffset = int64(offset)
}
if newOffset < 0 {
err = fmt.Errorf("cannot seek to negative position")
} else {
// No problems, set the offset.
fw.offset = newOffset
}
return fw.offset, err
}
// Close closes the fileWriter for writing.
// Calling it once is valid and correct and it will
// return a nil error. Calling it subsequent times will
// detect that fw.err has been set and will return the error.
func (fw *fileWriter) Close() error {
if fw.err != nil {
return fw.err
}
fw.err = fmt.Errorf("filewriter@%v: closed", fw.path)
return nil
}
// readFromAt writes to fw from r at the specified offset. If offset is less
// than zero, the value of fw.offset is used and updated after the operation.
func (fw *fileWriter) readFromAt(r io.Reader, offset int64) (n int64, err error) {
if fw.err != nil {
return 0, fw.err
}
var updateOffset bool
if offset < 0 {
offset = fw.offset
updateOffset = true
}
nn, err := fw.driver.WriteStream(fw.ctx, fw.path, offset, r)
if updateOffset {
// We should forward the offset, whether or not there was an error.
// Basically, we keep the filewriter in sync with the reader's head. If an
// error is encountered, the whole thing should be retried but we proceed
// from an expected offset, even if the data didn't make it to the
// backend.
fw.offset += nn
if fw.offset > fw.size {
fw.size = fw.offset
}
}
return nn, err
}