From a9bf7a2aaec88cfeb69a92de8c7d5f212cf30c4d Mon Sep 17 00:00:00 2001 From: Li Yi Date: Sun, 21 Feb 2016 08:54:32 +0800 Subject: [PATCH] Support FileWriter interface for OSS storage driver Change-Id: Ie5533ad85f944800499ca1040fd67bf1378815e0 Signed-off-by: Li Yi --- cmd/registry/main.go | 2 +- registry/storage/driver/oss/oss.go | 523 +++++++++++------------------ 2 files changed, 203 insertions(+), 322 deletions(-) diff --git a/cmd/registry/main.go b/cmd/registry/main.go index 49fd7ba5c..5f1810031 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -13,7 +13,7 @@ import ( _ "github.com/docker/distribution/registry/storage/driver/gcs" _ "github.com/docker/distribution/registry/storage/driver/inmemory" _ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront" - // _ "github.com/docker/distribution/registry/storage/driver/oss" + _ "github.com/docker/distribution/registry/storage/driver/oss" _ "github.com/docker/distribution/registry/storage/driver/s3-aws" _ "github.com/docker/distribution/registry/storage/driver/s3-goamz" // _ "github.com/docker/distribution/registry/storage/driver/swift" diff --git a/registry/storage/driver/oss/oss.go b/registry/storage/driver/oss/oss.go index 1ec045252..7ae703346 100644 --- a/registry/storage/driver/oss/oss.go +++ b/registry/storage/driver/oss/oss.go @@ -20,7 +20,6 @@ import ( "reflect" "strconv" "strings" - "sync" "time" "github.com/docker/distribution/context" @@ -75,9 +74,6 @@ type driver struct { ChunkSize int64 Encrypt bool RootDirectory string - - pool sync.Pool // pool []byte buffers used for WriteStream - zeros []byte // shared, zero-valued buffer used for WriteStream } type baseEmbed struct { @@ -99,8 +95,7 @@ type Driver struct { // - encrypt func FromParameters(parameters map[string]interface{}) (*Driver, error) { // Providing no values for these is valid in case the user is authenticating - // with an IAM on an ec2 instance (in which case the instance credentials will - // be summoned when GetAuth is called) + accessKey, ok := parameters["accesskeyid"] if !ok { return nil, fmt.Errorf("No accesskeyid parameter provided") @@ -220,11 +215,6 @@ func New(params DriverParameters) (*Driver, error) { ChunkSize: params.ChunkSize, Encrypt: params.Encrypt, RootDirectory: params.RootDirectory, - zeros: make([]byte, params.ChunkSize), - } - - d.pool.New = func() interface{} { - return make([]byte, d.ChunkSize) } return &Driver{ @@ -256,9 +246,9 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e return parseError(path, d.Bucket.Put(d.ossPath(path), contents, d.getContentType(), getPermissions(), d.getOptions())) } -// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a +// Reader retrieves an io.ReadCloser for the content stored at "path" with a // given byte offset. -func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { +func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { headers := make(http.Header) headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-") @@ -279,315 +269,37 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io. return resp.Body, nil } -// WriteStream stores the contents of the provided io.Reader at a -// location designated by the given path. The driver will know it has -// received the full contents when the reader returns io.EOF. The number -// of successfully READ bytes will be returned, even if an error is -// returned. May be used to resume writing a stream by providing a nonzero -// offset. Offsets past the current size will write from the position -// beyond the end of the file. -func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) { - partNumber := 1 - bytesRead := 0 - var putErrChan chan error - parts := []oss.Part{} - var part oss.Part - done := make(chan struct{}) // stopgap to free up waiting goroutines - - multi, err := d.Bucket.InitMulti(d.ossPath(path), d.getContentType(), getPermissions(), d.getOptions()) +// Writer returns a FileWriter which will store the content written to it +// at the location designated by "path" after the call to Commit. +func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) { + key := d.ossPath(path) + if !append { + // TODO (brianbland): cancel other uploads at this path + multi, err := d.Bucket.InitMulti(key, d.getContentType(), getPermissions(), d.getOptions()) + if err != nil { + return nil, err + } + return d.newWriter(key, multi, nil), nil + } + multis, _, err := d.Bucket.ListMulti(key, "") if err != nil { - return 0, err + return nil, parseError(path, err) } - - buf := d.getbuf() - - // We never want to leave a dangling multipart upload, our only consistent state is - // when there is a whole object at path. This is in order to remain consistent with - // the stat call. - // - // Note that if the machine dies before executing the defer, we will be left with a dangling - // multipart upload, which will eventually be cleaned up, but we will lose all of the progress - // made prior to the machine crashing. - defer func() { - if putErrChan != nil { - if putErr := <-putErrChan; putErr != nil { - err = putErr - } + for _, multi := range multis { + if key != multi.Key { + continue } - - if len(parts) > 0 { - if multi == nil { - // Parts should be empty if the multi is not initialized - panic("Unreachable") - } else { - if multi.Complete(parts) != nil { - multi.Abort() - } - } - } - - d.putbuf(buf) // needs to be here to pick up new buf value - close(done) // free up any waiting goroutines - }() - - // Fills from 0 to total from current - fromSmallCurrent := func(total int64) error { - current, err := d.ReadStream(ctx, path, 0) + parts, err := multi.ListParts() if err != nil { - return err + return nil, parseError(path, err) } - - bytesRead = 0 - for int64(bytesRead) < total { - //The loop should very rarely enter a second iteration - nn, err := current.Read(buf[bytesRead:total]) - bytesRead += nn - if err != nil { - if err != io.EOF { - return err - } - - break - } - + var multiSize int64 + for _, part := range parts { + multiSize += part.Size } - return nil + return d.newWriter(key, multi, parts), nil } - - // Fills from parameter to chunkSize from reader - fromReader := func(from int64) error { - bytesRead = 0 - for from+int64(bytesRead) < d.ChunkSize { - nn, err := reader.Read(buf[from+int64(bytesRead):]) - totalRead += int64(nn) - bytesRead += nn - - if err != nil { - if err != io.EOF { - return err - } - - break - } - } - - if putErrChan == nil { - putErrChan = make(chan error) - } else { - if putErr := <-putErrChan; putErr != nil { - putErrChan = nil - return putErr - } - } - - go func(bytesRead int, from int64, buf []byte) { - defer d.putbuf(buf) // this buffer gets dropped after this call - - // DRAGONS(stevvooe): There are few things one might want to know - // about this section. First, the putErrChan is expecting an error - // and a nil or just a nil to come through the channel. This is - // covered by the silly defer below. The other aspect is the OSS - // retry backoff to deal with RequestTimeout errors. Even though - // the underlying OSS library should handle it, it doesn't seem to - // be part of the shouldRetry function (see denverdino/aliyungo/oss). - defer func() { - select { - case putErrChan <- nil: // for some reason, we do this no matter what. - case <-done: - return // ensure we don't leak the goroutine - } - }() - - if bytesRead <= 0 { - return - } - - var err error - var part oss.Part - - part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]), defaultTimeout) - - if err != nil { - logrus.Errorf("error putting part, aborting: %v", err) - select { - case putErrChan <- err: - case <-done: - return // don't leak the goroutine - } - } - - // parts and partNumber are safe, because this function is the - // only one modifying them and we force it to be executed - // serially. - parts = append(parts, part) - partNumber++ - }(bytesRead, from, buf) - - buf = d.getbuf() // use a new buffer for the next call - return nil - } - - if offset > 0 { - resp, err := d.Bucket.Head(d.ossPath(path), nil) - if err != nil { - if ossErr, ok := err.(*oss.Error); !ok || ossErr.StatusCode != http.StatusNotFound { - return 0, err - } - } - - currentLength := int64(0) - if err == nil { - currentLength = resp.ContentLength - } - - if currentLength >= offset { - if offset < d.ChunkSize { - // chunkSize > currentLength >= offset - if err = fromSmallCurrent(offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset); err != nil { - return totalRead, err - } - - if totalRead+offset < d.ChunkSize { - return totalRead, nil - } - } else { - // currentLength >= offset >= chunkSize - _, part, err = multi.PutPartCopy(partNumber, - oss.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)}, - d.Bucket.Path(d.ossPath(path))) - if err != nil { - return 0, err - } - - parts = append(parts, part) - partNumber++ - } - } else { - // Fills between parameters with 0s but only when to - from <= chunkSize - fromZeroFillSmall := func(from, to int64) error { - bytesRead = 0 - for from+int64(bytesRead) < to { - nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to]) - bytesRead += nn - if err != nil { - return err - } - } - - return nil - } - - // Fills between parameters with 0s, making new parts - fromZeroFillLarge := func(from, to int64) error { - bytesRead64 := int64(0) - for to-(from+bytesRead64) >= d.ChunkSize { - part, err := multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(d.zeros), defaultTimeout) - if err != nil { - return err - } - bytesRead64 += d.ChunkSize - - parts = append(parts, part) - partNumber++ - } - - return fromZeroFillSmall(0, (to-from)%d.ChunkSize) - } - - // currentLength < offset - if currentLength < d.ChunkSize { - if offset < d.ChunkSize { - // chunkSize > offset > currentLength - if err = fromSmallCurrent(currentLength); err != nil { - return totalRead, err - } - - if err = fromZeroFillSmall(currentLength, offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset); err != nil { - return totalRead, err - } - - if totalRead+offset < d.ChunkSize { - return totalRead, nil - } - } else { - // offset >= chunkSize > currentLength - if err = fromSmallCurrent(currentLength); err != nil { - return totalRead, err - } - - if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil { - return totalRead, err - } - - part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf), defaultTimeout) - if err != nil { - return totalRead, err - } - - parts = append(parts, part) - partNumber++ - - //Zero fill from chunkSize up to offset, then some reader - if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil { - return totalRead, err - } - - if err = fromReader(offset % d.ChunkSize); err != nil { - return totalRead, err - } - - if totalRead+(offset%d.ChunkSize) < d.ChunkSize { - return totalRead, nil - } - } - } else { - // offset > currentLength >= chunkSize - _, part, err = multi.PutPartCopy(partNumber, - oss.CopyOptions{}, - d.Bucket.Path(d.ossPath(path))) - if err != nil { - return 0, err - } - - parts = append(parts, part) - partNumber++ - - //Zero fill from currentLength up to offset, then some reader - if err = fromZeroFillLarge(currentLength, offset); err != nil { - return totalRead, err - } - - if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil { - return totalRead, err - } - - if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize { - return totalRead, nil - } - } - - } - } - - for { - if err = fromReader(0); err != nil { - return totalRead, err - } - - if int64(bytesRead) < d.ChunkSize { - break - } - } - - return totalRead, nil + return nil, storagedriver.PathNotFoundError{Path: path} } // Stat retrieves the FileInfo for the given path, including the current size @@ -778,12 +490,181 @@ func (d *driver) getContentType() string { return "application/octet-stream" } -// getbuf returns a buffer from the driver's pool with length d.ChunkSize. -func (d *driver) getbuf() []byte { - return d.pool.Get().([]byte) +// writer attempts to upload parts to S3 in a buffered fashion where the last +// part is at least as large as the chunksize, so the multipart upload could be +// cleanly resumed in the future. This is violated if Close is called after less +// than a full chunk is written. +type writer struct { + driver *driver + key string + multi *oss.Multi + parts []oss.Part + size int64 + readyPart []byte + pendingPart []byte + closed bool + committed bool + cancelled bool } -func (d *driver) putbuf(p []byte) { - copy(p, d.zeros) - d.pool.Put(p) +func (d *driver) newWriter(key string, multi *oss.Multi, parts []oss.Part) storagedriver.FileWriter { + var size int64 + for _, part := range parts { + size += part.Size + } + return &writer{ + driver: d, + key: key, + multi: multi, + parts: parts, + size: size, + } +} + +func (w *writer) Write(p []byte) (int, error) { + if w.closed { + return 0, fmt.Errorf("already closed") + } else if w.committed { + return 0, fmt.Errorf("already committed") + } else if w.cancelled { + return 0, fmt.Errorf("already cancelled") + } + + // If the last written part is smaller than minChunkSize, we need to make a + // new multipart upload :sadface: + if len(w.parts) > 0 && int(w.parts[len(w.parts)-1].Size) < minChunkSize { + err := w.multi.Complete(w.parts) + if err != nil { + w.multi.Abort() + return 0, err + } + + multi, err := w.driver.Bucket.InitMulti(w.key, w.driver.getContentType(), getPermissions(), w.driver.getOptions()) + if err != nil { + return 0, err + } + w.multi = multi + + // If the entire written file is smaller than minChunkSize, we need to make + // a new part from scratch :double sad face: + if w.size < minChunkSize { + contents, err := w.driver.Bucket.Get(w.key) + if err != nil { + return 0, err + } + w.parts = nil + w.readyPart = contents + } else { + // Otherwise we can use the old file as the new first part + _, part, err := multi.PutPartCopy(1, oss.CopyOptions{}, w.driver.Bucket.Name+"/"+w.key) + if err != nil { + return 0, err + } + w.parts = []oss.Part{part} + } + } + + var n int + + for len(p) > 0 { + // If no parts are ready to write, fill up the first part + if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 { + if len(p) >= neededBytes { + w.readyPart = append(w.readyPart, p[:neededBytes]...) + n += neededBytes + p = p[neededBytes:] + } else { + w.readyPart = append(w.readyPart, p...) + n += len(p) + p = nil + } + } + + if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 { + if len(p) >= neededBytes { + w.pendingPart = append(w.pendingPart, p[:neededBytes]...) + n += neededBytes + p = p[neededBytes:] + err := w.flushPart() + if err != nil { + w.size += int64(n) + return n, err + } + } else { + w.pendingPart = append(w.pendingPart, p...) + n += len(p) + p = nil + } + } + } + w.size += int64(n) + return n, nil +} + +func (w *writer) Size() int64 { + return w.size +} + +func (w *writer) Close() error { + if w.closed { + return fmt.Errorf("already closed") + } + w.closed = true + return w.flushPart() +} + +func (w *writer) Cancel() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } + w.cancelled = true + err := w.multi.Abort() + return err +} + +func (w *writer) Commit() error { + if w.closed { + return fmt.Errorf("already closed") + } else if w.committed { + return fmt.Errorf("already committed") + } else if w.cancelled { + return fmt.Errorf("already cancelled") + } + err := w.flushPart() + if err != nil { + return err + } + w.committed = true + err = w.multi.Complete(w.parts) + if err != nil { + w.multi.Abort() + return err + } + return nil +} + +// flushPart flushes buffers to write a part to S3. +// Only called by Write (with both buffers full) and Close/Commit (always) +func (w *writer) flushPart() error { + if len(w.readyPart) == 0 && len(w.pendingPart) == 0 { + // nothing to write + return nil + } + if len(w.pendingPart) < int(w.driver.ChunkSize) { + // closing with a small pending part + // combine ready and pending to avoid writing a small part + w.readyPart = append(w.readyPart, w.pendingPart...) + w.pendingPart = nil + } + + part, err := w.multi.PutPart(len(w.parts)+1, bytes.NewReader(w.readyPart)) + if err != nil { + return err + } + w.parts = append(w.parts, part) + w.readyPart = w.pendingPart + w.pendingPart = nil + return nil }