forked from TrueCloudLab/distribution
Support FileWriter interface for OSS storage driver
Change-Id: Ie5533ad85f944800499ca1040fd67bf1378815e0 Signed-off-by: Li Yi <denverdino@gmail.com>
This commit is contained in:
parent
307504713f
commit
a9bf7a2aae
2 changed files with 203 additions and 322 deletions
|
@ -13,7 +13,7 @@ import (
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/gcs"
|
_ "github.com/docker/distribution/registry/storage/driver/gcs"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/inmemory"
|
_ "github.com/docker/distribution/registry/storage/driver/inmemory"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront"
|
_ "github.com/docker/distribution/registry/storage/driver/middleware/cloudfront"
|
||||||
// _ "github.com/docker/distribution/registry/storage/driver/oss"
|
_ "github.com/docker/distribution/registry/storage/driver/oss"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/s3-aws"
|
_ "github.com/docker/distribution/registry/storage/driver/s3-aws"
|
||||||
_ "github.com/docker/distribution/registry/storage/driver/s3-goamz"
|
_ "github.com/docker/distribution/registry/storage/driver/s3-goamz"
|
||||||
// _ "github.com/docker/distribution/registry/storage/driver/swift"
|
// _ "github.com/docker/distribution/registry/storage/driver/swift"
|
||||||
|
|
|
@ -20,7 +20,6 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/distribution/context"
|
"github.com/docker/distribution/context"
|
||||||
|
@ -75,9 +74,6 @@ type driver struct {
|
||||||
ChunkSize int64
|
ChunkSize int64
|
||||||
Encrypt bool
|
Encrypt bool
|
||||||
RootDirectory string
|
RootDirectory string
|
||||||
|
|
||||||
pool sync.Pool // pool []byte buffers used for WriteStream
|
|
||||||
zeros []byte // shared, zero-valued buffer used for WriteStream
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type baseEmbed struct {
|
type baseEmbed struct {
|
||||||
|
@ -99,8 +95,7 @@ type Driver struct {
|
||||||
// - encrypt
|
// - encrypt
|
||||||
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
||||||
// Providing no values for these is valid in case the user is authenticating
|
// Providing no values for these is valid in case the user is authenticating
|
||||||
// with an IAM on an ec2 instance (in which case the instance credentials will
|
|
||||||
// be summoned when GetAuth is called)
|
|
||||||
accessKey, ok := parameters["accesskeyid"]
|
accessKey, ok := parameters["accesskeyid"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("No accesskeyid parameter provided")
|
return nil, fmt.Errorf("No accesskeyid parameter provided")
|
||||||
|
@ -220,11 +215,6 @@ func New(params DriverParameters) (*Driver, error) {
|
||||||
ChunkSize: params.ChunkSize,
|
ChunkSize: params.ChunkSize,
|
||||||
Encrypt: params.Encrypt,
|
Encrypt: params.Encrypt,
|
||||||
RootDirectory: params.RootDirectory,
|
RootDirectory: params.RootDirectory,
|
||||||
zeros: make([]byte, params.ChunkSize),
|
|
||||||
}
|
|
||||||
|
|
||||||
d.pool.New = func() interface{} {
|
|
||||||
return make([]byte, d.ChunkSize)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Driver{
|
return &Driver{
|
||||||
|
@ -256,9 +246,9 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
|
||||||
return parseError(path, d.Bucket.Put(d.ossPath(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
|
return parseError(path, d.Bucket.Put(d.ossPath(path), contents, d.getContentType(), getPermissions(), d.getOptions()))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadStream retrieves an io.ReadCloser for the content stored at "path" with a
|
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
||||||
// given byte offset.
|
// given byte offset.
|
||||||
func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||||
headers := make(http.Header)
|
headers := make(http.Header)
|
||||||
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
|
headers.Add("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
|
||||||
|
|
||||||
|
@ -279,315 +269,37 @@ func (d *driver) ReadStream(ctx context.Context, path string, offset int64) (io.
|
||||||
return resp.Body, nil
|
return resp.Body, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteStream stores the contents of the provided io.Reader at a
|
// Writer returns a FileWriter which will store the content written to it
|
||||||
// location designated by the given path. The driver will know it has
|
// at the location designated by "path" after the call to Commit.
|
||||||
// received the full contents when the reader returns io.EOF. The number
|
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
||||||
// of successfully READ bytes will be returned, even if an error is
|
key := d.ossPath(path)
|
||||||
// returned. May be used to resume writing a stream by providing a nonzero
|
if !append {
|
||||||
// offset. Offsets past the current size will write from the position
|
// TODO (brianbland): cancel other uploads at this path
|
||||||
// beyond the end of the file.
|
multi, err := d.Bucket.InitMulti(key, d.getContentType(), getPermissions(), d.getOptions())
|
||||||
func (d *driver) WriteStream(ctx context.Context, path string, offset int64, reader io.Reader) (totalRead int64, err error) {
|
|
||||||
partNumber := 1
|
|
||||||
bytesRead := 0
|
|
||||||
var putErrChan chan error
|
|
||||||
parts := []oss.Part{}
|
|
||||||
var part oss.Part
|
|
||||||
done := make(chan struct{}) // stopgap to free up waiting goroutines
|
|
||||||
|
|
||||||
multi, err := d.Bucket.InitMulti(d.ossPath(path), d.getContentType(), getPermissions(), d.getOptions())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return d.newWriter(key, multi, nil), nil
|
||||||
buf := d.getbuf()
|
|
||||||
|
|
||||||
// We never want to leave a dangling multipart upload, our only consistent state is
|
|
||||||
// when there is a whole object at path. This is in order to remain consistent with
|
|
||||||
// the stat call.
|
|
||||||
//
|
|
||||||
// Note that if the machine dies before executing the defer, we will be left with a dangling
|
|
||||||
// multipart upload, which will eventually be cleaned up, but we will lose all of the progress
|
|
||||||
// made prior to the machine crashing.
|
|
||||||
defer func() {
|
|
||||||
if putErrChan != nil {
|
|
||||||
if putErr := <-putErrChan; putErr != nil {
|
|
||||||
err = putErr
|
|
||||||
}
|
}
|
||||||
}
|
multis, _, err := d.Bucket.ListMulti(key, "")
|
||||||
|
|
||||||
if len(parts) > 0 {
|
|
||||||
if multi == nil {
|
|
||||||
// Parts should be empty if the multi is not initialized
|
|
||||||
panic("Unreachable")
|
|
||||||
} else {
|
|
||||||
if multi.Complete(parts) != nil {
|
|
||||||
multi.Abort()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
d.putbuf(buf) // needs to be here to pick up new buf value
|
|
||||||
close(done) // free up any waiting goroutines
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Fills from 0 to total from current
|
|
||||||
fromSmallCurrent := func(total int64) error {
|
|
||||||
current, err := d.ReadStream(ctx, path, 0)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, parseError(path, err)
|
||||||
}
|
}
|
||||||
|
for _, multi := range multis {
|
||||||
bytesRead = 0
|
if key != multi.Key {
|
||||||
for int64(bytesRead) < total {
|
continue
|
||||||
//The loop should very rarely enter a second iteration
|
}
|
||||||
nn, err := current.Read(buf[bytesRead:total])
|
parts, err := multi.ListParts()
|
||||||
bytesRead += nn
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != io.EOF {
|
return nil, parseError(path, err)
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
var multiSize int64
|
||||||
break
|
for _, part := range parts {
|
||||||
|
multiSize += part.Size
|
||||||
}
|
}
|
||||||
|
return d.newWriter(key, multi, parts), nil
|
||||||
}
|
}
|
||||||
return nil
|
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||||
}
|
|
||||||
|
|
||||||
// Fills from parameter to chunkSize from reader
|
|
||||||
fromReader := func(from int64) error {
|
|
||||||
bytesRead = 0
|
|
||||||
for from+int64(bytesRead) < d.ChunkSize {
|
|
||||||
nn, err := reader.Read(buf[from+int64(bytesRead):])
|
|
||||||
totalRead += int64(nn)
|
|
||||||
bytesRead += nn
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
if err != io.EOF {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if putErrChan == nil {
|
|
||||||
putErrChan = make(chan error)
|
|
||||||
} else {
|
|
||||||
if putErr := <-putErrChan; putErr != nil {
|
|
||||||
putErrChan = nil
|
|
||||||
return putErr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
go func(bytesRead int, from int64, buf []byte) {
|
|
||||||
defer d.putbuf(buf) // this buffer gets dropped after this call
|
|
||||||
|
|
||||||
// DRAGONS(stevvooe): There are few things one might want to know
|
|
||||||
// about this section. First, the putErrChan is expecting an error
|
|
||||||
// and a nil or just a nil to come through the channel. This is
|
|
||||||
// covered by the silly defer below. The other aspect is the OSS
|
|
||||||
// retry backoff to deal with RequestTimeout errors. Even though
|
|
||||||
// the underlying OSS library should handle it, it doesn't seem to
|
|
||||||
// be part of the shouldRetry function (see denverdino/aliyungo/oss).
|
|
||||||
defer func() {
|
|
||||||
select {
|
|
||||||
case putErrChan <- nil: // for some reason, we do this no matter what.
|
|
||||||
case <-done:
|
|
||||||
return // ensure we don't leak the goroutine
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if bytesRead <= 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
var part oss.Part
|
|
||||||
|
|
||||||
part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf[0:int64(bytesRead)+from]), defaultTimeout)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("error putting part, aborting: %v", err)
|
|
||||||
select {
|
|
||||||
case putErrChan <- err:
|
|
||||||
case <-done:
|
|
||||||
return // don't leak the goroutine
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// parts and partNumber are safe, because this function is the
|
|
||||||
// only one modifying them and we force it to be executed
|
|
||||||
// serially.
|
|
||||||
parts = append(parts, part)
|
|
||||||
partNumber++
|
|
||||||
}(bytesRead, from, buf)
|
|
||||||
|
|
||||||
buf = d.getbuf() // use a new buffer for the next call
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if offset > 0 {
|
|
||||||
resp, err := d.Bucket.Head(d.ossPath(path), nil)
|
|
||||||
if err != nil {
|
|
||||||
if ossErr, ok := err.(*oss.Error); !ok || ossErr.StatusCode != http.StatusNotFound {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
currentLength := int64(0)
|
|
||||||
if err == nil {
|
|
||||||
currentLength = resp.ContentLength
|
|
||||||
}
|
|
||||||
|
|
||||||
if currentLength >= offset {
|
|
||||||
if offset < d.ChunkSize {
|
|
||||||
// chunkSize > currentLength >= offset
|
|
||||||
if err = fromSmallCurrent(offset); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fromReader(offset); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if totalRead+offset < d.ChunkSize {
|
|
||||||
return totalRead, nil
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// currentLength >= offset >= chunkSize
|
|
||||||
_, part, err = multi.PutPartCopy(partNumber,
|
|
||||||
oss.CopyOptions{CopySourceOptions: "bytes=0-" + strconv.FormatInt(offset-1, 10)},
|
|
||||||
d.Bucket.Path(d.ossPath(path)))
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
parts = append(parts, part)
|
|
||||||
partNumber++
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Fills between parameters with 0s but only when to - from <= chunkSize
|
|
||||||
fromZeroFillSmall := func(from, to int64) error {
|
|
||||||
bytesRead = 0
|
|
||||||
for from+int64(bytesRead) < to {
|
|
||||||
nn, err := bytes.NewReader(d.zeros).Read(buf[from+int64(bytesRead) : to])
|
|
||||||
bytesRead += nn
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fills between parameters with 0s, making new parts
|
|
||||||
fromZeroFillLarge := func(from, to int64) error {
|
|
||||||
bytesRead64 := int64(0)
|
|
||||||
for to-(from+bytesRead64) >= d.ChunkSize {
|
|
||||||
part, err := multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(d.zeros), defaultTimeout)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
bytesRead64 += d.ChunkSize
|
|
||||||
|
|
||||||
parts = append(parts, part)
|
|
||||||
partNumber++
|
|
||||||
}
|
|
||||||
|
|
||||||
return fromZeroFillSmall(0, (to-from)%d.ChunkSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
// currentLength < offset
|
|
||||||
if currentLength < d.ChunkSize {
|
|
||||||
if offset < d.ChunkSize {
|
|
||||||
// chunkSize > offset > currentLength
|
|
||||||
if err = fromSmallCurrent(currentLength); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fromZeroFillSmall(currentLength, offset); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fromReader(offset); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if totalRead+offset < d.ChunkSize {
|
|
||||||
return totalRead, nil
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// offset >= chunkSize > currentLength
|
|
||||||
if err = fromSmallCurrent(currentLength); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fromZeroFillSmall(currentLength, d.ChunkSize); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
part, err = multi.PutPartWithTimeout(int(partNumber), bytes.NewReader(buf), defaultTimeout)
|
|
||||||
if err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
parts = append(parts, part)
|
|
||||||
partNumber++
|
|
||||||
|
|
||||||
//Zero fill from chunkSize up to offset, then some reader
|
|
||||||
if err = fromZeroFillLarge(d.ChunkSize, offset); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fromReader(offset % d.ChunkSize); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if totalRead+(offset%d.ChunkSize) < d.ChunkSize {
|
|
||||||
return totalRead, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// offset > currentLength >= chunkSize
|
|
||||||
_, part, err = multi.PutPartCopy(partNumber,
|
|
||||||
oss.CopyOptions{},
|
|
||||||
d.Bucket.Path(d.ossPath(path)))
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
parts = append(parts, part)
|
|
||||||
partNumber++
|
|
||||||
|
|
||||||
//Zero fill from currentLength up to offset, then some reader
|
|
||||||
if err = fromZeroFillLarge(currentLength, offset); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = fromReader((offset - currentLength) % d.ChunkSize); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if totalRead+((offset-currentLength)%d.ChunkSize) < d.ChunkSize {
|
|
||||||
return totalRead, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
if err = fromReader(0); err != nil {
|
|
||||||
return totalRead, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if int64(bytesRead) < d.ChunkSize {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return totalRead, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat retrieves the FileInfo for the given path, including the current size
|
// Stat retrieves the FileInfo for the given path, including the current size
|
||||||
|
@ -778,12 +490,181 @@ func (d *driver) getContentType() string {
|
||||||
return "application/octet-stream"
|
return "application/octet-stream"
|
||||||
}
|
}
|
||||||
|
|
||||||
// getbuf returns a buffer from the driver's pool with length d.ChunkSize.
|
// writer attempts to upload parts to S3 in a buffered fashion where the last
|
||||||
func (d *driver) getbuf() []byte {
|
// part is at least as large as the chunksize, so the multipart upload could be
|
||||||
return d.pool.Get().([]byte)
|
// cleanly resumed in the future. This is violated if Close is called after less
|
||||||
|
// than a full chunk is written.
|
||||||
|
type writer struct {
|
||||||
|
driver *driver
|
||||||
|
key string
|
||||||
|
multi *oss.Multi
|
||||||
|
parts []oss.Part
|
||||||
|
size int64
|
||||||
|
readyPart []byte
|
||||||
|
pendingPart []byte
|
||||||
|
closed bool
|
||||||
|
committed bool
|
||||||
|
cancelled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *driver) putbuf(p []byte) {
|
func (d *driver) newWriter(key string, multi *oss.Multi, parts []oss.Part) storagedriver.FileWriter {
|
||||||
copy(p, d.zeros)
|
var size int64
|
||||||
d.pool.Put(p)
|
for _, part := range parts {
|
||||||
|
size += part.Size
|
||||||
|
}
|
||||||
|
return &writer{
|
||||||
|
driver: d,
|
||||||
|
key: key,
|
||||||
|
multi: multi,
|
||||||
|
parts: parts,
|
||||||
|
size: size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writer) Write(p []byte) (int, error) {
|
||||||
|
if w.closed {
|
||||||
|
return 0, fmt.Errorf("already closed")
|
||||||
|
} else if w.committed {
|
||||||
|
return 0, fmt.Errorf("already committed")
|
||||||
|
} else if w.cancelled {
|
||||||
|
return 0, fmt.Errorf("already cancelled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the last written part is smaller than minChunkSize, we need to make a
|
||||||
|
// new multipart upload :sadface:
|
||||||
|
if len(w.parts) > 0 && int(w.parts[len(w.parts)-1].Size) < minChunkSize {
|
||||||
|
err := w.multi.Complete(w.parts)
|
||||||
|
if err != nil {
|
||||||
|
w.multi.Abort()
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
multi, err := w.driver.Bucket.InitMulti(w.key, w.driver.getContentType(), getPermissions(), w.driver.getOptions())
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
w.multi = multi
|
||||||
|
|
||||||
|
// If the entire written file is smaller than minChunkSize, we need to make
|
||||||
|
// a new part from scratch :double sad face:
|
||||||
|
if w.size < minChunkSize {
|
||||||
|
contents, err := w.driver.Bucket.Get(w.key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
w.parts = nil
|
||||||
|
w.readyPart = contents
|
||||||
|
} else {
|
||||||
|
// Otherwise we can use the old file as the new first part
|
||||||
|
_, part, err := multi.PutPartCopy(1, oss.CopyOptions{}, w.driver.Bucket.Name+"/"+w.key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
w.parts = []oss.Part{part}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var n int
|
||||||
|
|
||||||
|
for len(p) > 0 {
|
||||||
|
// If no parts are ready to write, fill up the first part
|
||||||
|
if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
|
||||||
|
if len(p) >= neededBytes {
|
||||||
|
w.readyPart = append(w.readyPart, p[:neededBytes]...)
|
||||||
|
n += neededBytes
|
||||||
|
p = p[neededBytes:]
|
||||||
|
} else {
|
||||||
|
w.readyPart = append(w.readyPart, p...)
|
||||||
|
n += len(p)
|
||||||
|
p = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
|
||||||
|
if len(p) >= neededBytes {
|
||||||
|
w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
|
||||||
|
n += neededBytes
|
||||||
|
p = p[neededBytes:]
|
||||||
|
err := w.flushPart()
|
||||||
|
if err != nil {
|
||||||
|
w.size += int64(n)
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
w.pendingPart = append(w.pendingPart, p...)
|
||||||
|
n += len(p)
|
||||||
|
p = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.size += int64(n)
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writer) Size() int64 {
|
||||||
|
return w.size
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writer) Close() error {
|
||||||
|
if w.closed {
|
||||||
|
return fmt.Errorf("already closed")
|
||||||
|
}
|
||||||
|
w.closed = true
|
||||||
|
return w.flushPart()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writer) Cancel() error {
|
||||||
|
if w.closed {
|
||||||
|
return fmt.Errorf("already closed")
|
||||||
|
} else if w.committed {
|
||||||
|
return fmt.Errorf("already committed")
|
||||||
|
}
|
||||||
|
w.cancelled = true
|
||||||
|
err := w.multi.Abort()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writer) Commit() error {
|
||||||
|
if w.closed {
|
||||||
|
return fmt.Errorf("already closed")
|
||||||
|
} else if w.committed {
|
||||||
|
return fmt.Errorf("already committed")
|
||||||
|
} else if w.cancelled {
|
||||||
|
return fmt.Errorf("already cancelled")
|
||||||
|
}
|
||||||
|
err := w.flushPart()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.committed = true
|
||||||
|
err = w.multi.Complete(w.parts)
|
||||||
|
if err != nil {
|
||||||
|
w.multi.Abort()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// flushPart flushes buffers to write a part to S3.
|
||||||
|
// Only called by Write (with both buffers full) and Close/Commit (always)
|
||||||
|
func (w *writer) flushPart() error {
|
||||||
|
if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
|
||||||
|
// nothing to write
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if len(w.pendingPart) < int(w.driver.ChunkSize) {
|
||||||
|
// closing with a small pending part
|
||||||
|
// combine ready and pending to avoid writing a small part
|
||||||
|
w.readyPart = append(w.readyPart, w.pendingPart...)
|
||||||
|
w.pendingPart = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
part, err := w.multi.PutPart(len(w.parts)+1, bytes.NewReader(w.readyPart))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.parts = append(w.parts, part)
|
||||||
|
w.readyPart = w.pendingPart
|
||||||
|
w.pendingPart = nil
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue