forked from TrueCloudLab/rclone
5fb6f94579
This is done by pacing the requests to drive and backing them off using an exponential increase. Put and Modify operations can now be retried also.
535 lines
12 KiB
Go
535 lines
12 KiB
Go
// Generic operations on filesystems and objects
|
|
|
|
package fs
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
)
|
|
|
|
// Work out modify window for fses passed in - sets Config.ModifyWindow
|
|
//
|
|
// This is the largest modify window of all the fses in use, and the
|
|
// user configured value
|
|
func CalculateModifyWindow(fs ...Fs) {
|
|
for _, f := range fs {
|
|
if f != nil {
|
|
precision := f.Precision()
|
|
if precision > Config.ModifyWindow {
|
|
Config.ModifyWindow = precision
|
|
}
|
|
}
|
|
}
|
|
Debug(fs[0], "Modify window is %s\n", Config.ModifyWindow)
|
|
}
|
|
|
|
// Check the two files to see if the MD5sums are the same
|
|
//
|
|
// May return an error which will already have been logged
|
|
//
|
|
// If an error is returned it will return false
|
|
func CheckMd5sums(src, dst Object) (bool, error) {
|
|
srcMd5, err := src.Md5sum()
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(src, "Failed to calculate src md5: %s", err)
|
|
return false, err
|
|
}
|
|
dstMd5, err := dst.Md5sum()
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(dst, "Failed to calculate dst md5: %s", err)
|
|
return false, err
|
|
}
|
|
// Debug("Src MD5 %s", srcMd5)
|
|
// Debug("Dst MD5 %s", obj.Hash)
|
|
return srcMd5 == dstMd5, nil
|
|
}
|
|
|
|
// Checks to see if the src and dst objects are equal by looking at
|
|
// size, mtime and MD5SUM
|
|
//
|
|
// If the src and dst size are different then it is considered to be
|
|
// not equal.
|
|
//
|
|
// If the size is the same and the mtime is the same then it is
|
|
// considered to be equal. This is the heuristic rsync uses when
|
|
// not using --checksum.
|
|
//
|
|
// If the size is the same and and mtime is different or unreadable
|
|
// and the MD5SUM is the same then the file is considered to be equal.
|
|
// In this case the mtime on the dst is updated.
|
|
//
|
|
// Otherwise the file is considered to be not equal including if there
|
|
// were errors reading info.
|
|
func Equal(src, dst Object) bool {
|
|
if src.Size() != dst.Size() {
|
|
Debug(src, "Sizes differ")
|
|
return false
|
|
}
|
|
|
|
// Size the same so check the mtime
|
|
srcModTime := src.ModTime()
|
|
dstModTime := dst.ModTime()
|
|
dt := dstModTime.Sub(srcModTime)
|
|
ModifyWindow := Config.ModifyWindow
|
|
if dt >= ModifyWindow || dt <= -ModifyWindow {
|
|
Debug(src, "Modification times differ by %s: %v, %v", dt, srcModTime, dstModTime)
|
|
} else {
|
|
Debug(src, "Size and modification time the same (differ by %s, within tolerance %s)", dt, ModifyWindow)
|
|
return true
|
|
}
|
|
|
|
// mtime is unreadable or different but size is the same so
|
|
// check the MD5SUM
|
|
same, _ := CheckMd5sums(src, dst)
|
|
if !same {
|
|
Debug(src, "Md5sums differ")
|
|
return false
|
|
}
|
|
|
|
// Size and MD5 the same but mtime different so update the
|
|
// mtime of the dst object here
|
|
dst.SetModTime(srcModTime)
|
|
|
|
Debug(src, "Size and MD5SUM of src and dst objects identical")
|
|
return true
|
|
}
|
|
|
|
// Used to remove a failed copy
|
|
func removeFailedCopy(dst Object) {
|
|
if dst != nil {
|
|
Debug(dst, "Removing failed copy")
|
|
removeErr := dst.Remove()
|
|
if removeErr != nil {
|
|
Debug(dst, "Failed to remove failed copy: %s", removeErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Copy src object to dst or f if nil
|
|
//
|
|
// If dst is nil then the object must not exist already. If you do
|
|
// call Copy() with dst nil on a pre-existing file then some filing
|
|
// systems (eg Drive) may duplicate the file.
|
|
func Copy(f Fs, dst, src Object) {
|
|
const maxTries = 10
|
|
tries := 0
|
|
doUpdate := dst != nil
|
|
tryAgain:
|
|
in0, err := src.Open()
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(src, "Failed to open: %s", err)
|
|
return
|
|
}
|
|
in := NewAccount(in0) // account the transfer
|
|
|
|
var actionTaken string
|
|
if doUpdate {
|
|
actionTaken = "Copied (updated existing)"
|
|
err = dst.Update(in, src.ModTime(), src.Size())
|
|
} else {
|
|
actionTaken = "Copied (new)"
|
|
dst, err = f.Put(in, src.Remote(), src.ModTime(), src.Size())
|
|
}
|
|
inErr := in.Close()
|
|
// Retry if err returned a retry error
|
|
if r, ok := err.(Retry); ok && r.Retry() && tries < maxTries {
|
|
tries++
|
|
Log(src, "Received error: %v - retrying %d/%d", err, tries, maxTries)
|
|
removeFailedCopy(dst)
|
|
goto tryAgain
|
|
}
|
|
if err == nil {
|
|
err = inErr
|
|
}
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(src, "Failed to copy: %s", err)
|
|
removeFailedCopy(dst)
|
|
return
|
|
}
|
|
|
|
// Verify sizes are the same after transfer
|
|
if src.Size() != dst.Size() {
|
|
Stats.Error()
|
|
err = fmt.Errorf("Corrupted on transfer: sizes differ %d vs %d", src.Size(), dst.Size())
|
|
Log(dst, "%s", err)
|
|
removeFailedCopy(dst)
|
|
return
|
|
}
|
|
|
|
// Verify md5sums are the same after transfer - ignoring blank md5sums
|
|
srcMd5sum, md5sumErr := src.Md5sum()
|
|
if md5sumErr != nil {
|
|
Stats.Error()
|
|
Log(src, "Failed to read md5sum: %s", md5sumErr)
|
|
} else if srcMd5sum != "" {
|
|
dstMd5sum, md5sumErr := dst.Md5sum()
|
|
if md5sumErr != nil {
|
|
Stats.Error()
|
|
Log(dst, "Failed to read md5sum: %s", md5sumErr)
|
|
} else if dstMd5sum != "" && srcMd5sum != dstMd5sum {
|
|
Stats.Error()
|
|
err = fmt.Errorf("Corrupted on transfer: md5sums differ %q vs %q", srcMd5sum, dstMd5sum)
|
|
Log(dst, "%s", err)
|
|
removeFailedCopy(dst)
|
|
return
|
|
}
|
|
}
|
|
|
|
Debug(src, actionTaken)
|
|
}
|
|
|
|
// Check to see if src needs to be copied to dst and if so puts it in out
|
|
func checkOne(pair ObjectPair, out ObjectPairChan) {
|
|
src, dst := pair.src, pair.dst
|
|
if dst == nil {
|
|
Debug(src, "Couldn't find file - need to transfer")
|
|
out <- pair
|
|
return
|
|
}
|
|
// Check to see if can store this
|
|
if !src.Storable() {
|
|
return
|
|
}
|
|
// Check to see if changed or not
|
|
if Equal(src, dst) {
|
|
Debug(src, "Unchanged skipping")
|
|
return
|
|
}
|
|
out <- pair
|
|
}
|
|
|
|
// Read Objects~s on in send to out if they need uploading
|
|
//
|
|
// FIXME potentially doing lots of MD5SUMS at once
|
|
func PairChecker(in ObjectPairChan, out ObjectPairChan, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
for pair := range in {
|
|
src := pair.src
|
|
Stats.Checking(src)
|
|
checkOne(pair, out)
|
|
Stats.DoneChecking(src)
|
|
}
|
|
}
|
|
|
|
// Read Objects on in and copy them
|
|
func Copier(in ObjectPairChan, fdst Fs, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
for pair := range in {
|
|
src := pair.src
|
|
Stats.Transferring(src)
|
|
if Config.DryRun {
|
|
Debug(src, "Not copying as --dry-run")
|
|
} else {
|
|
Copy(fdst, pair.dst, src)
|
|
}
|
|
Stats.DoneTransferring(src)
|
|
}
|
|
}
|
|
|
|
// Delete all the files passed in the channel
|
|
func DeleteFiles(to_be_deleted ObjectsChan) {
|
|
var wg sync.WaitGroup
|
|
wg.Add(Config.Transfers)
|
|
for i := 0; i < Config.Transfers; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for dst := range to_be_deleted {
|
|
if Config.DryRun {
|
|
Debug(dst, "Not deleting as --dry-run")
|
|
} else {
|
|
Stats.Checking(dst)
|
|
err := dst.Remove()
|
|
Stats.DoneChecking(dst)
|
|
if err != nil {
|
|
Stats.Error()
|
|
Log(dst, "Couldn't delete: %s", err)
|
|
} else {
|
|
Debug(dst, "Deleted")
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
Log(nil, "Waiting for deletions to finish")
|
|
wg.Wait()
|
|
}
|
|
|
|
// Syncs fsrc into fdst
|
|
//
|
|
// If Delete is true then it deletes any files in fdst that aren't in fsrc
|
|
func Sync(fdst, fsrc Fs, Delete bool) error {
|
|
err := fdst.Mkdir()
|
|
if err != nil {
|
|
Stats.Error()
|
|
return err
|
|
}
|
|
|
|
Log(fdst, "Building file list")
|
|
|
|
// Read the destination files first
|
|
// FIXME could do this in parallel and make it use less memory
|
|
delFiles := make(map[string]Object)
|
|
for dst := range fdst.List() {
|
|
delFiles[dst.Remote()] = dst
|
|
}
|
|
|
|
// Read source files checking them off against dest files
|
|
to_be_checked := make(ObjectPairChan, Config.Transfers)
|
|
to_be_uploaded := make(ObjectPairChan, Config.Transfers)
|
|
|
|
var checkerWg sync.WaitGroup
|
|
checkerWg.Add(Config.Checkers)
|
|
for i := 0; i < Config.Checkers; i++ {
|
|
go PairChecker(to_be_checked, to_be_uploaded, &checkerWg)
|
|
}
|
|
|
|
var copierWg sync.WaitGroup
|
|
copierWg.Add(Config.Transfers)
|
|
for i := 0; i < Config.Transfers; i++ {
|
|
go Copier(to_be_uploaded, fdst, &copierWg)
|
|
}
|
|
|
|
go func() {
|
|
for src := range fsrc.List() {
|
|
remote := src.Remote()
|
|
dst, found := delFiles[remote]
|
|
if found {
|
|
delete(delFiles, remote)
|
|
to_be_checked <- ObjectPair{src, dst}
|
|
} else {
|
|
// No need to check since doesn't exist
|
|
to_be_uploaded <- ObjectPair{src, nil}
|
|
}
|
|
}
|
|
close(to_be_checked)
|
|
}()
|
|
|
|
Log(fdst, "Waiting for checks to finish")
|
|
checkerWg.Wait()
|
|
close(to_be_uploaded)
|
|
Log(fdst, "Waiting for transfers to finish")
|
|
copierWg.Wait()
|
|
|
|
// Delete files if asked
|
|
if Delete {
|
|
if Stats.Errored() {
|
|
Log(fdst, "Not deleting files as there were IO errors")
|
|
return nil
|
|
}
|
|
|
|
// Delete the spare files
|
|
toDelete := make(ObjectsChan, Config.Transfers)
|
|
go func() {
|
|
for _, fs := range delFiles {
|
|
toDelete <- fs
|
|
}
|
|
close(toDelete)
|
|
}()
|
|
DeleteFiles(toDelete)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Checks the files in fsrc and fdst according to Size and MD5SUM
|
|
func Check(fdst, fsrc Fs) error {
|
|
Log(fdst, "Building file list")
|
|
|
|
// Read the destination files first
|
|
// FIXME could do this in parallel and make it use less memory
|
|
dstFiles := make(map[string]Object)
|
|
for dst := range fdst.List() {
|
|
dstFiles[dst.Remote()] = dst
|
|
}
|
|
|
|
// Read the source files checking them against dstFiles
|
|
// FIXME could do this in parallel and make it use less memory
|
|
srcFiles := make(map[string]Object)
|
|
commonFiles := make(map[string][]Object)
|
|
for src := range fsrc.List() {
|
|
remote := src.Remote()
|
|
if dst, ok := dstFiles[remote]; ok {
|
|
commonFiles[remote] = []Object{dst, src}
|
|
delete(dstFiles, remote)
|
|
} else {
|
|
srcFiles[remote] = src
|
|
}
|
|
}
|
|
|
|
Log(fdst, "%d files not in %v", len(dstFiles), fsrc)
|
|
for _, dst := range dstFiles {
|
|
Stats.Error()
|
|
Log(dst, "File not in %v", fsrc)
|
|
}
|
|
|
|
Log(fsrc, "%d files not in %s", len(srcFiles), fdst)
|
|
for _, src := range srcFiles {
|
|
Stats.Error()
|
|
Log(src, "File not in %v", fdst)
|
|
}
|
|
|
|
checks := make(chan []Object, Config.Transfers)
|
|
go func() {
|
|
for _, check := range commonFiles {
|
|
checks <- check
|
|
}
|
|
close(checks)
|
|
}()
|
|
|
|
var checkerWg sync.WaitGroup
|
|
checkerWg.Add(Config.Checkers)
|
|
for i := 0; i < Config.Checkers; i++ {
|
|
go func() {
|
|
defer checkerWg.Done()
|
|
for check := range checks {
|
|
dst, src := check[0], check[1]
|
|
Stats.Checking(src)
|
|
if src.Size() != dst.Size() {
|
|
Stats.DoneChecking(src)
|
|
Stats.Error()
|
|
Log(src, "Sizes differ")
|
|
continue
|
|
}
|
|
same, err := CheckMd5sums(src, dst)
|
|
Stats.DoneChecking(src)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if !same {
|
|
Stats.Error()
|
|
Log(src, "Md5sums differ")
|
|
}
|
|
Debug(src, "OK")
|
|
}
|
|
}()
|
|
}
|
|
|
|
Log(fdst, "Waiting for checks to finish")
|
|
checkerWg.Wait()
|
|
Log(fdst, "%d differences found", Stats.GetErrors())
|
|
if Stats.GetErrors() > 0 {
|
|
return fmt.Errorf("%d differences found", Stats.GetErrors())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// List the Fs to the supplied function
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func ListFn(f Fs, fn func(Object)) error {
|
|
in := f.List()
|
|
var wg sync.WaitGroup
|
|
wg.Add(Config.Checkers)
|
|
for i := 0; i < Config.Checkers; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
for o := range in {
|
|
fn(o)
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// List the Fs to stdout
|
|
//
|
|
// Shows size and path
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func List(f Fs, w io.Writer) error {
|
|
return ListFn(f, func(o Object) {
|
|
fmt.Fprintf(w, "%9d %s\n", o.Size(), o.Remote())
|
|
})
|
|
}
|
|
|
|
// List the Fs to stdout
|
|
//
|
|
// Shows size, mod time and path
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func ListLong(f Fs, w io.Writer) error {
|
|
return ListFn(f, func(o Object) {
|
|
Stats.Checking(o)
|
|
modTime := o.ModTime()
|
|
Stats.DoneChecking(o)
|
|
fmt.Fprintf(w, "%9d %s %s\n", o.Size(), modTime.Format("2006-01-02 15:04:05.000000000"), o.Remote())
|
|
})
|
|
}
|
|
|
|
// List the Fs to stdout
|
|
//
|
|
// Produces the same output as the md5sum command
|
|
//
|
|
// Lists in parallel which may get them out of order
|
|
func Md5sum(f Fs, w io.Writer) error {
|
|
return ListFn(f, func(o Object) {
|
|
Stats.Checking(o)
|
|
md5sum, err := o.Md5sum()
|
|
Stats.DoneChecking(o)
|
|
if err != nil {
|
|
Debug(o, "Failed to read MD5: %v", err)
|
|
md5sum = "UNKNOWN"
|
|
}
|
|
fmt.Fprintf(w, "%32s %s\n", md5sum, o.Remote())
|
|
})
|
|
}
|
|
|
|
// List the directories/buckets/containers in the Fs to stdout
|
|
func ListDir(f Fs, w io.Writer) error {
|
|
for dir := range f.ListDir() {
|
|
fmt.Fprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Makes a destination directory or container
|
|
func Mkdir(f Fs) error {
|
|
err := f.Mkdir()
|
|
if err != nil {
|
|
Stats.Error()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Removes a container but not if not empty
|
|
func Rmdir(f Fs) error {
|
|
if Config.DryRun {
|
|
Log(f, "Not deleting as dry run is set")
|
|
} else {
|
|
err := f.Rmdir()
|
|
if err != nil {
|
|
Stats.Error()
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Removes a container and all of its contents
|
|
//
|
|
// FIXME doesn't delete local directories
|
|
func Purge(f Fs) error {
|
|
var err error
|
|
if purger, ok := f.(Purger); ok {
|
|
if Config.DryRun {
|
|
Debug(f, "Not purging as --dry-run set")
|
|
} else {
|
|
err = purger.Purge()
|
|
}
|
|
} else {
|
|
// DeleteFiles and Rmdir observe --dry-run
|
|
DeleteFiles(f.List())
|
|
err = Rmdir(f)
|
|
}
|
|
if err != nil {
|
|
Stats.Error()
|
|
return err
|
|
}
|
|
return nil
|
|
}
|