Compare commits
4 commits
tcl/master
...
resume
Author | SHA1 | Date | |
---|---|---|---|
|
604dc4d1f9 | ||
|
e65e046c21 | ||
|
b015012d8b | ||
|
ac2e5fde36 |
20 changed files with 910 additions and 85 deletions
2
backend/cache/cache_test.go
vendored
2
backend/cache/cache_test.go
vendored
|
@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
|
|||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: "TestCache:",
|
||||
NilObject: (*cache.Object)(nil),
|
||||
UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt"},
|
||||
UnimplementableFsMethods: []string{"PublicLink", "OpenWriterAt", "Resume"},
|
||||
UnimplementableObjectMethods: []string{"MimeType", "ID", "GetTier", "SetTier"},
|
||||
SkipInvalidUTF8: true, // invalid UTF-8 confuses the cache
|
||||
})
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"context"
|
||||
"crypto/md5"
|
||||
"crypto/sha1"
|
||||
"encoding"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
|
@ -13,6 +15,7 @@ import (
|
|||
gohash "hash"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"path"
|
||||
"regexp"
|
||||
|
@ -379,6 +382,8 @@ type Fs struct {
|
|||
features *fs.Features // optional features
|
||||
dirSort bool // reserved for future, ignored
|
||||
useNoRename bool // can be set with the transactions option
|
||||
hashState string // set in resume(), used to restore hash state
|
||||
resumeXactID string // set in resume(), allows reuse of xactID upon resume
|
||||
}
|
||||
|
||||
// configure sets up chunker for given name format, meta format and hash type.
|
||||
|
@ -1152,7 +1157,41 @@ func (f *Fs) put(
|
|||
|
||||
// Prepare to upload
|
||||
c := f.newChunkingReader(src)
|
||||
wrapIn := c.wrapStream(ctx, in, src)
|
||||
// Prepare for resume if resumable
|
||||
var resumeOpt *fs.OptionResume
|
||||
// partialHashState will be used in wrapStream to restore hash state
|
||||
var partialHashState []byte
|
||||
for _, option := range options {
|
||||
switch option.(type) {
|
||||
case *fs.OptionResume:
|
||||
resumeOpt = option.(*fs.OptionResume)
|
||||
if resumeOpt.Pos != 0 {
|
||||
numChunksOnRemote := resumeOpt.Pos / int64(f.opt.ChunkSize)
|
||||
// Checks for existing chunks on the remote
|
||||
for i := 0; i < int(numChunksOnRemote); i++ {
|
||||
existingChunkName := f.makeChunkName(remote, i, "", f.resumeXactID)
|
||||
existingChunk, err := f.base.NewObject(ctx, existingChunkName)
|
||||
// If NewObject returns an error the chunk likely doesn't exist on the remote and we cannot resume
|
||||
if err != nil {
|
||||
resumeOpt.Pos = 0
|
||||
c.chunks = nil
|
||||
break
|
||||
}
|
||||
c.chunks = append(c.chunks, existingChunk)
|
||||
}
|
||||
fs.Debugf(f, "Resuming at chunk number: %d", numChunksOnRemote)
|
||||
partialHashState, _ = base64.StdEncoding.DecodeString(f.hashState)
|
||||
// Discard bytes that already exist on remote
|
||||
written, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.accountBytes(written)
|
||||
c.sizeLeft = c.sizeTotal - c.readCount
|
||||
}
|
||||
}
|
||||
}
|
||||
wrapIn := c.wrapStream(ctx, in, src, partialHashState)
|
||||
|
||||
var metaObject fs.Object
|
||||
defer func() {
|
||||
|
@ -1162,13 +1201,22 @@ func (f *Fs) put(
|
|||
}()
|
||||
|
||||
baseRemote := remote
|
||||
xactID, errXact := f.newXactID(ctx, baseRemote)
|
||||
if errXact != nil {
|
||||
return nil, errXact
|
||||
var xactID string
|
||||
if resumeOpt != nil && resumeOpt.Pos != 0 {
|
||||
xactID = f.resumeXactID
|
||||
} else {
|
||||
xactID, err = f.newXactID(ctx, baseRemote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Transfer chunks data
|
||||
for c.chunkNo = 0; !c.done; c.chunkNo++ {
|
||||
// skip to chunk we can resume from if resumeOpt is set
|
||||
if c.chunkNo == 0 && resumeOpt != nil && resumeOpt.Pos != 0 {
|
||||
c.chunkNo = int(resumeOpt.Pos) / int(f.opt.ChunkSize)
|
||||
}
|
||||
if c.chunkNo > maxSafeChunkNumber {
|
||||
return nil, ErrChunkOverflow
|
||||
}
|
||||
|
@ -1230,6 +1278,41 @@ func (f *Fs) put(
|
|||
c.chunkLimit = c.chunkSize
|
||||
|
||||
c.chunks = append(c.chunks, chunk)
|
||||
|
||||
// If an OptionResume was passed than we should call SetID so a resume can be attempted in event of a failure
|
||||
// ID keeps track of the first chunk that should be uploaded if a resume is attempted
|
||||
if resumeOpt != nil {
|
||||
// Publish hash state to control chunk
|
||||
marshaler, ok := c.hasher.(encoding.BinaryMarshaler)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The hash type does not implement encoding.BinaryMarshaler")
|
||||
}
|
||||
state, err := marshaler.MarshalBinary()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hashType := f.opt.HashType
|
||||
data, err := marshalPartialHashJSON(ctx, hashType, base64.StdEncoding.EncodeToString(state))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
controlChunkName := f.makeChunkName(remote, -1, "phash", xactID)
|
||||
controlInfo := f.wrapInfo(src, controlChunkName, int64(len(data)))
|
||||
controlChunk, err := basePut(ctx, bytes.NewReader(data), controlInfo)
|
||||
defer func() {
|
||||
_ = controlChunk.Remove(ctx)
|
||||
}()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
positionStr := strconv.Itoa(c.chunkNo + 1) // stores the number of chunks uploaded
|
||||
chunkSizeStr := strconv.FormatInt(c.chunkSize, 10)
|
||||
startFromStr := strconv.FormatInt(int64(f.opt.StartFrom), 10)
|
||||
err = resumeOpt.SetID(ctx, chunkSizeStr+","+startFromStr+","+positionStr+","+xactID, f.opt.HashType, base64.StdEncoding.EncodeToString(state))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Validate uploaded size
|
||||
|
@ -1356,7 +1439,7 @@ func (f *Fs) newChunkingReader(src fs.ObjectInfo) *chunkingReader {
|
|||
return c
|
||||
}
|
||||
|
||||
func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo) io.Reader {
|
||||
func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, partialHashState []byte) io.Reader {
|
||||
baseIn, wrapBack := accounting.UnWrap(in)
|
||||
|
||||
switch {
|
||||
|
@ -1391,6 +1474,15 @@ func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.Ob
|
|||
}
|
||||
|
||||
if c.hasher != nil {
|
||||
// Restores hash state during a resume
|
||||
if partialHashState != nil {
|
||||
unmarshaler, ok := c.hasher.(encoding.BinaryUnmarshaler)
|
||||
if ok {
|
||||
if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
|
||||
log.Fatal("unable to unmarshal hash:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
baseIn = io.TeeReader(baseIn, c.hasher)
|
||||
}
|
||||
c.baseReader = baseIn
|
||||
|
@ -2510,6 +2602,34 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte)
|
|||
return info, true, nil
|
||||
}
|
||||
|
||||
// Format for partial hash control chunks
|
||||
type partialHashJSON struct {
|
||||
HashType string `json:"htype"`
|
||||
PartialHash string `json:"phash"`
|
||||
}
|
||||
|
||||
// marshalPartialHashJSON
|
||||
//
|
||||
// Creates a JSON containing the hashType being used and the partial hash state. This will be stored in
|
||||
// a control chunk and used for resume functionality.
|
||||
//
|
||||
func marshalPartialHashJSON(ctx context.Context, hashType, partialHash string) ([]byte, error) {
|
||||
controlData := partialHashJSON{
|
||||
HashType: hashType,
|
||||
PartialHash: partialHash,
|
||||
}
|
||||
data, err := json.Marshal(&controlData)
|
||||
return data, err
|
||||
}
|
||||
|
||||
// unmarshalPartialHashJSON parses partial hash control chunk.
|
||||
//
|
||||
func unmarshalPartialHashJSON(ctx context.Context, data []byte) (hashType, partialHashState string, err error) {
|
||||
var partialHashData partialHashJSON
|
||||
err = json.Unmarshal(data, &partialHashData)
|
||||
return partialHashData.HashType, partialHashData.PartialHash, err
|
||||
}
|
||||
|
||||
func silentlyRemove(ctx context.Context, o fs.Object) {
|
||||
_ = o.Remove(ctx) // ignore error
|
||||
}
|
||||
|
@ -2544,9 +2664,58 @@ func (f *Fs) CanQuickRename() bool {
|
|||
return f.base.Features().Move != nil
|
||||
}
|
||||
|
||||
// Resume checks whether the (remote, ID) pair is valid and returns
|
||||
// the point the file should be resumed from or an error.
|
||||
func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
|
||||
idSlice := strings.Split(ID, ",")
|
||||
cachedChunkSize, err := strconv.ParseInt(idSlice[0], 10, 64)
|
||||
cachedStartFrom, err := strconv.ParseInt(idSlice[1], 10, 64)
|
||||
cachedChunkNo, err := strconv.ParseInt(idSlice[2], 10, 64)
|
||||
cachedXactID := idSlice[3]
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if cachedChunkSize != int64(f.opt.ChunkSize) {
|
||||
return 0, errors.New("ChunkSize doesn't match for file we are trying to resume")
|
||||
}
|
||||
if f.opt.StartFrom != int(cachedStartFrom) {
|
||||
return 0, errors.New("StartFrom doesn't match for file we are trying to resume")
|
||||
}
|
||||
// Check partial hash control chunk
|
||||
controlChunkName := f.makeChunkName(remote, -1, "phash", cachedXactID)
|
||||
hashControlChunk, err := f.base.NewObject(ctx, controlChunkName)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
reader, err := hashControlChunk.Open(ctx)
|
||||
data, err := ioutil.ReadAll(reader)
|
||||
_ = reader.Close() // ensure file handle is freed on windows
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
remoteHashType, remoteHashState, err := unmarshalPartialHashJSON(ctx, data)
|
||||
if remoteHashType == hashName && remoteHashState == hashState {
|
||||
if f.opt.HashType != remoteHashType {
|
||||
fs.Debugf(f, "Resume skipped, mismatch hash types. prev: %s, curr: %s", remoteHashType, f.opt.HashType)
|
||||
return 0, nil
|
||||
}
|
||||
pos := cachedChunkNo * cachedChunkSize
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
f.hashState = hashState
|
||||
f.resumeXactID = cachedXactID
|
||||
return pos, nil
|
||||
}
|
||||
|
||||
// No valid control chunks found, rewind from start
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Check the interfaces are satisfied
|
||||
var (
|
||||
_ fs.Fs = (*Fs)(nil)
|
||||
_ fs.Resumer = (*Fs)(nil)
|
||||
_ fs.Purger = (*Fs)(nil)
|
||||
_ fs.Copier = (*Fs)(nil)
|
||||
_ fs.Mover = (*Fs)(nil)
|
||||
|
|
|
@ -23,7 +23,7 @@ func TestIntegration(t *testing.T) {
|
|||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: *fstest.RemoteName,
|
||||
NilObject: (*crypt.Object)(nil),
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt"},
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "Resume"},
|
||||
UnimplementableObjectMethods: []string{"MimeType"},
|
||||
})
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -24,6 +25,7 @@ import (
|
|||
"github.com/rclone/rclone/fs/config/configstruct"
|
||||
"github.com/rclone/rclone/fs/fserrors"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
"github.com/rclone/rclone/lib/atexit"
|
||||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/file"
|
||||
"github.com/rclone/rclone/lib/readers"
|
||||
|
@ -230,6 +232,7 @@ type Fs struct {
|
|||
precision time.Duration // precision of local filesystem
|
||||
warnedMu sync.Mutex // used for locking access to 'warned'.
|
||||
warned map[string]struct{} // whether we have warned about this string
|
||||
hashState map[string]string // set in resume(), used to restore hash state
|
||||
|
||||
// do os.Lstat or os.Stat
|
||||
lstat func(name string) (os.FileInfo, error)
|
||||
|
@ -267,11 +270,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
}
|
||||
|
||||
f := &Fs{
|
||||
name: name,
|
||||
opt: *opt,
|
||||
warned: make(map[string]struct{}),
|
||||
dev: devUnset,
|
||||
lstat: os.Lstat,
|
||||
name: name,
|
||||
opt: *opt,
|
||||
warned: make(map[string]struct{}),
|
||||
hashState: make(map[string]string),
|
||||
dev: devUnset,
|
||||
lstat: os.Lstat,
|
||||
}
|
||||
f.root = cleanRootPath(root, f.opt.NoUNC, f.opt.Enc)
|
||||
f.features = (&fs.Features{
|
||||
|
@ -1115,6 +1119,7 @@ func (nwc nopWriterCloser) Close() error {
|
|||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
|
||||
var out io.WriteCloser
|
||||
var hasher *hash.MultiHasher
|
||||
var resumeOpt *fs.OptionResume
|
||||
|
||||
for _, option := range options {
|
||||
switch x := option.(type) {
|
||||
|
@ -1125,6 +1130,32 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
return err
|
||||
}
|
||||
}
|
||||
case *fs.OptionResume:
|
||||
resumeOpt = option.(*fs.OptionResume)
|
||||
if resumeOpt.Pos != 0 {
|
||||
fs.Logf(o, "Resuming at byte position: %d", resumeOpt.Pos)
|
||||
// Discard bytes that already exist on backend
|
||||
_, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hashType := o.fs.Hashes().GetOne()
|
||||
if resumeOpt.Hash != "" {
|
||||
if err = hashType.Set(resumeOpt.Hash); err != nil {
|
||||
return err
|
||||
}
|
||||
if !o.fs.Hashes().Contains(hashType) {
|
||||
return fmt.Errorf("unsupported resume hash: %q", resumeOpt.Hash)
|
||||
}
|
||||
}
|
||||
hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(hashType))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := hasher.RestoreHashState(hashType, o.fs.hashState[o.remote]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1138,7 +1169,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
// If it is a translated link, just read in the contents, and
|
||||
// then create a symlink
|
||||
if !o.translatedLink {
|
||||
f, err := file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
|
||||
var f *os.File
|
||||
if resumeOpt != nil && resumeOpt.Pos != 0 {
|
||||
f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_APPEND, 0666)
|
||||
} else {
|
||||
f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
|
||||
}
|
||||
if err != nil {
|
||||
if runtime.GOOS == "windows" && os.IsPermission(err) {
|
||||
// If permission denied on Windows might be trying to update a
|
||||
|
@ -1152,7 +1188,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
return err
|
||||
}
|
||||
}
|
||||
if !o.fs.opt.NoPreAllocate {
|
||||
if !o.fs.opt.NoPreAllocate && resumeOpt != nil && resumeOpt.Pos == 0 {
|
||||
// Pre-allocate the file for performance reasons
|
||||
err = file.PreAllocate(src.Size(), f)
|
||||
if err != nil {
|
||||
|
@ -1173,7 +1209,46 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
in = io.TeeReader(in, hasher)
|
||||
}
|
||||
|
||||
_, err = io.Copy(out, in)
|
||||
var cacheingWg sync.WaitGroup // Used to halt code execution while resume cache is written
|
||||
var copyWg sync.WaitGroup // Ensure that io.Copy has returned before writing resume data
|
||||
copyWg.Add(1)
|
||||
// Context for read so that we can handle io.copy being interrupted
|
||||
ctxr, cancel := context.WithCancel(ctx)
|
||||
// Create exit handler during Copy so that resume data can be written if interrupted
|
||||
var atexitOnce sync.Once
|
||||
atexitHandle := atexit.Register(func() {
|
||||
atexitOnce.Do(func() {
|
||||
if resumeOpt == nil || hasher == nil {
|
||||
return
|
||||
}
|
||||
// If OptionResume was passed, call SetID to prepare for future resumes
|
||||
// ID is the number of bytes written to the destination
|
||||
// Stops the copy so cache is consistent with remote
|
||||
cacheingWg.Add(1)
|
||||
cancel()
|
||||
copyWg.Wait()
|
||||
fs.Infof(o, "Updating resume cache")
|
||||
fileInfo, _ := o.fs.lstat(o.path)
|
||||
writtenStr := strconv.FormatInt(fileInfo.Size(), 10)
|
||||
hashType := hasher.Hashes().GetOne()
|
||||
hashState, err := hasher.GetHashState(hashType)
|
||||
if err == nil {
|
||||
err = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState)
|
||||
}
|
||||
if err != nil {
|
||||
fs.Logf(o, "Updating resume cache failed: %v", err)
|
||||
}
|
||||
})
|
||||
})
|
||||
cr := readers.NewContextReader(ctxr, in)
|
||||
_, err = io.Copy(out, cr)
|
||||
copyWg.Done()
|
||||
atexit.Unregister(atexitHandle)
|
||||
if errors.Is(err, context.Canceled) {
|
||||
// If resume data is being written we want to wait here for the program to exit
|
||||
cacheingWg.Wait()
|
||||
}
|
||||
|
||||
closeErr := out.Close()
|
||||
if err == nil {
|
||||
err = closeErr
|
||||
|
@ -1338,9 +1413,44 @@ func cleanRootPath(s string, noUNC bool, enc encoder.MultiEncoder) string {
|
|||
return s
|
||||
}
|
||||
|
||||
// Resume checks whether the (remote, ID) pair is valid and returns
|
||||
// the point the file should be resumed from or an error.
|
||||
func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
|
||||
cachedPos, err := strconv.ParseInt(ID, 10, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Compare hash of partial file on remote with partial hash in cache
|
||||
remoteObject, err := f.NewObject(ctx, remote)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if remoteObject.Size() != cachedPos {
|
||||
return 0, errors.New("size on remote does not match resume cache")
|
||||
}
|
||||
hashType := hash.NameToType(hashName)
|
||||
remoteHash, err := remoteObject.Hash(ctx, hashType)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
cachedHash, err := hash.SumPartialHash(hashName, hashState)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Hashes match, attempt resume
|
||||
if cachedHash == remoteHash {
|
||||
f.hashState[remote] = hashState
|
||||
return cachedPos, nil
|
||||
}
|
||||
// No valid position found, restart from beginning
|
||||
fs.Infof(remote, "Not resuming as cached hash state did not match hash state on remote")
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Check the interfaces are satisfied
|
||||
var (
|
||||
_ fs.Fs = &Fs{}
|
||||
_ fs.Resumer = &Fs{}
|
||||
_ fs.Purger = &Fs{}
|
||||
_ fs.PutStreamer = &Fs{}
|
||||
_ fs.Mover = &Fs{}
|
||||
|
|
|
@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
|
|||
}
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: *fstest.RemoteName,
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
|
||||
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles", "Resume"},
|
||||
UnimplementableObjectMethods: []string{"MimeType"},
|
||||
})
|
||||
}
|
||||
|
|
|
@ -130,6 +130,8 @@ type ConfigInfo struct {
|
|||
FsCacheExpireDuration time.Duration
|
||||
FsCacheExpireInterval time.Duration
|
||||
DisableHTTP2 bool
|
||||
MaxResumeCacheSize SizeSuffix
|
||||
ResumeCutoff SizeSuffix
|
||||
HumanReadable bool
|
||||
KvLockTime time.Duration // maximum time to keep key-value database locked by process
|
||||
}
|
||||
|
@ -163,6 +165,8 @@ func NewConfig() *ConfigInfo {
|
|||
c.TPSLimitBurst = 1
|
||||
c.MaxTransfer = -1
|
||||
c.MaxBacklog = 10000
|
||||
c.MaxResumeCacheSize = SizeSuffix(100 * 1024)
|
||||
c.ResumeCutoff = -1
|
||||
// We do not want to set the default here. We use this variable being empty as part of the fall-through of options.
|
||||
// c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - "
|
||||
c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)
|
||||
|
|
|
@ -132,6 +132,8 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
|
|||
flags.BoolVarP(flagSet, &ci.RefreshTimes, "refresh-times", "", ci.RefreshTimes, "Refresh the modtime of remote files")
|
||||
flags.BoolVarP(flagSet, &ci.NoConsole, "no-console", "", ci.NoConsole, "Hide console window (supported on Windows only)")
|
||||
flags.StringVarP(flagSet, &dscp, "dscp", "", "", "Set DSCP value to connections, value or name, e.g. CS1, LE, DF, AF21")
|
||||
flags.FVarP(flagSet, &ci.MaxResumeCacheSize, "max-resume-cache-size", "", "The maximum size of the cache used to store data necessary for resuming uploads. When the storage grows beyond this size, the oldest resume data will be deleted. (default 100k")
|
||||
flags.FVarP(flagSet, &ci.ResumeCutoff, "resume-cutoff", "", "If set, attempt to resume all partial uploads larger than this size. (default off)")
|
||||
flags.DurationVarP(flagSet, &ci.FsCacheExpireDuration, "fs-cache-expire-duration", "", ci.FsCacheExpireDuration, "Cache remotes for this long (0 to disable caching)")
|
||||
flags.DurationVarP(flagSet, &ci.FsCacheExpireInterval, "fs-cache-expire-interval", "", ci.FsCacheExpireInterval, "Interval to check for expired remotes")
|
||||
flags.BoolVarP(flagSet, &ci.DisableHTTP2, "disable-http2", "", ci.DisableHTTP2, "Disable HTTP/2 in the global transport")
|
||||
|
|
|
@ -163,6 +163,10 @@ type Features struct {
|
|||
// Shutdown the backend, closing any background tasks and any
|
||||
// cached connections.
|
||||
Shutdown func(ctx context.Context) error
|
||||
|
||||
// Resume checks whether the (remote, ID) pair is valid and returns
|
||||
// the point the file should be resumed from or an error.
|
||||
Resume func(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
|
||||
}
|
||||
|
||||
// Disable nil's out the named feature. If it isn't found then it
|
||||
|
@ -290,6 +294,9 @@ func (ft *Features) Fill(ctx context.Context, f Fs) *Features {
|
|||
if do, ok := f.(Shutdowner); ok {
|
||||
ft.Shutdown = do.Shutdown
|
||||
}
|
||||
if do, ok := f.(Resumer); ok {
|
||||
ft.Resume = do.Resume
|
||||
}
|
||||
return ft.DisableList(GetConfig(ctx).DisableFeatures)
|
||||
}
|
||||
|
||||
|
@ -636,6 +643,13 @@ type Shutdowner interface {
|
|||
Shutdown(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Resumer is an optional interface for Fs
|
||||
type Resumer interface {
|
||||
// Resume checks whether the (remote, ID) pair is valid and returns
|
||||
// the point the file should be resumed from or an error.
|
||||
Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
|
||||
}
|
||||
|
||||
// ObjectsChan is a channel of Objects
|
||||
type ObjectsChan chan Object
|
||||
|
||||
|
|
1
fs/fs.go
1
fs/fs.go
|
@ -48,6 +48,7 @@ var (
|
|||
ErrorNotImplemented = errors.New("optional feature not implemented")
|
||||
ErrorCommandNotFound = errors.New("command not found")
|
||||
ErrorFileNameTooLong = errors.New("file name too long")
|
||||
ErrorCantResume = errors.New("can't resume file upload")
|
||||
)
|
||||
|
||||
// CheckClose is a utility function used to check the return from
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/md5"
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
"encoding"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
|
@ -228,6 +229,14 @@ func (m *MultiHasher) Write(p []byte) (n int, err error) {
|
|||
return n, err
|
||||
}
|
||||
|
||||
// Hashes returns accumulated hash types.
|
||||
func (m *MultiHasher) Hashes() (set Set) {
|
||||
for ht := range m.h {
|
||||
set.Add(ht)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Sums returns the sums of all accumulated hashes as hex encoded
|
||||
// strings.
|
||||
func (m *MultiHasher) Sums() map[Type]string {
|
||||
|
@ -264,6 +273,67 @@ func (m *MultiHasher) Size() int64 {
|
|||
return m.size
|
||||
}
|
||||
|
||||
// GetHashState returns the partial hash state for the given hash type encoded as a string
|
||||
func (m *MultiHasher) GetHashState(hashType Type) (string, error) {
|
||||
h, ok := m.h[hashType]
|
||||
if !ok {
|
||||
return "", ErrUnsupported
|
||||
}
|
||||
marshaler, ok := h.(encoding.BinaryMarshaler)
|
||||
if !ok {
|
||||
return "", errors.New(hashType.String() + " does not implement encoding.BinaryMarshaler")
|
||||
}
|
||||
data, err := marshaler.MarshalBinary()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return base64.StdEncoding.EncodeToString(data), nil
|
||||
}
|
||||
|
||||
// RestoreHashState restores the partial hash state for the passed hash type
|
||||
func (m *MultiHasher) RestoreHashState(hashType Type, hashState string) error {
|
||||
partialHashState, err := base64.StdEncoding.DecodeString(hashState)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
unmarshaler, ok := m.h[hashType].(encoding.BinaryUnmarshaler)
|
||||
if ok {
|
||||
if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SumPartialHash returns the hash of the partial hash state
|
||||
func SumPartialHash(hashName, hashState string) (string, error) {
|
||||
partialHashDef, ok := name2hash[hashName]
|
||||
if !ok {
|
||||
return "", ErrUnsupported
|
||||
}
|
||||
partialHash := partialHashDef.newFunc()
|
||||
partialHashState, err := base64.StdEncoding.DecodeString(hashState)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
unmarshaler, ok := partialHash.(encoding.BinaryUnmarshaler)
|
||||
if ok {
|
||||
if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
return hex.EncodeToString(partialHash.Sum(nil)), nil
|
||||
}
|
||||
|
||||
// NameToType returns the requested hash type or None if the hash type isn't supported
|
||||
func NameToType(hashName string) Type {
|
||||
hashDef, ok := name2hash[hashName]
|
||||
if !ok {
|
||||
return None
|
||||
}
|
||||
return hashDef.hashType
|
||||
}
|
||||
|
||||
// A Set Indicates one or more hash types.
|
||||
type Set int
|
||||
|
||||
|
|
|
@ -3,13 +3,19 @@
|
|||
package fs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
"github.com/rclone/rclone/lib/cacheroot"
|
||||
)
|
||||
|
||||
// OpenOption is an interface describing options for Open
|
||||
|
@ -230,6 +236,145 @@ func (o *HashesOption) Mandatory() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// OptionResume defines a Put/Upload for doing resumes
|
||||
type OptionResume struct {
|
||||
ID string // resume this ID if set
|
||||
Pos int64 // and resume from this position
|
||||
Hash string
|
||||
Src Object
|
||||
F Fs
|
||||
Remote string
|
||||
CacheCleaned bool
|
||||
CacheDir string
|
||||
}
|
||||
|
||||
// SetID will be called by backend's Put/Update function if the object's upload
|
||||
// could be resumed upon failure
|
||||
//
|
||||
// SetID takes the passed resume ID, hash state, hash name and Fingerprint of the object and stores it in
|
||||
// --cache-dir so that future Copy operations can resume the upload if it fails
|
||||
func (o *OptionResume) SetID(ctx context.Context, ID, hashName, hashState string) error {
|
||||
ci := GetConfig(ctx)
|
||||
// Get the Fingerprint of the src object so that future Copy operations can ensure the
|
||||
// object hasn't changed before resuming an upload
|
||||
fingerprint := Fingerprint(ctx, o.Src, true)
|
||||
data, err := marshalResumeJSON(ctx, fingerprint, ID, hashName, hashState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal data JSON: %w", err)
|
||||
}
|
||||
if len(data) < int(ci.MaxResumeCacheSize) {
|
||||
// Each remote will have its own directory for cached resume files
|
||||
dirPath, _, err := cacheroot.CreateCacheRoot(o.CacheDir, o.F.Name(), o.F.Root(), "resume")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = os.MkdirAll(dirPath, os.ModePerm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create cache directory %v: %w", dirPath, err)
|
||||
}
|
||||
// Write resume data to disk
|
||||
cachePath := filepath.Join(dirPath, o.Remote)
|
||||
cacheFile, err := os.Create(cachePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create cache file %v: %w", cachePath, err)
|
||||
}
|
||||
defer func() {
|
||||
_ = cacheFile.Close()
|
||||
}()
|
||||
_, errWrite := cacheFile.Write(data)
|
||||
if errWrite != nil {
|
||||
return fmt.Errorf("failed to write JSON to file: %w", errWrite)
|
||||
}
|
||||
}
|
||||
if !o.CacheCleaned {
|
||||
rootCacheDir := filepath.Join(o.CacheDir, "resume")
|
||||
if err := cleanResumeCache(ctx, rootCacheDir); err != nil {
|
||||
return fmt.Errorf("failed to clean resume cache: %w", err)
|
||||
}
|
||||
}
|
||||
o.CacheCleaned = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResumeJSON is a struct for storing resume info in cache
|
||||
type ResumeJSON struct {
|
||||
Fingerprint string `json:"fprint"`
|
||||
ID string `json:"id"`
|
||||
HashName string `json:"hname"`
|
||||
HashState string `json:"hstate"`
|
||||
}
|
||||
|
||||
func marshalResumeJSON(ctx context.Context, fprint, id, hashName, hashState string) ([]byte, error) {
|
||||
resumedata := ResumeJSON{
|
||||
Fingerprint: fprint,
|
||||
ID: id,
|
||||
HashName: hashName,
|
||||
HashState: hashState,
|
||||
}
|
||||
data, err := json.Marshal(&resumedata)
|
||||
return data, err
|
||||
}
|
||||
|
||||
// cleanCache checks the size of the resume cache and removes the oldest resume files if more than limit
|
||||
func cleanResumeCache(ctx context.Context, rootCacheDir string) error {
|
||||
ci := GetConfig(ctx)
|
||||
var paths []string
|
||||
pathsWithInfo := make(map[string]os.FileInfo)
|
||||
totalCacheSize := int64(0)
|
||||
walkErr := filepath.Walk(rootCacheDir,
|
||||
func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
// Empty subdirectories in the resume cache dir can be removed
|
||||
removeErr := os.Remove(path)
|
||||
if err != nil && !os.IsNotExist(removeErr) {
|
||||
return fmt.Errorf("failed to remove empty subdirectory: %s: %w", path, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
paths = append(paths, path)
|
||||
pathsWithInfo[path] = info
|
||||
totalCacheSize += info.Size()
|
||||
return nil
|
||||
})
|
||||
if walkErr != nil {
|
||||
return fmt.Errorf("error walking through cache when cleaning cache dir: %w", walkErr)
|
||||
}
|
||||
if totalCacheSize > int64(ci.MaxResumeCacheSize) {
|
||||
sort.Slice(paths, func(i, j int) bool {
|
||||
return pathsWithInfo[paths[i]].ModTime().Before(pathsWithInfo[paths[j]].ModTime())
|
||||
})
|
||||
for _, p := range paths {
|
||||
if totalCacheSize < int64(ci.MaxResumeCacheSize) {
|
||||
break
|
||||
}
|
||||
if err := os.Remove(p); err != nil {
|
||||
return fmt.Errorf("error removing oldest cache file: %s: %w", p, err)
|
||||
}
|
||||
totalCacheSize -= pathsWithInfo[p].Size()
|
||||
Debugf(p, "Successfully removed oldest cache file")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Header formats the option as an http header
|
||||
func (o *OptionResume) Header() (key string, value string) {
|
||||
return "", ""
|
||||
}
|
||||
|
||||
// String formats the option into human readable form
|
||||
func (o *OptionResume) String() string {
|
||||
return fmt.Sprintf("OptionResume(ID:%v, Pos:%v)", o.ID, o.Pos)
|
||||
}
|
||||
|
||||
// Mandatory returns whether the option must be parsed or can be ignored
|
||||
func (o *OptionResume) Mandatory() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// NullOption defines an Option which does nothing
|
||||
type NullOption struct {
|
||||
}
|
||||
|
|
23
fs/operations/interrupt_test.go
Normal file
23
fs/operations/interrupt_test.go
Normal file
|
@ -0,0 +1,23 @@
|
|||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
package operations
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
func sendInterrupt() error {
|
||||
p, err := os.FindProcess(syscall.Getpid())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = p.Signal(os.Interrupt)
|
||||
return err
|
||||
}
|
||||
|
||||
func setupCmd(cmd *exec.Cmd) {
|
||||
// Only needed for windows
|
||||
}
|
32
fs/operations/interrupt_win_test.go
Normal file
32
fs/operations/interrupt_win_test.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
//go:build windows
|
||||
// +build windows
|
||||
|
||||
package operations
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// Credit: https://github.com/golang/go/blob/6125d0c4265067cdb67af1340bf689975dd128f4/src/os/signal/signal_windows_test.go#L18
|
||||
func sendInterrupt() error {
|
||||
d, e := syscall.LoadDLL("kernel32.dll")
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
p, e := d.FindProc("GenerateConsoleCtrlEvent")
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
r, _, e := p.Call(syscall.CTRL_BREAK_EVENT, uintptr(syscall.Getpid()))
|
||||
if r == 0 {
|
||||
return e
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func setupCmd(cmd *exec.Cmd) {
|
||||
(*cmd).SysProcAttr = &syscall.SysProcAttr{
|
||||
CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
|
||||
}
|
||||
}
|
|
@ -33,6 +33,7 @@ import (
|
|||
"github.com/rclone/rclone/fs/object"
|
||||
"github.com/rclone/rclone/fs/walk"
|
||||
"github.com/rclone/rclone/lib/atexit"
|
||||
"github.com/rclone/rclone/lib/cacheroot"
|
||||
"github.com/rclone/rclone/lib/pacer"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
"github.com/rclone/rclone/lib/readers"
|
||||
|
@ -364,6 +365,11 @@ func CommonHash(ctx context.Context, fa, fb fs.Info) (hash.Type, *fs.HashesOptio
|
|||
// be nil.
|
||||
func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
var resumeOpt *fs.OptionResume
|
||||
if f.Features().Resume != nil {
|
||||
resumeOpt = createResumeOpt(ctx, f, remote, src)
|
||||
}
|
||||
|
||||
tr := accounting.Stats(ctx).NewTransfer(src)
|
||||
defer func() {
|
||||
tr.Done(ctx, err)
|
||||
|
@ -461,6 +467,10 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
|||
wrappedSrc = NewOverrideRemote(src, remote)
|
||||
}
|
||||
options := []fs.OpenOption{hashOption}
|
||||
// Appends OptionResume if it was set
|
||||
if resumeOpt != nil {
|
||||
options = append(options, resumeOpt)
|
||||
}
|
||||
for _, option := range ci.UploadHeaders {
|
||||
options = append(options, option)
|
||||
}
|
||||
|
@ -475,6 +485,17 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
|
|||
if err == nil {
|
||||
newDst = dst
|
||||
err = closeErr
|
||||
cacheParent := config.GetCacheDir()
|
||||
// Remove resume cache file (if one was created) when Put/Upload is successful
|
||||
cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cacheFile := filepath.Join(cacheDir, remote)
|
||||
removeErr := os.Remove(cacheFile)
|
||||
if err != nil && !os.IsNotExist(removeErr) {
|
||||
return nil, fmt.Errorf("failed to remove resume cache file after upload: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
73
fs/operations/resume.go
Normal file
73
fs/operations/resume.go
Normal file
|
@ -0,0 +1,73 @@
|
|||
package operations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/config"
|
||||
"github.com/rclone/rclone/lib/cacheroot"
|
||||
)
|
||||
|
||||
// Creates an OptionResume that will be passed to Put/Upload
|
||||
func createResumeOpt(ctx context.Context, f fs.Fs, remote string, src fs.Object) (resumeOpt *fs.OptionResume) {
|
||||
ci := fs.GetConfig(ctx)
|
||||
cacheParent := config.GetCacheDir()
|
||||
resumeOpt = &fs.OptionResume{ID: "", Pos: 0, Src: src, F: f, Remote: remote, CacheCleaned: false, CacheDir: cacheParent}
|
||||
if ci.ResumeCutoff >= 0 {
|
||||
cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
cacheFile := filepath.Join(cacheDir, remote)
|
||||
resumeID, hashName, hashState, attemptResume := readResumeCache(ctx, f, src, cacheFile)
|
||||
if attemptResume {
|
||||
fs.Debugf(f, "Existing resume cache file found: %s. A resume will now be attempted.", cacheFile)
|
||||
position, resumeErr := f.Features().Resume(ctx, remote, resumeID, hashName, hashState)
|
||||
if resumeErr != nil {
|
||||
fs.Errorf(src, "Resume canceled: %v", resumeErr)
|
||||
} else if position > int64(ci.ResumeCutoff) {
|
||||
resumeOpt.Pos = position
|
||||
resumeOpt.Hash = hashName
|
||||
}
|
||||
}
|
||||
}
|
||||
return resumeOpt
|
||||
}
|
||||
|
||||
// readResumeCache checks to see if a resume ID has been cached for the source object.
|
||||
// If it finds one it returns it along with true to signal a resume can be attempted
|
||||
func readResumeCache(ctx context.Context, f fs.Fs, src fs.Object, cacheName string) (resumeID, hashName, hashState string, attemptResume bool) {
|
||||
existingCacheFile, statErr := os.Open(cacheName)
|
||||
defer func() {
|
||||
_ = existingCacheFile.Close()
|
||||
}()
|
||||
if !os.IsNotExist(statErr) {
|
||||
rawData, readErr := ioutil.ReadAll(existingCacheFile)
|
||||
if readErr == nil {
|
||||
existingFingerprint, resumeID, hashName, hashState, unmarshalErr := unmarshalResumeJSON(ctx, rawData)
|
||||
if unmarshalErr != nil {
|
||||
fs.Debugf(f, "Failed to unmarshal Resume JSON: %s. Resume will not be attempted.", unmarshalErr.Error())
|
||||
} else if existingFingerprint != "" {
|
||||
// Check if the src object has changed by comparing new Fingerprint to Fingerprint in cache file
|
||||
fingerprint := fs.Fingerprint(ctx, src, true)
|
||||
if existingFingerprint == fingerprint {
|
||||
return resumeID, hashName, hashState, true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", "", "", false
|
||||
}
|
||||
|
||||
func unmarshalResumeJSON(ctx context.Context, data []byte) (fprint, id, hashName, hashState string, err error) {
|
||||
var resumedata fs.ResumeJSON
|
||||
err = json.Unmarshal(data, &resumedata)
|
||||
if err != nil {
|
||||
return "", "", "", "", err
|
||||
}
|
||||
return resumedata.Fingerprint, resumedata.ID, resumedata.HashName, resumedata.HashState, nil
|
||||
}
|
163
fs/operations/resume_test.go
Normal file
163
fs/operations/resume_test.go
Normal file
|
@ -0,0 +1,163 @@
|
|||
package operations
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fstest"
|
||||
"github.com/rclone/rclone/fstest/mockobject"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type interruptReader struct {
|
||||
once sync.Once
|
||||
r io.Reader
|
||||
}
|
||||
|
||||
// Read sends an OS specific interrupt signal and then reads 1 byte at a time
|
||||
func (r *interruptReader) Read(b []byte) (n int, err error) {
|
||||
r.once.Do(func() {
|
||||
_ = sendInterrupt()
|
||||
})
|
||||
buffer := make([]byte, 1)
|
||||
n, err = r.r.Read(buffer)
|
||||
b[0] = buffer[0]
|
||||
// Simulate duration of a larger read without needing to test with a large file
|
||||
// Allows for the interrupt to be handled before Copy completes
|
||||
time.Sleep(time.Microsecond * 10)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// this is a wrapper for a mockobject with a custom Open function
|
||||
//
|
||||
// n indicates the number of bytes to read before sending an
|
||||
// interrupt signal
|
||||
type resumeTestObject struct {
|
||||
fs.Object
|
||||
n int64
|
||||
}
|
||||
|
||||
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
||||
//
|
||||
// The Reader will signal an interrupt after reading n bytes, then continue to read 1 byte at a time.
|
||||
// If TestResume is successful, the interrupt will be processed and reads will be cancelled before running
|
||||
// out of bytes to read
|
||||
func (o *resumeTestObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
||||
rc, err := o.Object.Open(ctx, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r := io.MultiReader(&io.LimitedReader{R: rc, N: o.n}, &interruptReader{r: rc})
|
||||
// Wrap with Close in a new readCloser
|
||||
rc = readCloser{Reader: r, Closer: rc}
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
func makeContent(t *testing.T, size int) []byte {
|
||||
content := make([]byte, size)
|
||||
r := rand.New(rand.NewSource(42))
|
||||
_, err := io.ReadFull(r, content)
|
||||
assert.NoError(t, err)
|
||||
return content
|
||||
}
|
||||
|
||||
func TestResume(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
r := fstest.NewRun(t)
|
||||
defer r.Finalise()
|
||||
ci := fs.GetConfig(ctx)
|
||||
ci.ResumeCutoff = 0
|
||||
|
||||
// Contents for the mock object
|
||||
var (
|
||||
// Test contents must be large enough that io.Copy does not complete during the first Rclone Copy operation
|
||||
resumeTestContents = makeContent(t, 1024)
|
||||
expectedContents = resumeTestContents
|
||||
)
|
||||
|
||||
// Create mockobjects with given breaks
|
||||
createTestSrc := func(interrupt int64) (fs.Object, fs.Object) {
|
||||
srcOrig := mockobject.New("potato").WithContent(resumeTestContents, mockobject.SeekModeNone)
|
||||
srcOrig.SetFs(r.Flocal)
|
||||
src := &resumeTestObject{
|
||||
Object: srcOrig,
|
||||
n: interrupt,
|
||||
}
|
||||
return src, srcOrig
|
||||
}
|
||||
|
||||
checkContents := func(obj fs.Object, contents string) {
|
||||
assert.NotNil(t, obj)
|
||||
assert.Equal(t, int64(len(contents)), obj.Size())
|
||||
|
||||
r, err := obj.Open(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, r)
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
data, err := ioutil.ReadAll(r)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, contents, string(data))
|
||||
_ = r.Close()
|
||||
}
|
||||
|
||||
srcBreak, srcNoBreak := createTestSrc(2)
|
||||
|
||||
// Run first Copy only in a subprocess so that it can be interrupted without ending the test
|
||||
// adapted from: https://stackoverflow.com/questions/26225513/how-to-test-os-exit-scenarios-in-go
|
||||
if os.Getenv("RUNTEST") == "1" {
|
||||
remoteRoot := os.Getenv("REMOTEROOT")
|
||||
remoteFs, err := fs.NewFs(ctx, remoteRoot)
|
||||
require.NoError(t, err)
|
||||
_, _ = Copy(ctx, remoteFs, nil, "testdst", srcBreak)
|
||||
// This should never be reached as the subroutine should exit during Copy
|
||||
require.True(t, false, "Problem with test, first Copy operation should've been interrupted before completion")
|
||||
return
|
||||
}
|
||||
// Start the subprocess
|
||||
cmd := exec.Command(os.Args[0], "-test.run=TestResume")
|
||||
cmd.Env = append(os.Environ(), "RUNTEST=1", "REMOTEROOT="+r.Fremote.Root())
|
||||
cmd.Stdout = os.Stdout
|
||||
setupCmd(cmd)
|
||||
err := cmd.Run()
|
||||
|
||||
e, ok := err.(*exec.ExitError)
|
||||
|
||||
// Exit code after signal will be (128+signum) on Linux or (signum) on Windows
|
||||
expectedErrorString := "exit status 1"
|
||||
if runtime.GOOS == "windows" {
|
||||
expectedErrorString = "exit status 2"
|
||||
}
|
||||
assert.True(t, ok)
|
||||
assert.Contains(t, e.Error(), expectedErrorString)
|
||||
|
||||
var buf bytes.Buffer
|
||||
log.SetOutput(&buf)
|
||||
defer func() {
|
||||
log.SetOutput(os.Stderr)
|
||||
}()
|
||||
|
||||
// Start copy again, but with no breaks
|
||||
newDst, err := Copy(ctx, r.Fremote, nil, "testdst", srcNoBreak)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Checks to see if a resume was initiated
|
||||
// Resumed byte position can vary slightly depending how long it takes atexit to process the interrupt
|
||||
assert.True(t, strings.Contains(buf.String(), "Resuming at byte position: "), "The upload did not resume when restarted. Message: %q", buf.String())
|
||||
|
||||
checkContents(newDst, string(expectedContents))
|
||||
}
|
1
go.mod
1
go.mod
|
@ -44,6 +44,7 @@ require (
|
|||
github.com/ncw/swift/v2 v2.0.1
|
||||
github.com/nsf/termbox-go v1.1.1
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/pkg/sftp v1.13.4
|
||||
github.com/pmezard/go-difflib v1.0.0
|
||||
github.com/prometheus/client_golang v1.11.0
|
||||
|
|
50
lib/cacheroot/cacheroot.go
Normal file
50
lib/cacheroot/cacheroot.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package cacheroot
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/file"
|
||||
)
|
||||
|
||||
// CreateCacheRoot will derive and make a subsystem cache path.
|
||||
//
|
||||
// Returned root OS path is an absolute path with UNC prefix,
|
||||
// OS-specific path separators, and encoded with OS-specific encoder.
|
||||
//
|
||||
// Additionally it is returned as a standard path without UNC prefix,
|
||||
// with slash path separators, and standard (internal) encoding.
|
||||
//
|
||||
// Care is taken when creating OS paths so that the ':' separator
|
||||
// following a drive letter is not encoded, e.g. into unicode fullwidth colon.
|
||||
//
|
||||
// parentOSPath should contain an absolute local path in OS encoding.
|
||||
//
|
||||
// Note: instead of fs.Fs it takes name and root as plain strings
|
||||
// to prevent import loops due to dependency on the fs package.
|
||||
func CreateCacheRoot(parentOSPath, fsName, fsRoot, cacheName string) (rootOSPath, standardPath string, err error) {
|
||||
// Get a relative cache path representing the remote.
|
||||
relativeDir := fsRoot
|
||||
if runtime.GOOS == "windows" && strings.HasPrefix(relativeDir, `//?/`) {
|
||||
// Trim off the leading "//" to make the result
|
||||
// valid for appending to another path.
|
||||
relativeDir = relativeDir[2:]
|
||||
}
|
||||
relativeDir = fsName + "/" + relativeDir
|
||||
|
||||
// Derive and make the cache root directory
|
||||
relativeOSPath := filepath.FromSlash(encoder.OS.FromStandardPath(relativeDir))
|
||||
rootOSPath = file.UNCPath(filepath.Join(parentOSPath, cacheName, relativeOSPath))
|
||||
if err = os.MkdirAll(rootOSPath, 0700); err != nil {
|
||||
return "", "", errors.Wrapf(err, "failed to create %s cache directory", cacheName)
|
||||
}
|
||||
|
||||
parentStdPath := encoder.OS.ToStandardPath(filepath.ToSlash(parentOSPath))
|
||||
standardPath = fmt.Sprintf("%s/%s/%s", parentStdPath, cacheName, relativeDir)
|
||||
return rootOSPath, standardPath, nil
|
||||
}
|
|
@ -9,7 +9,6 @@ import (
|
|||
"sync/atomic"
|
||||
"syscall"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
|
@ -48,7 +47,6 @@ func PreAllocate(size int64, out *os.File) (err error) {
|
|||
// Try the next flags combination
|
||||
index++
|
||||
atomic.StoreInt32(&fallocFlagsIndex, index)
|
||||
fs.Debugf(nil, "preAllocate: got error on fallocate, trying combination %d/%d: %v", index, len(fallocFlags), err)
|
||||
goto again
|
||||
|
||||
}
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -21,6 +20,7 @@ import (
|
|||
"github.com/rclone/rclone/fs/fserrors"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
"github.com/rclone/rclone/lib/cacheroot"
|
||||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/file"
|
||||
"github.com/rclone/rclone/vfs/vfscache/writeback"
|
||||
|
@ -75,43 +75,30 @@ type AddVirtualFn func(remote string, size int64, isDir bool) error
|
|||
// This starts background goroutines which can be cancelled with the
|
||||
// context passed in.
|
||||
func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVirtualFn) (*Cache, error) {
|
||||
// Get cache root path.
|
||||
// We need it in two variants: OS path as an absolute path with UNC prefix,
|
||||
// OS-specific path separators, and encoded with OS-specific encoder. Standard path
|
||||
// without UNC prefix, with slash path separators, and standard (internal) encoding.
|
||||
// Care must be taken when creating OS paths so that the ':' separator following a
|
||||
// drive letter is not encoded (e.g. into unicode fullwidth colon).
|
||||
var err error
|
||||
parentOSPath := config.GetCacheDir() // Assuming string contains a local absolute path in OS encoding
|
||||
fs.Debugf(nil, "vfs cache: root is %q", parentOSPath)
|
||||
parentPath := fromOSPath(parentOSPath)
|
||||
|
||||
// Get a relative cache path representing the remote.
|
||||
relativeDirPath := fremote.Root() // This is a remote path in standard encoding
|
||||
if runtime.GOOS == "windows" {
|
||||
if strings.HasPrefix(relativeDirPath, `//?/`) {
|
||||
relativeDirPath = relativeDirPath[2:] // Trim off the "//" for the result to be a valid when appending to another path
|
||||
}
|
||||
}
|
||||
relativeDirPath = fremote.Name() + "/" + relativeDirPath
|
||||
relativeDirOSPath := toOSPath(relativeDirPath)
|
||||
|
||||
// Create cache root dirs
|
||||
var dataOSPath, metaOSPath string
|
||||
if dataOSPath, metaOSPath, err = createRootDirs(parentOSPath, relativeDirOSPath); err != nil {
|
||||
fsName, fsRoot := fremote.Name(), fremote.Root()
|
||||
dataOSPath, dataStdPath, err := cacheroot.CreateCacheRoot(parentOSPath, fsName, fsRoot, "vfs")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fdata, err := fscache.Get(ctx, dataStdPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get data cache backend: %w", err)
|
||||
}
|
||||
fs.Debugf(nil, "vfs cache: data root is %q", dataOSPath)
|
||||
fs.Debugf(nil, "vfs cache: metadata root is %q", metaOSPath)
|
||||
|
||||
// Get (create) cache backends
|
||||
var fdata, fmeta fs.Fs
|
||||
if fdata, fmeta, err = getBackends(ctx, parentPath, relativeDirPath); err != nil {
|
||||
metaOSPath, metaStdPath, err := cacheroot.CreateCacheRoot(parentOSPath, fsName, fsRoot, "vfsMeta")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hashType, hashOption := operations.CommonHash(ctx, fdata, fremote)
|
||||
fmeta, err := fscache.Get(ctx, metaStdPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get metadata cache backend: %w", err)
|
||||
}
|
||||
fs.Debugf(nil, "vfs cache: metadata root is %q", metaOSPath)
|
||||
|
||||
// Create the cache object
|
||||
hashType, hashOption := operations.CommonHash(ctx, fdata, fremote)
|
||||
c := &Cache{
|
||||
fremote: fremote,
|
||||
fcache: fdata,
|
||||
|
@ -150,23 +137,6 @@ func createDir(dir string) error {
|
|||
return file.MkdirAll(dir, 0700)
|
||||
}
|
||||
|
||||
// createRootDir creates a single cache root directory
|
||||
func createRootDir(parentOSPath string, name string, relativeDirOSPath string) (path string, err error) {
|
||||
path = file.UNCPath(filepath.Join(parentOSPath, name, relativeDirOSPath))
|
||||
err = createDir(path)
|
||||
return
|
||||
}
|
||||
|
||||
// createRootDirs creates all cache root directories
|
||||
func createRootDirs(parentOSPath string, relativeDirOSPath string) (dataOSPath string, metaOSPath string, err error) {
|
||||
if dataOSPath, err = createRootDir(parentOSPath, "vfs", relativeDirOSPath); err != nil {
|
||||
err = fmt.Errorf("failed to create data cache directory: %w", err)
|
||||
} else if metaOSPath, err = createRootDir(parentOSPath, "vfsMeta", relativeDirOSPath); err != nil {
|
||||
err = fmt.Errorf("failed to create metadata cache directory: %w", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// createItemDir creates the directory for named item in all cache roots
|
||||
//
|
||||
// Returns an os path for the data cache file.
|
||||
|
@ -186,22 +156,6 @@ func (c *Cache) createItemDir(name string) (string, error) {
|
|||
return filepath.Join(parentPath, leaf), nil
|
||||
}
|
||||
|
||||
// getBackend gets a backend for a cache root dir
|
||||
func getBackend(ctx context.Context, parentPath string, name string, relativeDirPath string) (fs.Fs, error) {
|
||||
path := fmt.Sprintf("%s/%s/%s", parentPath, name, relativeDirPath)
|
||||
return fscache.Get(ctx, path)
|
||||
}
|
||||
|
||||
// getBackends gets backends for all cache root dirs
|
||||
func getBackends(ctx context.Context, parentPath string, relativeDirPath string) (fdata fs.Fs, fmeta fs.Fs, err error) {
|
||||
if fdata, err = getBackend(ctx, parentPath, "vfs", relativeDirPath); err != nil {
|
||||
err = fmt.Errorf("failed to get data cache backend: %w", err)
|
||||
} else if fmeta, err = getBackend(ctx, parentPath, "vfsMeta", relativeDirPath); err != nil {
|
||||
err = fmt.Errorf("failed to get metadata cache backend: %w", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// clean returns the cleaned version of name for use in the index map
|
||||
//
|
||||
// name should be a remote path not an osPath
|
||||
|
@ -214,11 +168,6 @@ func clean(name string) string {
|
|||
return name
|
||||
}
|
||||
|
||||
// fromOSPath turns a OS path into a standard/remote path
|
||||
func fromOSPath(osPath string) string {
|
||||
return encoder.OS.ToStandardPath(filepath.ToSlash(osPath))
|
||||
}
|
||||
|
||||
// toOSPath turns a standard/remote path into an OS path
|
||||
func toOSPath(standardPath string) string {
|
||||
return filepath.FromSlash(encoder.OS.FromStandardPath(standardPath))
|
||||
|
|
Loading…
Add table
Reference in a new issue