forked from TrueCloudLab/rclone
e43b5ce5e5
This is possible now that we no longer support go1.12 and brings rclone into line with standard practices in the Go world. This also removes errors.New and errors.Errorf from lib/errors and prefers the stdlib errors package over lib/errors.
260 lines
6.5 KiB
Go
260 lines
6.5 KiB
Go
// Upload large files for sharefile
|
|
//
|
|
// Docs - https://api.sharefile.com/rest/docs/resource.aspx?name=Items#Upload_File
|
|
|
|
package sharefile
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/md5"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/rclone/rclone/backend/sharefile/api"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/lib/readers"
|
|
"github.com/rclone/rclone/lib/rest"
|
|
)
|
|
|
|
// largeUpload is used to control the upload of large files which need chunking
|
|
type largeUpload struct {
|
|
ctx context.Context
|
|
f *Fs // parent Fs
|
|
o *Object // object being uploaded
|
|
in io.Reader // read the data from here
|
|
wrap accounting.WrapFn // account parts being transferred
|
|
size int64 // total size
|
|
parts int64 // calculated number of parts, if known
|
|
info *api.UploadSpecification // where to post chunks, etc.
|
|
threads int // number of threads to use in upload
|
|
streamed bool // set if using streamed upload
|
|
}
|
|
|
|
// newLargeUpload starts an upload of object o from in with metadata in src
|
|
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, info *api.UploadSpecification) (up *largeUpload, err error) {
|
|
size := src.Size()
|
|
parts := int64(-1)
|
|
if size >= 0 {
|
|
parts = size / int64(o.fs.opt.ChunkSize)
|
|
if size%int64(o.fs.opt.ChunkSize) != 0 {
|
|
parts++
|
|
}
|
|
}
|
|
|
|
var streamed bool
|
|
switch strings.ToLower(info.Method) {
|
|
case "streamed":
|
|
streamed = true
|
|
case "threaded":
|
|
streamed = false
|
|
default:
|
|
return nil, fmt.Errorf("can't use method %q with newLargeUpload", info.Method)
|
|
}
|
|
|
|
threads := f.ci.Transfers
|
|
if threads > info.MaxNumberOfThreads {
|
|
threads = info.MaxNumberOfThreads
|
|
}
|
|
|
|
// unwrap the accounting from the input, we use wrap to put it
|
|
// back on after the buffering
|
|
in, wrap := accounting.UnWrap(in)
|
|
up = &largeUpload{
|
|
ctx: ctx,
|
|
f: f,
|
|
o: o,
|
|
in: in,
|
|
wrap: wrap,
|
|
size: size,
|
|
threads: threads,
|
|
info: info,
|
|
parts: parts,
|
|
streamed: streamed,
|
|
}
|
|
return up, nil
|
|
}
|
|
|
|
// parse the api.UploadFinishResponse in respBody
|
|
func (up *largeUpload) parseUploadFinishResponse(respBody []byte) (err error) {
|
|
var finish api.UploadFinishResponse
|
|
err = json.Unmarshal(respBody, &finish)
|
|
if err != nil {
|
|
// Sometimes the unmarshal fails in which case return the body
|
|
return fmt.Errorf("upload: bad response: %q", bytes.TrimSpace(respBody))
|
|
}
|
|
return up.o.checkUploadResponse(up.ctx, &finish)
|
|
}
|
|
|
|
// Transfer a chunk
|
|
func (up *largeUpload) transferChunk(ctx context.Context, part int64, offset int64, body []byte, fileHash string) error {
|
|
md5sumRaw := md5.Sum(body)
|
|
md5sum := hex.EncodeToString(md5sumRaw[:])
|
|
size := int64(len(body))
|
|
|
|
// Add some more parameters to the ChunkURI
|
|
u := up.info.ChunkURI
|
|
u += fmt.Sprintf("&index=%d&byteOffset=%d&hash=%s&fmt=json",
|
|
part, offset, md5sum,
|
|
)
|
|
if fileHash != "" {
|
|
u += fmt.Sprintf("&finish=true&fileSize=%d&fileHash=%s",
|
|
offset+int64(len(body)),
|
|
fileHash,
|
|
)
|
|
}
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
RootURL: u,
|
|
ContentLength: &size,
|
|
}
|
|
var respBody []byte
|
|
err := up.f.pacer.Call(func() (bool, error) {
|
|
fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body))
|
|
opts.Body = up.wrap(bytes.NewReader(body))
|
|
resp, err := up.f.srv.Call(ctx, &opts)
|
|
if err != nil {
|
|
fs.Debugf(up.o, "Error sending chunk %d: %v", part, err)
|
|
} else {
|
|
respBody, err = rest.ReadBody(resp)
|
|
}
|
|
// retry all errors now that the multipart upload has started
|
|
return err != nil, err
|
|
})
|
|
if err != nil {
|
|
fs.Debugf(up.o, "Error sending chunk %d: %v", part, err)
|
|
return err
|
|
}
|
|
// If last chunk and using "streamed" transfer, get the response back now
|
|
if up.streamed && fileHash != "" {
|
|
return up.parseUploadFinishResponse(respBody)
|
|
}
|
|
fs.Debugf(up.o, "Done sending chunk %d", part)
|
|
return nil
|
|
}
|
|
|
|
// finish closes off the large upload and reads the metadata
|
|
func (up *largeUpload) finish(ctx context.Context) error {
|
|
fs.Debugf(up.o, "Finishing large file upload")
|
|
// For a streamed transfer we will already have read the info
|
|
if up.streamed {
|
|
return nil
|
|
}
|
|
|
|
opts := rest.Opts{
|
|
Method: "POST",
|
|
RootURL: up.info.FinishURI,
|
|
}
|
|
var respBody []byte
|
|
err := up.f.pacer.Call(func() (bool, error) {
|
|
resp, err := up.f.srv.Call(ctx, &opts)
|
|
if err != nil {
|
|
return shouldRetry(ctx, resp, err)
|
|
}
|
|
respBody, err = rest.ReadBody(resp)
|
|
// retry all errors now that the multipart upload has started
|
|
return err != nil, err
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return up.parseUploadFinishResponse(respBody)
|
|
}
|
|
|
|
// Upload uploads the chunks from the input
|
|
func (up *largeUpload) Upload(ctx context.Context) error {
|
|
if up.parts >= 0 {
|
|
fs.Debugf(up.o, "Starting upload of large file in %d chunks", up.parts)
|
|
} else {
|
|
fs.Debugf(up.o, "Starting streaming upload of large file")
|
|
}
|
|
var (
|
|
offset int64
|
|
errs = make(chan error, 1)
|
|
wg sync.WaitGroup
|
|
err error
|
|
wholeFileHash = md5.New()
|
|
eof = false
|
|
)
|
|
outer:
|
|
for part := int64(0); !eof; part++ {
|
|
// Check any errors
|
|
select {
|
|
case err = <-errs:
|
|
break outer
|
|
default:
|
|
}
|
|
|
|
// Get a block of memory
|
|
buf := up.f.getUploadBlock()
|
|
|
|
// Read the chunk
|
|
var n int
|
|
n, err = readers.ReadFill(up.in, buf)
|
|
if err == io.EOF {
|
|
eof = true
|
|
buf = buf[:n]
|
|
err = nil
|
|
} else if err != nil {
|
|
up.f.putUploadBlock(buf)
|
|
break outer
|
|
}
|
|
|
|
// Hash it
|
|
_, _ = io.Copy(wholeFileHash, bytes.NewBuffer(buf))
|
|
|
|
// Get file hash if was last chunk
|
|
fileHash := ""
|
|
if eof {
|
|
fileHash = hex.EncodeToString(wholeFileHash.Sum(nil))
|
|
}
|
|
|
|
// Transfer the chunk
|
|
wg.Add(1)
|
|
transferChunk := func(part, offset int64, buf []byte, fileHash string) {
|
|
defer wg.Done()
|
|
defer up.f.putUploadBlock(buf)
|
|
err := up.transferChunk(ctx, part, offset, buf, fileHash)
|
|
if err != nil {
|
|
select {
|
|
case errs <- err:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
if up.streamed {
|
|
transferChunk(part, offset, buf, fileHash) // streamed
|
|
} else {
|
|
go transferChunk(part, offset, buf, fileHash) // multithreaded
|
|
}
|
|
|
|
offset += int64(n)
|
|
}
|
|
wg.Wait()
|
|
|
|
// check size read is correct
|
|
if eof && err == nil && up.size >= 0 && up.size != offset {
|
|
err = fmt.Errorf("upload: short read: read %d bytes expected %d", up.size, offset)
|
|
}
|
|
|
|
// read any errors
|
|
if err == nil {
|
|
select {
|
|
case err = <-errs:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// finish regardless of errors
|
|
finishErr := up.finish(ctx)
|
|
if err == nil {
|
|
err = finishErr
|
|
}
|
|
|
|
return err
|
|
}
|