box: improve accounting for chunked uploads
This commit is contained in:
parent
bf6101cb6c
commit
82418c3021
1 changed files with 8 additions and 5 deletions
|
@ -16,6 +16,7 @@ import (
|
||||||
|
|
||||||
"github.com/ncw/rclone/backend/box/api"
|
"github.com/ncw/rclone/backend/box/api"
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/ncw/rclone/fs/accounting"
|
||||||
"github.com/ncw/rclone/lib/rest"
|
"github.com/ncw/rclone/lib/rest"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
@ -52,9 +53,8 @@ func sha1Digest(digest []byte) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// uploadPart uploads a part in an upload session
|
// uploadPart uploads a part in an upload session
|
||||||
func (o *Object) uploadPart(SessionID string, offset, totalSize int64, chunk []byte) (response *api.UploadPartResponse, err error) {
|
func (o *Object) uploadPart(SessionID string, offset, totalSize int64, chunk []byte, wrap accounting.WrapFn) (response *api.UploadPartResponse, err error) {
|
||||||
chunkSize := int64(len(chunk))
|
chunkSize := int64(len(chunk))
|
||||||
in := bytes.NewReader(chunk)
|
|
||||||
sha1sum := sha1.Sum(chunk)
|
sha1sum := sha1.Sum(chunk)
|
||||||
opts := rest.Opts{
|
opts := rest.Opts{
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
|
@ -66,11 +66,10 @@ func (o *Object) uploadPart(SessionID string, offset, totalSize int64, chunk []b
|
||||||
ExtraHeaders: map[string]string{
|
ExtraHeaders: map[string]string{
|
||||||
"Digest": sha1Digest(sha1sum[:]),
|
"Digest": sha1Digest(sha1sum[:]),
|
||||||
},
|
},
|
||||||
Body: in,
|
|
||||||
}
|
}
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
_, _ = in.Seek(0, 0)
|
opts.Body = wrap(bytes.NewReader(chunk))
|
||||||
resp, err = o.fs.srv.CallJSON(&opts, nil, &response)
|
resp, err = o.fs.srv.CallJSON(&opts, nil, &response)
|
||||||
return shouldRetry(resp, err)
|
return shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
|
@ -189,6 +188,10 @@ func (o *Object) uploadMultipart(in io.Reader, leaf, directoryID string, size in
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// unwrap the accounting from the input, we use wrap to put it
|
||||||
|
// back on after the buffering
|
||||||
|
in, wrap := accounting.UnWrap(in)
|
||||||
|
|
||||||
// Upload the chunks
|
// Upload the chunks
|
||||||
remaining := size
|
remaining := size
|
||||||
position := int64(0)
|
position := int64(0)
|
||||||
|
@ -230,7 +233,7 @@ outer:
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
defer o.fs.uploadToken.Put()
|
defer o.fs.uploadToken.Put()
|
||||||
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
|
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
|
||||||
partResponse, err := o.uploadPart(session.ID, position, size, buf)
|
partResponse, err := o.uploadPart(session.ID, position, size, buf, wrap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, "multipart upload failed to upload part")
|
err = errors.Wrap(err, "multipart upload failed to upload part")
|
||||||
select {
|
select {
|
||||||
|
|
Loading…
Add table
Reference in a new issue