diff --git a/backend/cache/cache_test.go b/backend/cache/cache_test.go
index 44307ea6d..d414e67ae 100644
--- a/backend/cache/cache_test.go
+++ b/backend/cache/cache_test.go
@@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
 	fstests.Run(t, &fstests.Opt{
 		RemoteName:                   "TestCache:",
 		NilObject:                    (*cache.Object)(nil),
-		UnimplementableFsMethods:     []string{"PublicLink", "OpenWriterAt"},
+		UnimplementableFsMethods:     []string{"PublicLink", "OpenWriterAt", "Resume"},
 		UnimplementableObjectMethods: []string{"MimeType", "ID", "GetTier", "SetTier"},
 		SkipInvalidUTF8:              true, // invalid UTF-8 confuses the cache
 	})
diff --git a/backend/chunker/chunker_test.go b/backend/chunker/chunker_test.go
index 4acdf5b5a..376a9c5ac 100644
--- a/backend/chunker/chunker_test.go
+++ b/backend/chunker/chunker_test.go
@@ -43,6 +43,7 @@ func TestIntegration(t *testing.T) {
 			"DirCacheFlush",
 			"UserInfo",
 			"Disconnect",
+			"Resume",
 		},
 	}
 	if *fstest.RemoteName == "" {
diff --git a/backend/crypt/crypt_test.go b/backend/crypt/crypt_test.go
index 8369a60f7..b4f803b7b 100644
--- a/backend/crypt/crypt_test.go
+++ b/backend/crypt/crypt_test.go
@@ -23,7 +23,7 @@ func TestIntegration(t *testing.T) {
 	fstests.Run(t, &fstests.Opt{
 		RemoteName:                   *fstest.RemoteName,
 		NilObject:                    (*crypt.Object)(nil),
-		UnimplementableFsMethods:     []string{"OpenWriterAt"},
+		UnimplementableFsMethods:     []string{"OpenWriterAt", "Resume"},
 		UnimplementableObjectMethods: []string{"MimeType"},
 	})
 }
diff --git a/backend/local/local.go b/backend/local/local.go
index 5dc65ec6a..314fcdea0 100644
--- a/backend/local/local.go
+++ b/backend/local/local.go
@@ -12,6 +12,7 @@ import (
 	"path"
 	"path/filepath"
 	"runtime"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -24,6 +25,7 @@ import (
 	"github.com/rclone/rclone/fs/config/configstruct"
 	"github.com/rclone/rclone/fs/fserrors"
 	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/lib/atexit"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/file"
 	"github.com/rclone/rclone/lib/readers"
@@ -230,6 +232,7 @@ type Fs struct {
 	precision time.Duration       // precision of local filesystem
 	warnedMu  sync.Mutex          // used for locking access to 'warned'.
 	warned    map[string]struct{} // whether we have warned about this string
+	hashState map[string]string   // set in Resume(), used to restore hash state

 	// do os.Lstat or os.Stat
 	lstat func(name string) (os.FileInfo, error)
@@ -267,11 +270,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
 	}
 	f := &Fs{
-		name:   name,
-		opt:    *opt,
-		warned: make(map[string]struct{}),
-		dev:    devUnset,
-		lstat:  os.Lstat,
+		name:      name,
+		opt:       *opt,
+		warned:    make(map[string]struct{}),
+		hashState: make(map[string]string),
+		dev:       devUnset,
+		lstat:     os.Lstat,
 	}
 	f.root = cleanRootPath(root, f.opt.NoUNC, f.opt.Enc)
 	f.features = (&fs.Features{
@@ -1115,6 +1119,7 @@ func (nwc nopWriterCloser) Close() error {
 func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
 	var out io.WriteCloser
 	var hasher *hash.MultiHasher
+	var resumeOpt *fs.OptionResume

 	for _, option := range options {
 		switch x := option.(type) {
@@ -1125,6 +1130,24 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 					return err
 				}
 			}
+		case *fs.OptionResume:
+			resumeOpt = x
+			if resumeOpt.Pos != 0 {
+				fs.Logf(o, "Resuming at byte position: %d", resumeOpt.Pos)
+				// Discard bytes that already exist on the backend
+				_, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
+				if err != nil {
+					return err
+				}
+				hashType := o.fs.Hashes().GetOne()
+				hasher, err = hash.NewMultiHasherTypes(hash.NewHashSet(hashType))
+				if err != nil {
+					return err
+				}
+				if err := hasher.RestoreHashState(hashType, o.fs.hashState[o.remote]); err != nil {
+					return err
+				}
+			}
 		}
 	}

@@ -1138,7 +1161,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	// If it is a translated link, just read in the contents, and
 	// then create a symlink
 	if !o.translatedLink {
-		f, err := file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
+		var f *os.File
+		if resumeOpt != nil && resumeOpt.Pos != 0 {
+			f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_APPEND, 0666)
+		} else {
+			f, err = file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
+		}
 		if err != nil {
 			if runtime.GOOS == "windows" && os.IsPermission(err) {
 				// If permission denied on Windows might be trying to update a
@@ -1152,7 +1180,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 				return err
 			}
 		}
-		if !o.fs.opt.NoPreAllocate {
+		if !o.fs.opt.NoPreAllocate && (resumeOpt == nil || resumeOpt.Pos == 0) {
 			// Pre-allocate the file for performance reasons
 			err = file.PreAllocate(src.Size(), f)
 			if err != nil {
@@ -1173,7 +1201,44 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		in = io.TeeReader(in, hasher)
 	}

-	_, err = io.Copy(out, in)
+	var cachingWg sync.WaitGroup // Used to halt code execution while the resume cache is written
+	var copyWg sync.WaitGroup    // Ensures that io.Copy has returned before writing resume data
+	copyWg.Add(1)
+	// Context for the read so that we can handle io.Copy being interrupted
+	ctxr, cancel := context.WithCancel(ctx)
+	// Create an exit handler during the copy so that resume data can be written if interrupted
+	atexitHandle := atexit.Register(func() {
+		// If OptionResume was passed, call SetID to prepare for future resumes
+		// ID is the number of bytes written to the destination
+		if resumeOpt != nil && hasher != nil {
+			// Stop the copy so the cache is consistent with the remote
+			cachingWg.Add(1)
+			cancel()
+			copyWg.Wait()
+			fs.Infof(o, "Updating resume cache")
+			fileInfo, statErr := o.fs.lstat(o.path)
+			if statErr != nil {
+				fs.Errorf(o, "Failed to stat partial file for resume cache: %v", statErr)
+				return
+			}
+			writtenStr := strconv.FormatInt(fileInfo.Size(), 10)
+			hashType := o.fs.Hashes().GetOne()
+			hashState, err := hasher.GetHashState(hashType)
+			if err != nil {
+				return
+			}
+			_ = resumeOpt.SetID(ctx, writtenStr, hashType.String(), hashState)
+		}
+	})
+	cr := readers.NewContextReader(ctxr, in)
+	_, err = io.Copy(out, cr)
+	copyWg.Done()
+	atexit.Unregister(atexitHandle)
+	if errors.Is(err, context.Canceled) {
+		// If resume data is being written we want to wait here for the program to exit
+		cachingWg.Wait()
+	}
+
 	closeErr := out.Close()
 	if err == nil {
 		err = closeErr
@@ -1338,9 +1399,44 @@ func cleanRootPath(s string, noUNC bool, enc encoder.MultiEncoder) string {
 	return s
 }

+// Resume checks whether the (remote, ID) pair is valid and returns
+// the point the file should be resumed from or an error.
+func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
+	cachedPos, err := strconv.ParseInt(ID, 10, 64)
+	if err != nil {
+		return 0, err
+	}
+	// Compare hash of partial file on remote with partial hash in cache
+	remoteObject, err := f.NewObject(ctx, remote)
+	if err != nil {
+		return 0, err
+	}
+	if remoteObject.Size() != cachedPos {
+		return 0, errors.New("size on remote does not match resume cache")
+	}
+	hashType := hash.NameToType(hashName)
+	remoteHash, err := remoteObject.Hash(ctx, hashType)
+	if err != nil {
+		return 0, err
+	}
+	cachedHash, err := hash.SumPartialHash(hashName, hashState)
+	if err != nil {
+		return 0, err
+	}
+	// Hashes match, attempt resume
+	if cachedHash == remoteHash {
+		f.hashState[remote] = hashState
+		return cachedPos, nil
+	}
+	// No valid position found, restart from beginning
+	fs.Infof(remote, "Not resuming as cached hash state did not match hash state on remote")
+	return 0, nil
+}
+
 // Check the interfaces are satisfied
 var (
 	_ fs.Fs             = &Fs{}
+	_ fs.Resumer        = &Fs{}
 	_ fs.Purger         = &Fs{}
 	_ fs.PutStreamer    = &Fs{}
 	_ fs.Mover          = &Fs{}
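Aside (not part of the patch): the interrupt handling in Update above is a small pattern worth seeing in isolation — register an atexit handler, cancel the reader's context, wait for io.Copy to return, then persist state. A minimal sketch, assuming only rclone's atexit and readers packages; the file name is invented:

package main

import (
	"context"
	"io"
	"os"
	"strings"
	"sync"

	"github.com/rclone/rclone/lib/atexit"
	"github.com/rclone/rclone/lib/readers"
)

func main() {
	src := strings.NewReader("some data to copy")
	dst, err := os.Create("demo.partial")
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	var copyWg sync.WaitGroup
	copyWg.Add(1)

	// On interrupt: stop the copy, wait for io.Copy to return, then the
	// handler can safely record how many bytes reached the destination.
	handle := atexit.Register(func() {
		cancel()
		copyWg.Wait()
		// ...persist resume state here, as Update does via resumeOpt.SetID...
	})

	// The context reader makes io.Copy return context.Canceled once cancel is called
	_, err = io.Copy(dst, readers.NewContextReader(ctx, src))
	copyWg.Done()
	atexit.Unregister(handle)
	_ = dst.Close()
	_ = err
	cancel()
}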
diff --git a/backend/union/union_test.go b/backend/union/union_test.go
index c58936e9d..a6176440a 100644
--- a/backend/union/union_test.go
+++ b/backend/union/union_test.go
@@ -18,7 +18,7 @@ func TestIntegration(t *testing.T) {
 	}
 	fstests.Run(t, &fstests.Opt{
 		RemoteName:                   *fstest.RemoteName,
-		UnimplementableFsMethods:     []string{"OpenWriterAt", "DuplicateFiles"},
+		UnimplementableFsMethods:     []string{"OpenWriterAt", "DuplicateFiles", "Resume"},
 		UnimplementableObjectMethods: []string{"MimeType"},
 	})
 }
diff --git a/fs/config.go b/fs/config.go
index 480e0a39b..4ea402747 100644
--- a/fs/config.go
+++ b/fs/config.go
@@ -130,6 +130,8 @@ type ConfigInfo struct {
 	FsCacheExpireDuration time.Duration
 	FsCacheExpireInterval time.Duration
 	DisableHTTP2          bool
+	MaxResumeCacheSize    SizeSuffix
+	ResumeCutoff          SizeSuffix
 	HumanReadable         bool
 	KvLockTime            time.Duration // maximum time to keep key-value database locked by process
 }
@@ -163,6 +165,8 @@ func NewConfig() *ConfigInfo {
 	c.TPSLimitBurst = 1
 	c.MaxTransfer = -1
 	c.MaxBacklog = 10000
+	c.MaxResumeCacheSize = SizeSuffix(100 * 1024)
+	c.ResumeCutoff = -1
 	// We do not want to set the default here. We use this variable being empty as part of the fall-through of options.
 	// c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - "
 	c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)
diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go
index 7c2c8ade9..65a678d77 100644
--- a/fs/config/configflags/configflags.go
+++ b/fs/config/configflags/configflags.go
@@ -132,6 +132,8 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
 	flags.BoolVarP(flagSet, &ci.RefreshTimes, "refresh-times", "", ci.RefreshTimes, "Refresh the modtime of remote files")
 	flags.BoolVarP(flagSet, &ci.NoConsole, "no-console", "", ci.NoConsole, "Hide console window (supported on Windows only)")
 	flags.StringVarP(flagSet, &dscp, "dscp", "", "", "Set DSCP value to connections, value or name, e.g. CS1, LE, DF, AF21")
+	flags.FVarP(flagSet, &ci.MaxResumeCacheSize, "max-resume-cache-size", "", "The maximum size of the cache used to store data necessary for resuming uploads. When the cache grows beyond this size, the oldest resume data is deleted (default 100k)")
+	flags.FVarP(flagSet, &ci.ResumeCutoff, "resume-cutoff", "", "If set, attempt to resume all partial uploads larger than this size (default off)")
 	flags.DurationVarP(flagSet, &ci.FsCacheExpireDuration, "fs-cache-expire-duration", "", ci.FsCacheExpireDuration, "Cache remotes for this long (0 to disable caching)")
 	flags.DurationVarP(flagSet, &ci.FsCacheExpireInterval, "fs-cache-expire-interval", "", ci.FsCacheExpireInterval, "Interval to check for expired remotes")
 	flags.BoolVarP(flagSet, &ci.DisableHTTP2, "disable-http2", "", ci.DisableHTTP2, "Disable HTTP/2 in the global transport")
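For orientation (illustrative, not from the patch): the two new settings are read back from the context like any other ConfigInfo field. ResumeCutoff defaults to -1, which keeps resuming disabled until the flag is set:

package main

import (
	"context"
	"fmt"

	"github.com/rclone/rclone/fs"
)

func main() {
	ci := fs.GetConfig(context.Background())
	// Defaults per NewConfig above: ResumeCutoff -1 (off), MaxResumeCacheSize 100 KiB
	fmt.Println("resume-cutoff:", ci.ResumeCutoff)
	fmt.Println("max-resume-cache-size:", ci.MaxResumeCacheSize)
	if ci.ResumeCutoff >= 0 {
		fmt.Println("uploads larger than the cutoff will attempt to resume")
	}
}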
diff --git a/fs/features.go b/fs/features.go
index 6db6bd548..456e66dba 100644
--- a/fs/features.go
+++ b/fs/features.go
@@ -163,6 +163,10 @@ type Features struct {
 	// Shutdown the backend, closing any background tasks and any
 	// cached connections.
 	Shutdown func(ctx context.Context) error
+
+	// Resume checks whether the (remote, ID) pair is valid and returns
+	// the point the file should be resumed from or an error.
+	Resume func(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
 }

 // Disable nil's out the named feature. If it isn't found then it
@@ -290,6 +294,9 @@ func (ft *Features) Fill(ctx context.Context, f Fs) *Features {
 	if do, ok := f.(Shutdowner); ok {
 		ft.Shutdown = do.Shutdown
 	}
+	if do, ok := f.(Resumer); ok {
+		ft.Resume = do.Resume
+	}
 	return ft.DisableList(GetConfig(ctx).DisableFeatures)
 }

@@ -636,6 +643,13 @@ type Shutdowner interface {
 	Shutdown(ctx context.Context) error
 }

+// Resumer is an optional interface for Fs
+type Resumer interface {
+	// Resume checks whether the (remote, ID) pair is valid and returns
+	// the point the file should be resumed from or an error.
+	Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error)
+}
+
 // ObjectsChan is a channel of Objects
 type ObjectsChan chan Object
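A hedged sketch of the caller side: optional features are discovered through the Features struct, so code that wants to resume checks the Resume field for nil before calling it. This mirrors what operations.Copy does further down; the helper name here is invented:

package main

import (
	"context"
	"fmt"

	_ "github.com/rclone/rclone/backend/local" // register the local backend, which implements Resumer
	"github.com/rclone/rclone/fs"
)

// resumePos is a hypothetical helper: ask a backend where to resume,
// or report ErrorNotImplemented when the optional feature is absent.
func resumePos(ctx context.Context, f fs.Fs, remote, id, hashName, hashState string) (int64, error) {
	if do := f.Features().Resume; do != nil {
		return do(ctx, remote, id, hashName, hashState)
	}
	return 0, fs.ErrorNotImplemented
}

func main() {
	ctx := context.Background()
	f, err := fs.NewFs(ctx, "/tmp")
	if err != nil {
		panic(err)
	}
	// With no valid cache entry this is expected to fail; it only shows the call shape
	pos, err := resumePos(ctx, f, "file.bin", "0", "md5", "")
	fmt.Println(pos, err)
}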
diff --git a/fs/fs.go b/fs/fs.go
index 65d9c17d4..9f2e3f202 100644
--- a/fs/fs.go
+++ b/fs/fs.go
@@ -48,6 +48,7 @@ var (
 	ErrorNotImplemented      = errors.New("optional feature not implemented")
 	ErrorCommandNotFound     = errors.New("command not found")
 	ErrorFileNameTooLong     = errors.New("file name too long")
+	ErrorCantResume          = errors.New("can't resume file upload")
 )

 // CheckClose is a utility function used to check the return from
diff --git a/fs/hash/hash.go b/fs/hash/hash.go
index 48ecdcd4c..e87ac1929 100644
--- a/fs/hash/hash.go
+++ b/fs/hash/hash.go
@@ -4,6 +4,7 @@ import (
 	"crypto/md5"
 	"crypto/sha1"
 	"crypto/sha256"
+	"encoding"
 	"encoding/base64"
 	"encoding/hex"
 	"errors"
@@ -264,6 +265,67 @@ func (m *MultiHasher) Size() int64 {
 	return m.size
 }

+// GetHashState returns the partial hash state for the given hash type encoded as a string
+func (m *MultiHasher) GetHashState(hashType Type) (string, error) {
+	h, ok := m.h[hashType]
+	if !ok {
+		return "", ErrUnsupported
+	}
+	marshaler, ok := h.(encoding.BinaryMarshaler)
+	if !ok {
+		return "", errors.New(hashType.String() + " does not implement encoding.BinaryMarshaler")
+	}
+	data, err := marshaler.MarshalBinary()
+	if err != nil {
+		return "", err
+	}
+	return base64.StdEncoding.EncodeToString(data), nil
+}
+
+// RestoreHashState restores the partial hash state for the passed hash type
+func (m *MultiHasher) RestoreHashState(hashType Type, hashState string) error {
+	partialHashState, err := base64.StdEncoding.DecodeString(hashState)
+	if err != nil {
+		return err
+	}
+	unmarshaler, ok := m.h[hashType].(encoding.BinaryUnmarshaler)
+	if ok {
+		if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// SumPartialHash returns the hash of the partial hash state
+func SumPartialHash(hashName, hashState string) (string, error) {
+	partialHashDef, ok := name2hash[hashName]
+	if !ok {
+		return "", ErrUnsupported
+	}
+	partialHash := partialHashDef.newFunc()
+	partialHashState, err := base64.StdEncoding.DecodeString(hashState)
+	if err != nil {
+		return "", err
+	}
+	unmarshaler, ok := partialHash.(encoding.BinaryUnmarshaler)
+	if ok {
+		if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
+			return "", err
+		}
+	}
+	return hex.EncodeToString(partialHash.Sum(nil)), nil
+}
+
+// NameToType returns the requested hash type or None if the hash type isn't supported
+func NameToType(hashName string) Type {
+	hashDef, ok := name2hash[hashName]
+	if !ok {
+		return None
+	}
+	return hashDef.hashType
+}
+
 // A Set Indicates one or more hash types.
 type Set int
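The three hash helpers above lean on the standard library: Go's crypto hashes implement encoding.BinaryMarshaler/BinaryUnmarshaler, so a half-finished digest can be serialized and picked up later. A standalone illustration of that round trip (stdlib only, not rclone code):

package main

import (
	"crypto/md5"
	"encoding"
	"encoding/base64"
	"fmt"
)

func main() {
	h := md5.New()
	h.Write([]byte("hello "))

	// Save the partial state, as GetHashState does (base64 for transport)
	state, err := h.(encoding.BinaryMarshaler).MarshalBinary()
	if err != nil {
		panic(err)
	}
	saved := base64.StdEncoding.EncodeToString(state)

	// Restore into a fresh hasher and keep writing, as RestoreHashState does
	h2 := md5.New()
	raw, err := base64.StdEncoding.DecodeString(saved)
	if err != nil {
		panic(err)
	}
	if err := h2.(encoding.BinaryUnmarshaler).UnmarshalBinary(raw); err != nil {
		panic(err)
	}
	h2.Write([]byte("world"))

	fmt.Printf("resumed:  %x\n", h2.Sum(nil))
	fmt.Printf("one-shot: %x\n", md5.Sum([]byte("hello world")))
}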
diff --git a/fs/open_options.go b/fs/open_options.go
index 8d997118a..da7762353 100644
--- a/fs/open_options.go
+++ b/fs/open_options.go
@@ -3,13 +3,19 @@
 package fs

 import (
+	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"net/http"
+	"os"
+	"path/filepath"
+	"sort"
 	"strconv"
 	"strings"

 	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/lib/cacheroot"
 )

 // OpenOption is an interface describing options for Open
@@ -230,6 +236,143 @@ func (o *HashesOption) Mandatory() bool {
 	return false
 }

+// OptionResume defines a Put/Upload for doing resumes
+type OptionResume struct {
+	ID           string // resume this ID if set
+	Pos          int64  // and resume from this position
+	Src          Object
+	F            Fs
+	Remote       string
+	CacheCleaned bool
+	CacheDir     string
+}
+
+// SetID will be called by a backend's Put/Update function if the object's upload
+// could be resumed upon failure
+//
+// SetID takes the passed resume ID, hash name, hash state and Fingerprint of the
+// object and stores them in --cache-dir so that future Copy operations can resume
+// the upload if it fails
+func (o *OptionResume) SetID(ctx context.Context, ID, hashName, hashState string) error {
+	ci := GetConfig(ctx)
+	// Get the Fingerprint of the src object so that future Copy operations can ensure the
+	// object hasn't changed before resuming an upload
+	fingerprint := Fingerprint(ctx, o.Src, true)
+	data, err := marshalResumeJSON(ctx, fingerprint, ID, hashName, hashState)
+	if err != nil {
+		return fmt.Errorf("failed to marshal resume JSON: %w", err)
+	}
+	if len(data) < int(ci.MaxResumeCacheSize) {
+		// Each remote will have its own directory for cached resume files
+		dirPath, _, err := cacheroot.CreateCacheRoot(o.CacheDir, o.F.Name(), o.F.Root(), "resume")
+		if err != nil {
+			return err
+		}
+		// Write resume data to disk
+		cachePath := filepath.Join(dirPath, o.Remote)
+		err = os.MkdirAll(filepath.Dir(cachePath), os.ModePerm)
+		if err != nil {
+			return fmt.Errorf("failed to create cache directory %v: %w", filepath.Dir(cachePath), err)
+		}
+		cacheFile, err := os.Create(cachePath)
+		if err != nil {
+			return fmt.Errorf("failed to create cache file %v: %w", cachePath, err)
+		}
+		defer func() {
+			_ = cacheFile.Close()
+		}()
+		_, errWrite := cacheFile.Write(data)
+		if errWrite != nil {
+			return fmt.Errorf("failed to write JSON to file: %w", errWrite)
+		}
+	}
+	if !o.CacheCleaned {
+		rootCacheDir := filepath.Join(o.CacheDir, "resume")
+		if err := cleanResumeCache(ctx, rootCacheDir); err != nil {
+			return fmt.Errorf("failed to clean resume cache: %w", err)
+		}
+	}
+	o.CacheCleaned = true
+	return nil
+}
+
+// ResumeJSON is a struct for storing resume info in cache
+type ResumeJSON struct {
+	Fingerprint string `json:"fprint"`
+	ID          string `json:"id"`
+	HashName    string `json:"hname"`
+	HashState   string `json:"hstate"`
+}
+
+func marshalResumeJSON(ctx context.Context, fprint, id, hashName, hashState string) ([]byte, error) {
+	resumedata := ResumeJSON{
+		Fingerprint: fprint,
+		ID:          id,
+		HashName:    hashName,
+		HashState:   hashState,
+	}
+	data, err := json.Marshal(&resumedata)
+	return data, err
+}
+
+// cleanResumeCache checks the size of the resume cache and removes the oldest resume files if over the limit
+func cleanResumeCache(ctx context.Context, rootCacheDir string) error {
+	ci := GetConfig(ctx)
+	var paths []string
+	pathsWithInfo := make(map[string]os.FileInfo)
+	totalCacheSize := int64(0)
+	walkErr := filepath.Walk(rootCacheDir,
+		func(path string, info os.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
+			if info.IsDir() {
+				// Empty subdirectories in the resume cache dir can be removed;
+				// os.Remove fails harmlessly on directories that still have entries
+				_ = os.Remove(path)
+				return nil
+			}
+			paths = append(paths, path)
+			pathsWithInfo[path] = info
+			totalCacheSize += info.Size()
+			return nil
+		})
+	if walkErr != nil {
+		return fmt.Errorf("error walking through cache when cleaning cache dir: %w", walkErr)
+	}
+	if totalCacheSize > int64(ci.MaxResumeCacheSize) {
+		sort.Slice(paths, func(i, j int) bool {
+			return pathsWithInfo[paths[i]].ModTime().Before(pathsWithInfo[paths[j]].ModTime())
+		})
+		for _, p := range paths {
+			if totalCacheSize < int64(ci.MaxResumeCacheSize) {
+				break
+			}
+			if err := os.Remove(p); err != nil {
+				return fmt.Errorf("error removing oldest cache file: %s: %w", p, err)
+			}
+			totalCacheSize -= pathsWithInfo[p].Size()
+			Debugf(p, "Successfully removed oldest cache file")
+		}
+	}
+	return nil
+}
+
+// Header formats the option as an http header
+func (o *OptionResume) Header() (key string, value string) {
+	return "", ""
+}
+
+// String formats the option into human readable form
+func (o *OptionResume) String() string {
+	return fmt.Sprintf("OptionResume(ID:%v, Pos:%v)", o.ID, o.Pos)
+}
+
+// Mandatory returns whether the option must be parsed or can be ignored
+func (o *OptionResume) Mandatory() bool {
+	return false
+}
+
 // NullOption defines an Option which does nothing
 type NullOption struct {
 }
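What actually lands on disk is small: SetID writes one JSON object per remote, shaped by the struct tags above. A quick look at the serialized form (field values invented for illustration):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/rclone/rclone/fs"
)

func main() {
	data, err := json.Marshal(&fs.ResumeJSON{
		Fingerprint: "1024,2021-01-02 03:04:05",
		ID:          "512",
		HashName:    "md5",
		HashState:   "<base64 partial hash state>",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
	// {"fprint":"1024,2021-01-02 03:04:05","id":"512","hname":"md5","hstate":"<base64 partial hash state>"}
}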
diff --git a/fs/operations/interrupt_test.go b/fs/operations/interrupt_test.go
new file mode 100644
index 000000000..2c1e6b6bc
--- /dev/null
+++ b/fs/operations/interrupt_test.go
@@ -0,0 +1,23 @@
+//go:build !windows
+// +build !windows
+
+package operations
+
+import (
+	"os"
+	"os/exec"
+	"syscall"
+)
+
+func sendInterrupt() error {
+	p, err := os.FindProcess(syscall.Getpid())
+	if err != nil {
+		return err
+	}
+	err = p.Signal(os.Interrupt)
+	return err
+}
+
+func setupCmd(cmd *exec.Cmd) {
+	// Only needed for windows
+}
diff --git a/fs/operations/interrupt_win_test.go b/fs/operations/interrupt_win_test.go
new file mode 100644
index 000000000..1c7e9d023
--- /dev/null
+++ b/fs/operations/interrupt_win_test.go
@@ -0,0 +1,32 @@
+//go:build windows
+// +build windows
+
+package operations
+
+import (
+	"os/exec"
+	"syscall"
+)
+
+// Credit: https://github.com/golang/go/blob/6125d0c4265067cdb67af1340bf689975dd128f4/src/os/signal/signal_windows_test.go#L18
+func sendInterrupt() error {
+	d, e := syscall.LoadDLL("kernel32.dll")
+	if e != nil {
+		return e
+	}
+	p, e := d.FindProc("GenerateConsoleCtrlEvent")
+	if e != nil {
+		return e
+	}
+	r, _, e := p.Call(syscall.CTRL_BREAK_EVENT, uintptr(syscall.Getpid()))
+	if r == 0 {
+		return e
+	}
+	return nil
+}
+
+func setupCmd(cmd *exec.Cmd) {
+	cmd.SysProcAttr = &syscall.SysProcAttr{
+		CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP,
+	}
+}
diff --git a/fs/operations/operations.go b/fs/operations/operations.go
index bc5832b59..8d8bb2495 100644
--- a/fs/operations/operations.go
+++ b/fs/operations/operations.go
@@ -33,6 +33,7 @@ import (
 	"github.com/rclone/rclone/fs/object"
 	"github.com/rclone/rclone/fs/walk"
 	"github.com/rclone/rclone/lib/atexit"
+	"github.com/rclone/rclone/lib/cacheroot"
 	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/random"
 	"github.com/rclone/rclone/lib/readers"
@@ -364,6 +365,11 @@ func CommonHash(ctx context.Context, fa, fb fs.Info) (hash.Type, *fs.HashesOptio
 // be nil.
 func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
 	ci := fs.GetConfig(ctx)
+	var resumeOpt *fs.OptionResume
+	if f.Features().Resume != nil {
+		resumeOpt = createResumeOpt(ctx, f, remote, src)
+	}
+
 	tr := accounting.Stats(ctx).NewTransfer(src)
 	defer func() {
 		tr.Done(ctx, err)
@@ -461,6 +467,10 @@
 				wrappedSrc = NewOverrideRemote(src, remote)
 			}
 			options := []fs.OpenOption{hashOption}
+			// Append OptionResume if it was set
+			if resumeOpt != nil {
+				options = append(options, resumeOpt)
+			}
 			for _, option := range ci.UploadHeaders {
 				options = append(options, option)
 			}
@@ -475,6 +485,17 @@
 			if err == nil {
 				newDst = dst
 				err = closeErr
+				cacheParent := config.GetCacheDir()
+				// Remove resume cache file (if one was created) when Put/Upload is successful
+				cacheDir, _, cacheErr := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
+				if cacheErr != nil {
+					return nil, cacheErr
+				}
+				cacheFile := filepath.Join(cacheDir, remote)
+				removeErr := os.Remove(cacheFile)
+				if removeErr != nil && !os.IsNotExist(removeErr) {
+					return nil, fmt.Errorf("failed to remove resume cache file after upload: %w", removeErr)
+				}
 			}
 		}
 	}
diff --git a/fs/operations/resume.go b/fs/operations/resume.go
new file mode 100644
index 000000000..7c0eb7e28
--- /dev/null
+++ b/fs/operations/resume.go
@@ -0,0 +1,72 @@
+package operations
+
+import (
+	"context"
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/config"
+	"github.com/rclone/rclone/lib/cacheroot"
+)
+
+// createResumeOpt creates an OptionResume that will be passed to Put/Upload
+func createResumeOpt(ctx context.Context, f fs.Fs, remote string, src fs.Object) (resumeOpt *fs.OptionResume) {
+	ci := fs.GetConfig(ctx)
+	cacheParent := config.GetCacheDir()
+	resumeOpt = &fs.OptionResume{ID: "", Pos: 0, Src: src, F: f, Remote: remote, CacheCleaned: false, CacheDir: cacheParent}
+	if ci.ResumeCutoff >= 0 {
+		cacheDir, _, err := cacheroot.CreateCacheRoot(cacheParent, f.Name(), f.Root(), "resume")
+		if err != nil {
+			return nil
+		}
+		cacheFile := filepath.Join(cacheDir, remote)
+		resumeID, hashName, hashState, attemptResume := readResumeCache(ctx, f, src, cacheFile)
+		if attemptResume {
+			fs.Debugf(f, "Existing resume cache file found: %s. A resume will now be attempted.", cacheFile)
+			position, resumeErr := f.Features().Resume(ctx, remote, resumeID, hashName, hashState)
+			if resumeErr != nil {
+				fs.Errorf(src, "Resume canceled: %v", resumeErr)
+			} else if position > int64(ci.ResumeCutoff) {
+				resumeOpt.Pos = position
+			}
+		}
+	}
+	return resumeOpt
+}
+
+// readResumeCache checks to see if a resume ID has been cached for the source object.
+// If it finds one it returns it along with true to signal a resume can be attempted
+func readResumeCache(ctx context.Context, f fs.Fs, src fs.Object, cacheName string) (resumeID, hashName, hashState string, attemptResume bool) {
+	existingCacheFile, statErr := os.Open(cacheName)
+	defer func() {
+		_ = existingCacheFile.Close()
+	}()
+	if statErr == nil {
+		rawData, readErr := ioutil.ReadAll(existingCacheFile)
+		if readErr == nil {
+			existingFingerprint, resumeID, hashName, hashState, unmarshalErr := unmarshalResumeJSON(ctx, rawData)
+			if unmarshalErr != nil {
+				fs.Debugf(f, "Failed to unmarshal resume JSON: %s. Resume will not be attempted.", unmarshalErr.Error())
+			} else if existingFingerprint != "" {
+				// Check if the src object has changed by comparing new Fingerprint to Fingerprint in cache file
+				fingerprint := fs.Fingerprint(ctx, src, true)
+				if existingFingerprint == fingerprint {
+					return resumeID, hashName, hashState, true
+				}
+			}
+		}
+	}
+	return "", "", "", false
+}
+
+func unmarshalResumeJSON(ctx context.Context, data []byte) (fprint, id, hashName, hashState string, err error) {
+	var resumedata fs.ResumeJSON
+	err = json.Unmarshal(data, &resumedata)
+	if err != nil {
+		return "", "", "", "", err
+	}
+	return resumedata.Fingerprint, resumedata.ID, resumedata.HashName, resumedata.HashState, nil
+}
diff --git a/fs/operations/resume_test.go b/fs/operations/resume_test.go
new file mode 100644
index 000000000..d70965322
--- /dev/null
+++ b/fs/operations/resume_test.go
@@ -0,0 +1,161 @@
+package operations
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"io/ioutil"
+	"log"
+	"math/rand"
+	"os"
+	"os/exec"
+	"runtime"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fstest"
+	"github.com/rclone/rclone/fstest/mockobject"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+type interruptReader struct {
+	once sync.Once
+	r    io.Reader
+}
+
+// Read sends an OS specific interrupt signal and then reads 1 byte at a time
+func (r *interruptReader) Read(b []byte) (n int, err error) {
+	r.once.Do(func() {
+		_ = sendInterrupt()
+	})
+	n, err = r.r.Read(b[:1])
+	// Simulate the duration of a larger read without needing a large test file;
+	// this allows the interrupt to be handled before Copy completes
+	time.Sleep(time.Microsecond * 10)
+	return n, err
+}
+
+// this is a wrapper for a mockobject with a custom Open function
+//
+// n indicates the number of bytes to read before sending an
+// interrupt signal
+type resumeTestObject struct {
+	fs.Object
+	n int64
+}
+
+// Open opens the file for read. Call Close() on the returned io.ReadCloser
+//
+// The Reader will signal an interrupt after reading n bytes, then continue to read 1 byte at a time.
+// If TestResume is successful, the interrupt will be processed and reads will be cancelled before running
+// out of bytes to read
+func (o *resumeTestObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
+	rc, err := o.Object.Open(ctx, options...)
+	if err != nil {
+		return nil, err
+	}
+	r := io.MultiReader(&io.LimitedReader{R: rc, N: o.n}, &interruptReader{r: rc})
+	// Wrap with Close in a new readCloser
+	rc = readCloser{Reader: r, Closer: rc}
+	return rc, nil
+}
+
+func makeContent(t *testing.T, size int) []byte {
+	content := make([]byte, size)
+	r := rand.New(rand.NewSource(42))
+	_, err := io.ReadFull(r, content)
+	assert.NoError(t, err)
+	return content
+}
+
+func TestResume(t *testing.T) {
+	ctx := context.Background()
+	r := fstest.NewRun(t)
+	defer r.Finalise()
+	ci := fs.GetConfig(ctx)
+	ci.ResumeCutoff = 0
+
+	// Contents for the mock object
+	var (
+		// Test contents must be large enough that io.Copy does not complete during the first rclone Copy operation
+		resumeTestContents = makeContent(t, 1024)
+		expectedContents   = resumeTestContents
+	)
+
+	// Create mock objects with given breaks
+	createTestSrc := func(interrupt int64) (fs.Object, fs.Object) {
+		srcOrig := mockobject.New("potato").WithContent(resumeTestContents, mockobject.SeekModeNone)
+		srcOrig.SetFs(r.Flocal)
+		src := &resumeTestObject{
+			Object: srcOrig,
+			n:      interrupt,
+		}
+		return src, srcOrig
+	}
+
+	checkContents := func(obj fs.Object, contents string) {
+		assert.NotNil(t, obj)
+		assert.Equal(t, int64(len(contents)), obj.Size())
+
+		r, err := obj.Open(ctx)
+		assert.NoError(t, err)
+		assert.NotNil(t, r)
+		if r == nil {
+			return
+		}
+		data, err := ioutil.ReadAll(r)
+		assert.NoError(t, err)
+		assert.Equal(t, contents, string(data))
+		_ = r.Close()
+	}
+
+	srcBreak, srcNoBreak := createTestSrc(2)
+
+	// Run the first Copy only in a subprocess so that it can be interrupted without ending the test
+	// adapted from: https://stackoverflow.com/questions/26225513/how-to-test-os-exit-scenarios-in-go
+	if os.Getenv("RUNTEST") == "1" {
+		remoteRoot := os.Getenv("REMOTEROOT")
+		remoteFs, err := fs.NewFs(ctx, remoteRoot)
+		require.NoError(t, err)
+		_, _ = Copy(ctx, remoteFs, nil, "testdst", srcBreak)
+		// This should never be reached as the subprocess should exit during Copy
+		require.True(t, false, "Problem with test, first Copy operation should've been interrupted before completion")
+		return
+	}
+	// Start the subprocess
+	cmd := exec.Command(os.Args[0], "-test.run=TestResume")
+	cmd.Env = append(os.Environ(), "RUNTEST=1", "REMOTEROOT="+r.Fremote.Root())
+	cmd.Stdout = os.Stdout
+	setupCmd(cmd)
+	err := cmd.Run()
+
+	e, ok := err.(*exec.ExitError)
+
+	// Exit code after the signal will be (128+signum) on Linux or (signum) on Windows
+	expectedErrorString := "exit status 1"
+	if runtime.GOOS == "windows" {
+		expectedErrorString = "exit status 2"
+	}
+	assert.True(t, ok)
+	assert.Contains(t, e.Error(), expectedErrorString)
+
+	var buf bytes.Buffer
+	log.SetOutput(&buf)
+	defer func() {
+		log.SetOutput(os.Stderr)
+	}()
+
+	// Start the copy again, but this time without breaks
+	newDst, err := Copy(ctx, r.Fremote, nil, "testdst", srcNoBreak)
+	assert.NoError(t, err)
+
+	// Check whether a resume was initiated
+	// The resumed byte position can vary slightly depending on how long atexit takes to process the interrupt
+	assert.True(t, strings.Contains(buf.String(), "Resuming at byte position: "), "The upload did not resume when restarted. Message: %q", buf.String())
+
+	checkContents(newDst, string(expectedContents))
+}
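Below, lib/cacheroot extracts the cache-directory logic that the VFS cache previously kept to itself, so both the VFS and the resume machinery can share it. Hypothetical usage (the remote name/root are invented; the printed paths depend on the OS and --cache-dir):

package main

import (
	"fmt"

	"github.com/rclone/rclone/fs/config"
	"github.com/rclone/rclone/lib/cacheroot"
)

func main() {
	osPath, stdPath, err := cacheroot.CreateCacheRoot(config.GetCacheDir(), "s3", "bucket/dir", "resume")
	if err != nil {
		panic(err)
	}
	fmt.Println("OS path:      ", osPath)  // e.g. ~/.cache/rclone/resume/s3/bucket/dir (created on disk)
	fmt.Println("standard path:", stdPath) // the same location in rclone's internal path form
}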
diff --git a/lib/cacheroot/cacheroot.go b/lib/cacheroot/cacheroot.go
new file mode 100644
index 000000000..22dad995a
--- /dev/null
+++ b/lib/cacheroot/cacheroot.go
@@ -0,0 +1,50 @@
+package cacheroot
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strings"
+
+	"github.com/pkg/errors"
+	"github.com/rclone/rclone/lib/encoder"
+	"github.com/rclone/rclone/lib/file"
+)
+
+// CreateCacheRoot will derive and make a subsystem cache path.
+//
+// Returned root OS path is an absolute path with UNC prefix,
+// OS-specific path separators, and encoded with OS-specific encoder.
+//
+// Additionally it is returned as a standard path without UNC prefix,
+// with slash path separators, and standard (internal) encoding.
+//
+// Care is taken when creating OS paths so that the ':' separator
+// following a drive letter is not encoded, e.g. into unicode fullwidth colon.
+//
+// parentOSPath should contain an absolute local path in OS encoding.
+//
+// Note: instead of fs.Fs it takes name and root as plain strings
+// to prevent import loops due to dependency on the fs package.
+func CreateCacheRoot(parentOSPath, fsName, fsRoot, cacheName string) (rootOSPath, standardPath string, err error) {
+	// Get a relative cache path representing the remote.
+	relativeDir := fsRoot
+	if runtime.GOOS == "windows" && strings.HasPrefix(relativeDir, `//?/`) {
+		// Trim off the leading "//" to make the result
+		// valid for appending to another path.
+		relativeDir = relativeDir[2:]
+	}
+	relativeDir = fsName + "/" + relativeDir
+
+	// Derive and make the cache root directory
+	relativeOSPath := filepath.FromSlash(encoder.OS.FromStandardPath(relativeDir))
+	rootOSPath = file.UNCPath(filepath.Join(parentOSPath, cacheName, relativeOSPath))
+	if err = os.MkdirAll(rootOSPath, 0700); err != nil {
+		return "", "", errors.Wrapf(err, "failed to create %s cache directory", cacheName)
+	}
+
+	parentStdPath := encoder.OS.ToStandardPath(filepath.ToSlash(parentOSPath))
+	standardPath = fmt.Sprintf("%s/%s/%s", parentStdPath, cacheName, relativeDir)
+	return rootOSPath, standardPath, nil
+}
diff --git a/lib/file/preallocate_unix.go b/lib/file/preallocate_unix.go
index e6f85272c..e349f5a7a 100644
--- a/lib/file/preallocate_unix.go
+++ b/lib/file/preallocate_unix.go
@@ -9,7 +9,6 @@ import (
 	"sync/atomic"
 	"syscall"

-	"github.com/rclone/rclone/fs"
 	"golang.org/x/sys/unix"
 )

@@ -48,7 +47,6 @@ func PreAllocate(size int64, out *os.File) (err error) {
 			// Try the next flags combination
 			index++
 			atomic.StoreInt32(&fallocFlagsIndex, index)
-			fs.Debugf(nil, "preAllocate: got error on fallocate, trying combination %d/%d: %v", index, len(fallocFlags), err)
 			goto again
 		}
 	}
diff --git a/vfs/vfscache/cache.go b/vfs/vfscache/cache.go
index 88d1f2f24..931d1c473 100644
--- a/vfs/vfscache/cache.go
+++ b/vfs/vfscache/cache.go
@@ -8,7 +8,6 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"runtime"
 	"sort"
 	"strings"
 	"sync"
@@ -21,6 +20,7 @@ import (
 	"github.com/rclone/rclone/fs/fserrors"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/fs/operations"
+	"github.com/rclone/rclone/lib/cacheroot"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/file"
 	"github.com/rclone/rclone/vfs/vfscache/writeback"
@@ -75,43 +75,30 @@ type AddVirtualFn func(remote string, size int64, isDir bool) error
 // This starts background goroutines which can be cancelled with the
 // context passed in.
 func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVirtualFn) (*Cache, error) {
-	// Get cache root path.
-	// We need it in two variants: OS path as an absolute path with UNC prefix,
-	// OS-specific path separators, and encoded with OS-specific encoder. Standard path
-	// without UNC prefix, with slash path separators, and standard (internal) encoding.
-	// Care must be taken when creating OS paths so that the ':' separator following a
-	// drive letter is not encoded (e.g. into unicode fullwidth colon).
-	var err error
 	parentOSPath := config.GetCacheDir() // Assuming string contains a local absolute path in OS encoding
-	fs.Debugf(nil, "vfs cache: root is %q", parentOSPath)
-	parentPath := fromOSPath(parentOSPath)

-	// Get a relative cache path representing the remote.
-	relativeDirPath := fremote.Root() // This is a remote path in standard encoding
-	if runtime.GOOS == "windows" {
-		if strings.HasPrefix(relativeDirPath, `//?/`) {
-			relativeDirPath = relativeDirPath[2:] // Trim off the "//" for the result to be a valid when appending to another path
-		}
-	}
-	relativeDirPath = fremote.Name() + "/" + relativeDirPath
-	relativeDirOSPath := toOSPath(relativeDirPath)
-
-	// Create cache root dirs
-	var dataOSPath, metaOSPath string
-	if dataOSPath, metaOSPath, err = createRootDirs(parentOSPath, relativeDirOSPath); err != nil {
+	fsName, fsRoot := fremote.Name(), fremote.Root()
+	dataOSPath, dataStdPath, err := cacheroot.CreateCacheRoot(parentOSPath, fsName, fsRoot, "vfs")
+	if err != nil {
 		return nil, err
 	}
+	fdata, err := fscache.Get(ctx, dataStdPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get data cache backend: %w", err)
+	}
 	fs.Debugf(nil, "vfs cache: data root is %q", dataOSPath)
-	fs.Debugf(nil, "vfs cache: metadata root is %q", metaOSPath)

-	// Get (create) cache backends
-	var fdata, fmeta fs.Fs
-	if fdata, fmeta, err = getBackends(ctx, parentPath, relativeDirPath); err != nil {
+	metaOSPath, metaStdPath, err := cacheroot.CreateCacheRoot(parentOSPath, fsName, fsRoot, "vfsMeta")
+	if err != nil {
 		return nil, err
 	}
-	hashType, hashOption := operations.CommonHash(ctx, fdata, fremote)
+	fmeta, err := fscache.Get(ctx, metaStdPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get metadata cache backend: %w", err)
+	}
+	fs.Debugf(nil, "vfs cache: metadata root is %q", metaOSPath)

 	// Create the cache object
+	hashType, hashOption := operations.CommonHash(ctx, fdata, fremote)
 	c := &Cache{
 		fremote: fremote,
 		fcache:  fdata,
@@ -150,23 +137,6 @@ func createDir(dir string) error {
 	return file.MkdirAll(dir, 0700)
 }

-// createRootDir creates a single cache root directory
-func createRootDir(parentOSPath string, name string, relativeDirOSPath string) (path string, err error) {
-	path = file.UNCPath(filepath.Join(parentOSPath, name, relativeDirOSPath))
-	err = createDir(path)
-	return
-}
-
-// createRootDirs creates all cache root directories
-func createRootDirs(parentOSPath string, relativeDirOSPath string) (dataOSPath string, metaOSPath string, err error) {
-	if dataOSPath, err = createRootDir(parentOSPath, "vfs", relativeDirOSPath); err != nil {
-		err = fmt.Errorf("failed to create data cache directory: %w", err)
-	} else if metaOSPath, err = createRootDir(parentOSPath, "vfsMeta", relativeDirOSPath); err != nil {
-		err = fmt.Errorf("failed to create metadata cache directory: %w", err)
-	}
-	return
-}
-
 // createItemDir creates the directory for named item in all cache roots
 //
 // Returns an os path for the data cache file.
@@ -186,22 +156,6 @@ func (c *Cache) createItemDir(name string) (string, error) {
 	return filepath.Join(parentPath, leaf), nil
 }

-// getBackend gets a backend for a cache root dir
-func getBackend(ctx context.Context, parentPath string, name string, relativeDirPath string) (fs.Fs, error) {
-	path := fmt.Sprintf("%s/%s/%s", parentPath, name, relativeDirPath)
-	return fscache.Get(ctx, path)
-}
-
-// getBackends gets backends for all cache root dirs
-func getBackends(ctx context.Context, parentPath string, relativeDirPath string) (fdata fs.Fs, fmeta fs.Fs, err error) {
-	if fdata, err = getBackend(ctx, parentPath, "vfs", relativeDirPath); err != nil {
-		err = fmt.Errorf("failed to get data cache backend: %w", err)
-	} else if fmeta, err = getBackend(ctx, parentPath, "vfsMeta", relativeDirPath); err != nil {
-		err = fmt.Errorf("failed to get metadata cache backend: %w", err)
-	}
-	return
-}
-
 // clean returns the cleaned version of name for use in the index map
 //
 // name should be a remote path not an osPath
@@ -214,11 +168,6 @@ func clean(name string) string {
 	return name
 }

-// fromOSPath turns a OS path into a standard/remote path
-func fromOSPath(osPath string) string {
-	return encoder.OS.ToStandardPath(filepath.ToSlash(osPath))
-}
-
 // toOSPath turns a standard/remote path into an OS path
 func toOSPath(standardPath string) string {
 	return filepath.FromSlash(encoder.OS.FromStandardPath(standardPath))