dropbox: simplify chunked uploads

Signed-off-by: Alexey Ivanov <rbtz@dropbox.com>
This commit is contained in:
Alexey Ivanov 2021-03-30 22:10:29 -07:00 committed by Nick Craig-Wood
parent 798d1293df
commit 2fa7a3c0fb

View file

@ -22,7 +22,6 @@ of path_display and all will be well.
*/ */
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io" "io"
@ -1608,104 +1607,70 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
// uploadChunked uploads the object in parts // uploadChunked uploads the object in parts
// //
// Will work optimally if size is >= uploadChunkSize. If the size is either // Will introduce two additional network requests to start and finish the session.
// unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an // If the size is unknown (i.e. -1) the method incurs one additional
// avoidable request to the Dropbox API that does not carry payload. // request to the Dropbox API that does not carry a payload to close the append session.
func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) { func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
batching := o.fs.batcher.Batching() // start upload
chunkSize := int64(o.fs.opt.ChunkSize)
chunks := 0
if size >= 0 {
chunks = int(size/chunkSize) + 1
}
in := readers.NewCountingReader(in0)
buf := make([]byte, int(chunkSize))
fmtChunk := func(cur int, last bool) {
if chunks == 0 && last {
fs.Debugf(o, "Streaming chunk %d/%d", cur, cur)
} else if chunks == 0 {
fs.Debugf(o, "Streaming chunk %d/unknown", cur)
} else if chunks != 1 {
fs.Debugf(o, "Uploading chunk %d/%d", cur, chunks)
}
}
appendArg := files.UploadSessionAppendArg{
Close: chunks == 1,
}
// write the first chunk
fmtChunk(1, false)
var res *files.UploadSessionStartResult var res *files.UploadSessionStartResult
chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize)
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, nil)
if _, err = chunk.Seek(0, io.SeekStart); err != nil {
return false, nil
}
arg := files.UploadSessionStartArg{
Close: appendArg.Close,
}
res, err = o.fs.srv.UploadSessionStart(&arg, chunk)
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
chunkSize := int64(o.fs.opt.ChunkSize)
chunks, remainder := size/chunkSize, size%chunkSize
if remainder > 0 {
chunks++
}
// write chunks
in := readers.NewCountingReader(in0)
buf := make([]byte, int(chunkSize))
cursor := files.UploadSessionCursor{ cursor := files.UploadSessionCursor{
SessionId: res.SessionId, SessionId: res.SessionId,
Offset: 0, Offset: 0,
} }
appendArg.Cursor = &cursor appendArg := files.UploadSessionAppendArg{Cursor: &cursor}
for currentChunk := 1; ; currentChunk++ {
// write more whole chunks (if any, and if !batching), if
// batching write the last chunk also.
currentChunk := 2
for {
if chunks > 0 {
// Size known
if currentChunk == chunks {
// Last chunk
if !batching {
// if the size is known, only upload full chunks. Remaining bytes are uploaded with
// the UploadSessionFinish request.
break
}
appendArg.Close = true
} else if currentChunk > chunks {
break
}
} else {
// Size unknown
lastReadWasShort := in.BytesRead()-cursor.Offset < uint64(chunkSize)
if lastReadWasShort {
// if the size is unknown, upload as long as we can read full chunks from the reader.
// The UploadSessionFinish request will not contain any payload.
// This is also what we want if batching
break
}
}
cursor.Offset = in.BytesRead() cursor.Offset = in.BytesRead()
fmtChunk(currentChunk, false)
chunk = readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize) if chunks < 0 {
fs.Debugf(o, "Streaming chunk %d/unknown", currentChunk)
} else {
fs.Debugf(o, "Uploading chunk %d/%d", currentChunk, chunks)
}
chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize)
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry // seek to the start in case this is a retry
if _, err = chunk.Seek(0, io.SeekStart); err != nil { if _, err = chunk.Seek(0, io.SeekStart); err != nil {
return false, nil return false, nil
} }
err = o.fs.srv.UploadSessionAppendV2(&appendArg, chunk) err = o.fs.srv.UploadSessionAppendV2(&appendArg, chunk)
// after the first chunk is uploaded, we retry everything // after session is started, we retry everything
return err != nil, err return err != nil, err
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
currentChunk++ if appendArg.Close {
break
}
if size > 0 {
// if size is known, check if next chunk is final
appendArg.Close = uint64(size)-in.BytesRead() <= uint64(chunkSize)
} else {
// if size is unknown, upload as long as we can read full chunks from the reader
appendArg.Close = in.BytesRead()-cursor.Offset < uint64(chunkSize)
}
} }
// write the remains // finish upload
cursor.Offset = in.BytesRead() cursor.Offset = in.BytesRead()
args := &files.UploadSessionFinishArg{ args := &files.UploadSessionFinishArg{
Cursor: &cursor, Cursor: &cursor,
@ -1713,32 +1678,12 @@ func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *f
} }
// If we are batching then we should have written all the data now // If we are batching then we should have written all the data now
// store the commit info now for a batch commit // store the commit info now for a batch commit
if batching { if o.fs.batcher.Batching() {
// If we haven't closed the session then we need to
if !appendArg.Close {
appendArg.Close = true
fs.Debugf(o, "Closing session")
var empty bytes.Buffer
err = o.fs.pacer.Call(func() (bool, error) {
err = o.fs.srv.UploadSessionAppendV2(&appendArg, &empty)
// after the first chunk is uploaded, we retry everything
return err != nil, err
})
if err != nil {
return nil, err
}
}
return o.fs.batcher.Commit(ctx, args) return o.fs.batcher.Commit(ctx, args)
} }
fmtChunk(currentChunk, true)
chunk = readers.NewRepeatableReaderBuffer(in, buf)
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry entry, err = o.fs.srv.UploadSessionFinish(args, nil)
if _, err = chunk.Seek(0, io.SeekStart); err != nil {
return false, nil
}
entry, err = o.fs.srv.UploadSessionFinish(args, chunk)
// If error is insufficient space then don't retry // If error is insufficient space then don't retry
if e, ok := err.(files.UploadSessionFinishAPIError); ok { if e, ok := err.(files.UploadSessionFinishAPIError); ok {
if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace { if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {