rclone/backend/box/upload.go
Nick Craig-Wood e43b5ce5e5 Remove github.com/pkg/errors and replace with std library version
This is possible now that we no longer support go1.12 and brings
rclone into line with standard practices in the Go world.

This also removes errors.New and errors.Errorf from lib/errors and
prefers the stdlib errors package over lib/errors.
2021-11-07 11:53:30 +00:00

276 lines
7.9 KiB
Go

// multipart upload for box
package box
import (
"bytes"
"context"
"crypto/sha1"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
"github.com/rclone/rclone/backend/box/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/rest"
)
// createUploadSession creates an upload session for the object
func (o *Object) createUploadSession(ctx context.Context, leaf, directoryID string, size int64) (response *api.UploadSessionResponse, err error) {
opts := rest.Opts{
Method: "POST",
Path: "/files/upload_sessions",
RootURL: uploadURL,
}
request := api.UploadSessionRequest{
FileSize: size,
}
// If object has an ID then it is existing so create a new version
if o.id != "" {
opts.Path = "/files/" + o.id + "/upload_sessions"
} else {
opts.Path = "/files/upload_sessions"
request.FolderID = directoryID
request.FileName = o.fs.opt.Enc.FromStandardName(leaf)
}
var resp *http.Response
err = o.fs.pacer.Call(func() (bool, error) {
resp, err = o.fs.srv.CallJSON(ctx, &opts, &request, &response)
return shouldRetry(ctx, resp, err)
})
return
}
// sha1Digest produces a digest using sha1 as per RFC3230
func sha1Digest(digest []byte) string {
return "sha=" + base64.StdEncoding.EncodeToString(digest)
}
// uploadPart uploads a part in an upload session
func (o *Object) uploadPart(ctx context.Context, SessionID string, offset, totalSize int64, chunk []byte, wrap accounting.WrapFn, options ...fs.OpenOption) (response *api.UploadPartResponse, err error) {
chunkSize := int64(len(chunk))
sha1sum := sha1.Sum(chunk)
opts := rest.Opts{
Method: "PUT",
Path: "/files/upload_sessions/" + SessionID,
RootURL: uploadURL,
ContentType: "application/octet-stream",
ContentLength: &chunkSize,
ContentRange: fmt.Sprintf("bytes %d-%d/%d", offset, offset+chunkSize-1, totalSize),
Options: options,
ExtraHeaders: map[string]string{
"Digest": sha1Digest(sha1sum[:]),
},
}
var resp *http.Response
err = o.fs.pacer.Call(func() (bool, error) {
opts.Body = wrap(bytes.NewReader(chunk))
resp, err = o.fs.srv.CallJSON(ctx, &opts, nil, &response)
return shouldRetry(ctx, resp, err)
})
if err != nil {
return nil, err
}
return response, nil
}
// commitUpload finishes an upload session
func (o *Object) commitUpload(ctx context.Context, SessionID string, parts []api.Part, modTime time.Time, sha1sum []byte) (result *api.FolderItems, err error) {
opts := rest.Opts{
Method: "POST",
Path: "/files/upload_sessions/" + SessionID + "/commit",
RootURL: uploadURL,
ExtraHeaders: map[string]string{
"Digest": sha1Digest(sha1sum),
},
}
request := api.CommitUpload{
Parts: parts,
}
request.Attributes.ContentModifiedAt = api.Time(modTime)
request.Attributes.ContentCreatedAt = api.Time(modTime)
var body []byte
var resp *http.Response
// For discussion of this value see:
// https://github.com/rclone/rclone/issues/2054
maxTries := o.fs.opt.CommitRetries
const defaultDelay = 10
var tries int
outer:
for tries = 0; tries < maxTries; tries++ {
err = o.fs.pacer.Call(func() (bool, error) {
resp, err = o.fs.srv.CallJSON(ctx, &opts, &request, nil)
if err != nil {
return shouldRetry(ctx, resp, err)
}
body, err = rest.ReadBody(resp)
return shouldRetry(ctx, resp, err)
})
delay := defaultDelay
var why string
if err != nil {
// Sometimes we get 400 Error with
// parts_mismatch immediately after uploading
// the last part. Ignore this error and wait.
if boxErr, ok := err.(*api.Error); ok && boxErr.Code == "parts_mismatch" {
why = err.Error()
} else {
return nil, err
}
} else {
switch resp.StatusCode {
case http.StatusOK, http.StatusCreated:
break outer
case http.StatusAccepted:
why = "not ready yet"
delayString := resp.Header.Get("Retry-After")
if delayString != "" {
delay, err = strconv.Atoi(delayString)
if err != nil {
fs.Debugf(o, "Couldn't decode Retry-After header %q: %v", delayString, err)
delay = defaultDelay
}
}
default:
return nil, fmt.Errorf("unknown HTTP status return %q (%d)", resp.Status, resp.StatusCode)
}
}
fs.Debugf(o, "commit multipart upload failed %d/%d - trying again in %d seconds (%s)", tries+1, maxTries, delay, why)
time.Sleep(time.Duration(delay) * time.Second)
}
if tries >= maxTries {
return nil, errors.New("too many tries to commit multipart upload - increase --low-level-retries")
}
err = json.Unmarshal(body, &result)
if err != nil {
return nil, fmt.Errorf("couldn't decode commit response: %q: %w", body, err)
}
return result, nil
}
// abortUpload cancels an upload session
func (o *Object) abortUpload(ctx context.Context, SessionID string) (err error) {
opts := rest.Opts{
Method: "DELETE",
Path: "/files/upload_sessions/" + SessionID,
RootURL: uploadURL,
NoResponse: true,
}
var resp *http.Response
err = o.fs.pacer.Call(func() (bool, error) {
resp, err = o.fs.srv.Call(ctx, &opts)
return shouldRetry(ctx, resp, err)
})
return err
}
// uploadMultipart uploads a file using multipart upload
func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, leaf, directoryID string, size int64, modTime time.Time, options ...fs.OpenOption) (err error) {
// Create upload session
session, err := o.createUploadSession(ctx, leaf, directoryID, size)
if err != nil {
return fmt.Errorf("multipart upload create session failed: %w", err)
}
chunkSize := session.PartSize
fs.Debugf(o, "Multipart upload session started for %d parts of size %v", session.TotalParts, fs.SizeSuffix(chunkSize))
// Cancel the session if something went wrong
defer atexit.OnError(&err, func() {
fs.Debugf(o, "Cancelling multipart upload: %v", err)
cancelErr := o.abortUpload(ctx, session.ID)
if cancelErr != nil {
fs.Logf(o, "Failed to cancel multipart upload: %v", cancelErr)
}
})()
// unwrap the accounting from the input, we use wrap to put it
// back on after the buffering
in, wrap := accounting.UnWrap(in)
// Upload the chunks
remaining := size
position := int64(0)
parts := make([]api.Part, session.TotalParts)
hash := sha1.New()
errs := make(chan error, 1)
var wg sync.WaitGroup
outer:
for part := 0; part < session.TotalParts; part++ {
// Check any errors
select {
case err = <-errs:
break outer
default:
}
reqSize := remaining
if reqSize >= chunkSize {
reqSize = chunkSize
}
// Make a block of memory
buf := make([]byte, reqSize)
// Read the chunk
_, err = io.ReadFull(in, buf)
if err != nil {
err = fmt.Errorf("multipart upload failed to read source: %w", err)
break outer
}
// Make the global hash (must be done sequentially)
_, _ = hash.Write(buf)
// Transfer the chunk
wg.Add(1)
o.fs.uploadToken.Get()
go func(part int, position int64) {
defer wg.Done()
defer o.fs.uploadToken.Put()
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
partResponse, err := o.uploadPart(ctx, session.ID, position, size, buf, wrap, options...)
if err != nil {
err = fmt.Errorf("multipart upload failed to upload part: %w", err)
select {
case errs <- err:
default:
}
return
}
parts[part] = partResponse.Part
}(part, position)
// ready for next block
remaining -= chunkSize
position += chunkSize
}
wg.Wait()
if err == nil {
select {
case err = <-errs:
default:
}
}
if err != nil {
return err
}
// Finalise the upload session
result, err := o.commitUpload(ctx, session.ID, parts, modTime, hash.Sum(nil))
if err != nil {
return fmt.Errorf("multipart upload failed to finalize: %w", err)
}
if result.TotalCount != 1 || len(result.Entries) != 1 {
return fmt.Errorf("multipart upload failed %v - not sure why", o)
}
return o.setMetaData(&result.Entries[0])
}