rclone/backend/sharefile/upload.go
Nick Craig-Wood e43b5ce5e5 Remove github.com/pkg/errors and replace with std library version
This is possible now that we no longer support go1.12 and brings
rclone into line with standard practices in the Go world.

This also removes errors.New and errors.Errorf from lib/errors and
prefers the stdlib errors package over lib/errors.
2021-11-07 11:53:30 +00:00

260 lines
6.5 KiB
Go

// Upload large files for sharefile
//
// Docs - https://api.sharefile.com/rest/docs/resource.aspx?name=Items#Upload_File
package sharefile
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"github.com/rclone/rclone/backend/sharefile/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/lib/readers"
"github.com/rclone/rclone/lib/rest"
)
// largeUpload is used to control the upload of large files which need chunking
//
// It is created by newLargeUpload and carries everything the chunk
// transfer and finish steps need: the source reader, the upload
// specification returned by the server and the transfer mode.
type largeUpload struct {
	ctx      context.Context          // context passed to newLargeUpload; used when checking the finish response
	f        *Fs                      // parent Fs
	o        *Object                  // object being uploaded
	in       io.Reader                // read the data from here (accounting has been unwrapped from it)
	wrap     accounting.WrapFn        // account parts being transferred - re-applied to each chunk body
	size     int64                    // total size, negative if not known
	parts    int64                    // calculated number of parts, if known, otherwise -1
	info     *api.UploadSpecification // where to post chunks, etc. (ChunkURI, FinishURI, Method)
	threads  int                      // number of threads to use in upload - capped at info.MaxNumberOfThreads
	streamed bool                     // set if using streamed upload, unset for threaded upload
}
// newLargeUpload prepares a chunked upload of object o, reading the
// data from in and taking the size from src.
//
// info is the upload specification and its Method must be "streamed"
// or "threaded" (compared case insensitively), otherwise an error is
// returned. Nothing is transferred here - the returned largeUpload
// only holds the state needed by Upload.
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, info *api.UploadSpecification) (up *largeUpload, err error) {
	size := src.Size()

	// Round the part count up so that a trailing partial chunk is
	// counted. It stays at -1 when the size is not known.
	parts := int64(-1)
	if size >= 0 {
		chunkSize := int64(o.fs.opt.ChunkSize)
		parts = size / chunkSize
		if size%chunkSize != 0 {
			parts++
		}
	}

	// Only the two chunked methods are supported
	method := strings.ToLower(info.Method)
	if method != "streamed" && method != "threaded" {
		return nil, fmt.Errorf("can't use method %q with newLargeUpload", info.Method)
	}
	streamed := method == "streamed"

	// Use the configured number of transfers unless the upload
	// specification allows fewer threads than that
	threads := info.MaxNumberOfThreads
	if f.ci.Transfers <= threads {
		threads = f.ci.Transfers
	}

	// Take the accounting off the input so it can be re-applied to
	// each chunk with wrap once the data has been buffered
	unwrapped, wrap := accounting.UnWrap(in)

	return &largeUpload{
		ctx:      ctx,
		f:        f,
		o:        o,
		in:       unwrapped,
		wrap:     wrap,
		size:     size,
		parts:    parts,
		info:     info,
		threads:  threads,
		streamed: streamed,
	}, nil
}
// parseUploadFinishResponse decodes respBody as an
// api.UploadFinishResponse and passes it to the object's
// checkUploadResponse.
//
// If the body is not valid JSON the returned error quotes the
// whitespace trimmed body so that the server's reply can be seen.
func (up *largeUpload) parseUploadFinishResponse(respBody []byte) (err error) {
	finish := new(api.UploadFinishResponse)
	if json.Unmarshal(respBody, finish) != nil {
		// Sometimes the body isn't JSON at all, so report what was sent
		return fmt.Errorf("upload: bad response: %q", bytes.TrimSpace(respBody))
	}
	return up.o.checkUploadResponse(up.ctx, finish)
}
// transferChunk POSTs one chunk of the upload to the ChunkURI.
//
// part is the index of the chunk and offset is the position of body
// within the file. A non-empty fileHash marks the final chunk: the
// finish parameters (total size and whole file hash) are then added
// to the request and, for streamed uploads, the response is parsed as
// the upload finish response.
//
// The request goes through the pacer and every error is retried.
func (up *largeUpload) transferChunk(ctx context.Context, part int64, offset int64, body []byte, fileHash string) error {
	chunkLen := int64(len(body))
	sum := md5.Sum(body)
	chunkHash := hex.EncodeToString(sum[:])
	isLast := fileHash != ""

	// Extend the ChunkURI with the per chunk parameters
	chunkURL := up.info.ChunkURI + fmt.Sprintf("&index=%d&byteOffset=%d&hash=%s&fmt=json",
		part, offset, chunkHash,
	)
	if isLast {
		chunkURL += fmt.Sprintf("&finish=true&fileSize=%d&fileHash=%s",
			offset+chunkLen,
			fileHash,
		)
	}

	opts := rest.Opts{
		Method:        "POST",
		RootURL:       chunkURL,
		ContentLength: &chunkLen,
	}
	var respBody []byte
	sendOnce := func() (bool, error) {
		fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body))
		// A fresh reader is needed for each attempt
		opts.Body = up.wrap(bytes.NewReader(body))
		resp, callErr := up.f.srv.Call(ctx, &opts)
		if callErr != nil {
			fs.Debugf(up.o, "Error sending chunk %d: %v", part, callErr)
			// retry all errors now that the multipart upload has started
			return true, callErr
		}
		var readErr error
		respBody, readErr = rest.ReadBody(resp)
		return readErr != nil, readErr
	}
	if err := up.f.pacer.Call(sendOnce); err != nil {
		fs.Debugf(up.o, "Error sending chunk %d: %v", part, err)
		return err
	}

	// The last chunk of a "streamed" transfer returns the finish response
	if up.streamed && isLast {
		return up.parseUploadFinishResponse(respBody)
	}
	fs.Debugf(up.o, "Done sending chunk %d", part)
	return nil
}
// finish completes the large upload and reads the resulting metadata.
//
// Streamed uploads have nothing to do here as the finish response was
// already handled along with the last chunk. Threaded uploads POST to
// the FinishURI and parse the reply as the upload finish response.
func (up *largeUpload) finish(ctx context.Context) error {
	fs.Debugf(up.o, "Finishing large file upload")
	if up.streamed {
		// The info was read when the final chunk was sent
		return nil
	}

	var (
		respBody []byte
		opts     = rest.Opts{
			Method:  "POST",
			RootURL: up.info.FinishURI,
		}
	)
	err := up.f.pacer.Call(func() (bool, error) {
		resp, callErr := up.f.srv.Call(ctx, &opts)
		if callErr != nil {
			return shouldRetry(ctx, resp, callErr)
		}
		body, readErr := rest.ReadBody(resp)
		respBody = body
		// retry all read errors now that the multipart upload has started
		return readErr != nil, readErr
	})
	if err != nil {
		return err
	}
	return up.parseUploadFinishResponse(respBody)
}
// Upload uploads the chunks from the input
//
// The input is read one upload block at a time. Each block is added to
// a running MD5 of the whole file and then passed to transferChunk.
// Streamed uploads send the chunks one after another on the calling
// goroutine, threaded uploads start a goroutine for each chunk. The
// chunk which reached the end of the input carries the whole file
// hash, which marks it as the final chunk.
//
// The first error seen is returned, whether it came from reading the
// input, from transferring a chunk or from the size check. finish is
// called regardless of errors, but its error is only returned if
// nothing failed before it.
func (up *largeUpload) Upload(ctx context.Context) error {
	if up.parts >= 0 {
		fs.Debugf(up.o, "Starting upload of large file in %d chunks", up.parts)
	} else {
		fs.Debugf(up.o, "Starting streaming upload of large file")
	}
	var (
		offset        int64
		errs          = make(chan error, 1) // room for the first chunk error only
		wg            sync.WaitGroup
		err           error
		wholeFileHash = md5.New()
		eof           = false
	)
	// sendChunk transfers one chunk and returns its block afterwards.
	// The send on errs is non-blocking so only the first failure is
	// kept and a failing chunk can never stall on a full channel.
	sendChunk := func(part, offset int64, buf []byte, fileHash string) {
		defer wg.Done()
		defer up.f.putUploadBlock(buf)
		err := up.transferChunk(ctx, part, offset, buf, fileHash)
		if err != nil {
			select {
			case errs <- err:
			default:
			}
		}
	}
outer:
	for part := int64(0); !eof; part++ {
		// Stop reading more data as soon as a chunk has failed
		select {
		case err = <-errs:
			break outer
		default:
		}
		// Get a block of memory
		buf := up.f.getUploadBlock()
		// Read the chunk
		var n int
		n, err = readers.ReadFill(up.in, buf)
		if err == io.EOF {
			// Last (possibly short or empty) chunk
			eof = true
			buf = buf[:n]
			err = nil
		} else if err != nil {
			// The block was never handed to a transfer so return it here
			up.f.putUploadBlock(buf)
			break outer
		}
		// Hash it - Write on a hash.Hash never returns an error
		_, _ = wholeFileHash.Write(buf)
		// Get file hash if was last chunk
		fileHash := ""
		if eof {
			fileHash = hex.EncodeToString(wholeFileHash.Sum(nil))
		}
		// Transfer the chunk
		wg.Add(1)
		if up.streamed {
			sendChunk(part, offset, buf, fileHash) // streamed
		} else {
			go sendChunk(part, offset, buf, fileHash) // multithreaded
		}
		offset += int64(n)
	}
	wg.Wait()
	// check size read is correct - offset is the number of bytes
	// actually read and up.size is what the source said to expect
	if eof && err == nil && up.size >= 0 && up.size != offset {
		err = fmt.Errorf("upload: short read: read %d bytes expected %d", offset, up.size)
	}
	// read any errors
	if err == nil {
		select {
		case err = <-errs:
		default:
		}
	}
	// finish regardless of errors
	finishErr := up.finish(ctx)
	if err == nil {
		err = finishErr
	}
	return err
}