fs: Add --track-renames-strategy for configurable matching criteria for --track-renames
This commit adds the `--track-renames-strategy` flag, which lets the user choose the matching strategy used by `--track-renames`. Currently this can be "hash", "modtime", or both. When used as `--track-renames-strategy modtime`, it enables rename tracking on encrypted remotes. Fixes #3696 Fixes #2721
Parent: 36717c7d98
Commit: 158870bcdb
6 changed files with 217 additions and 67 deletions
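For illustration only (not part of the commit): with this change, rename tracking on a crypt remote, which has no usable hashes, can be requested along these lines, where `secret:` is a placeholder for an encrypted remote:

    rclone sync /path/to/local secret:backup --track-renames --track-renames-strategy modtime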
@@ -1153,6 +1153,14 @@ Note also that `--track-renames` is incompatible with
 `--delete-before` and will select `--delete-after` instead of
 `--delete-during`.
 
+### --track-renames-strategy (hash,modtime) ###
+
+This option changes the matching criteria for `--track-renames` to match
+by any combination of modtime, hash and size. Matching by size is always
+enabled no matter which options are selected here. This also means that it
+enables `--track-renames` support for encrypted destinations.
+If nothing is specified, the default is to match by hash.
+
 ### --delete-(before,during,after) ###
 
 This option allows you to specify when files on your destination are
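To illustrate the documentation above (an example invocation, not taken from the commit): strategies can be combined, and size is always part of the comparison, so the following would match by size, hash and modtime, while omitting the flag keeps the default of matching by size and hash:

    rclone sync /path/to/local remote:backup --track-renames --track-renames-strategy hash,modtime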
@@ -128,6 +128,7 @@ These flags are available for every command.
       --tpslimit float                  Limit HTTP transactions per second to this.
       --tpslimit-burst int              Max burst of transactions for --tpslimit. (default 1)
       --track-renames                   When synchronizing, track file renames and do a server side move if possible
+      --track-renames-strategy          When tracking renames, use multiple strategies of hash, modtime
       --transfers int                   Number of file transfers to run in parallel. (default 4)
   -u, --update                          Skip files that are newer on the destination.
       --use-cookies                     Enable session cookiejar.
@@ -59,7 +59,8 @@ type ConfigInfo struct {
     InsecureSkipVerify   bool // Skip server certificate verification
     DeleteMode           DeleteMode
     MaxDelete            int64
     TrackRenames         bool   // Track file renames.
+    TrackRenamesStrategy string // Comma separated list of strategies used to track renames
     LowLevelRetries      int
     UpdateOlder          bool // Skip files that are newer on the destination
     NoGzip               bool // Disable compression
@@ -145,6 +146,8 @@ func NewConfig() *ConfigInfo {
     c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)
     c.MultiThreadStreams = 4
 
+    c.TrackRenamesStrategy = "hash"
+
     return c
 }
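Since `TrackRenamesStrategy` is stored as a plain comma-separated string with a default of "hash", here is a minimal stand-alone sketch of how such a value can be split and validated. `parseStrategies` and `validStrategies` are hypothetical helpers for illustration and are not part of the commit, which itself simply splits the value with `strings.Split` later in the sync code.

    package main

    import (
        "fmt"
        "strings"
    )

    // validStrategies lists the values the new option documents: hash and modtime.
    var validStrategies = map[string]bool{"hash": true, "modtime": true}

    // parseStrategies splits a comma separated value such as "hash,modtime"
    // and rejects anything it does not recognise.
    func parseStrategies(value string) ([]string, error) {
        parts := strings.Split(value, ",")
        for _, p := range parts {
            if !validStrategies[p] {
                return nil, fmt.Errorf("unknown track-renames strategy %q", p)
            }
        }
        return parts, nil
    }

    func main() {
        strategies, err := parseStrategies("hash,modtime")
        fmt.Println(strategies, err) // [hash modtime] <nil>
    }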
@@ -61,6 +61,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
     flags.BoolVarP(flagSet, &deleteAfter, "delete-after", "", false, "When synchronizing, delete files on destination after transferring (default)")
     flags.Int64VarP(flagSet, &fs.Config.MaxDelete, "max-delete", "", -1, "When synchronizing, limit the number of deletes")
     flags.BoolVarP(flagSet, &fs.Config.TrackRenames, "track-renames", "", fs.Config.TrackRenames, "When synchronizing, track file renames and do a server side move if possible")
+    flags.StringVarP(flagSet, &fs.Config.TrackRenamesStrategy, "track-renames-strategy", "", fs.Config.TrackRenamesStrategy, "Strategies to use when synchronizing using track-renames hash|modtime")
     flags.IntVarP(flagSet, &fs.Config.LowLevelRetries, "low-level-retries", "", fs.Config.LowLevelRetries, "Number of low level retries to do.")
     flags.BoolVarP(flagSet, &fs.Config.UpdateOlder, "update", "u", fs.Config.UpdateOlder, "Skip files that are newer on the destination.")
     flags.BoolVarP(flagSet, &fs.Config.UseServerModTime, "use-server-modtime", "", fs.Config.UseServerModTime, "Use server modified time instead of object metadata")
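The `flags.StringVarP` helper above receives a `*pflag.FlagSet`, so the registration ultimately bottoms out in `spf13/pflag`. A small self-contained sketch of the same shape (illustrative only, not the rclone flags package):

    package main

    import (
        "fmt"

        "github.com/spf13/pflag"
    )

    func main() {
        // Register a string flag with no shorthand and a default of "hash",
        // mirroring the shape of the --track-renames-strategy registration.
        var strategy string
        flagSet := pflag.NewFlagSet("example", pflag.ContinueOnError)
        flagSet.StringVarP(&strategy, "track-renames-strategy", "", "hash",
            "Strategies to use when synchronizing using track-renames hash|modtime")

        if err := flagSet.Parse([]string{"--track-renames-strategy", "hash,modtime"}); err != nil {
            panic(err)
        }
        fmt.Println(strategy) // hash,modtime
    }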
fs/sync/sync.go
@@ -6,6 +6,7 @@ import (
     "fmt"
     "path"
     "sort"
+    "strings"
     "sync"
     "time"
@@ -29,41 +30,42 @@ type syncCopyMove struct {
     deleteEmptySrcDirs bool
     dir                string
     // internal state
     ctx                  context.Context        // internal context for controlling go-routines
     cancel               func()                 // cancel the context
     noTraverse           bool                   // if set don't traverse the dst
     noCheckDest          bool                   // if set transfer all objects regardless without checking dst
     deletersWg           sync.WaitGroup         // for delete before go routine
     deleteFilesCh        chan fs.Object         // channel to receive deletes if delete before
     trackRenames         bool                   // set if we should do server side renames
+    trackRenamesStrategy []string               // strategies used for tracking renames
     dstFilesMu           sync.Mutex             // protect dstFiles
     dstFiles             map[string]fs.Object   // dst files, always filled
     srcFiles             map[string]fs.Object   // src files, only used if deleteBefore
     srcFilesChan         chan fs.Object         // passes src objects
     srcFilesResult       chan error             // error result of src listing
     dstFilesResult       chan error             // error result of dst listing
     dstEmptyDirsMu       sync.Mutex             // protect dstEmptyDirs
     dstEmptyDirs         map[string]fs.DirEntry // potentially empty directories
     srcEmptyDirsMu       sync.Mutex             // protect srcEmptyDirs
     srcEmptyDirs         map[string]fs.DirEntry // potentially empty directories
     checkerWg            sync.WaitGroup         // wait for checkers
     toBeChecked          *pipe                  // checkers channel
     transfersWg          sync.WaitGroup         // wait for transfers
     toBeUploaded         *pipe                  // copiers channel
     errorMu              sync.Mutex             // Mutex covering the errors variables
     err                  error                  // normal error from copy process
     noRetryErr           error                  // error with NoRetry set
     fatalErr             error                  // fatal error
     commonHash           hash.Type              // common hash type between src and dst
     renameMapMu          sync.Mutex             // mutex to protect the below
     renameMap            map[string][]fs.Object // dst files by hash - only used by trackRenames
     renamerWg            sync.WaitGroup         // wait for renamers
     toBeRenamed          *pipe                  // renamers channel
     trackRenamesWg       sync.WaitGroup         // wg for background track renames
     trackRenamesCh       chan fs.Object         // objects are pumped in here
     renameCheck          []fs.Object            // accumulate files to check for rename here
     compareCopyDest      fs.Fs                  // place to check for files to server side copy
     backupDir            fs.Fs                  // place to store overwrites/deletes
 }
 
 func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.DeleteMode, DoMove bool, deleteEmptySrcDirs bool, copyEmptySrcDirs bool) (*syncCopyMove, error) {
@@ -71,24 +73,25 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
         return nil, fserrors.FatalError(fs.ErrorOverlapping)
     }
     s := &syncCopyMove{
         fdst:                 fdst,
         fsrc:                 fsrc,
         deleteMode:           deleteMode,
         DoMove:               DoMove,
         copyEmptySrcDirs:     copyEmptySrcDirs,
         deleteEmptySrcDirs:   deleteEmptySrcDirs,
         dir:                  "",
         srcFilesChan:         make(chan fs.Object, fs.Config.Checkers+fs.Config.Transfers),
         srcFilesResult:       make(chan error, 1),
         dstFilesResult:       make(chan error, 1),
         dstEmptyDirs:         make(map[string]fs.DirEntry),
         srcEmptyDirs:         make(map[string]fs.DirEntry),
         noTraverse:           fs.Config.NoTraverse,
         noCheckDest:          fs.Config.NoCheckDest,
         deleteFilesCh:        make(chan fs.Object, fs.Config.Checkers),
         trackRenames:         fs.Config.TrackRenames,
+        trackRenamesStrategy: strings.Split(fs.Config.TrackRenamesStrategy, ","),
         commonHash:           fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(),
         trackRenamesCh:       make(chan fs.Object, fs.Config.Checkers),
     }
     var err error
     s.toBeChecked, err = newPipe(fs.Config.OrderBy, accounting.Stats(ctx).SetCheckQueue, fs.Config.MaxBacklog)
@@ -132,10 +135,16 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete
         fs.Errorf(fdst, "Ignoring --track-renames as the destination does not support server-side move or copy")
         s.trackRenames = false
     }
-    if s.commonHash == hash.None {
+    if s.commonHash == hash.None && isUsingRenameStrategy("hash", s.trackRenamesStrategy) {
         fs.Errorf(fdst, "Ignoring --track-renames as the source and destination do not have a common hash")
         s.trackRenames = false
     }
+
+    if fs.GetModifyWindow(fsrc, fdst) == fs.ModTimeNotSupported && isUsingRenameStrategy("modtime", s.trackRenamesStrategy) {
+        fs.Errorf(fdst, "Ignoring --track-renames as either the source or destination do not support modtime")
+        s.trackRenames = false
+    }
+
     if s.deleteMode == fs.DeleteModeOff {
         fs.Errorf(fdst, "Ignoring --track-renames as it doesn't work with copy or move, only sync")
         s.trackRenames = false
@@ -560,20 +569,53 @@ func (s *syncCopyMove) srcParentDirCheck(entry fs.DirEntry) {
     }
 }
 
-// renameHash makes a string with the size and the hash for rename detection
-//
-// it may return an empty string in which case no hash could be made
-func (s *syncCopyMove) renameHash(obj fs.Object) (hash string) {
-    var err error
-    hash, err = obj.Hash(s.ctx, s.commonHash)
-    if err != nil {
-        fs.Debugf(obj, "Hash failed: %v", err)
-        return ""
-    }
-    if hash == "" {
-        return ""
-    }
-    return fmt.Sprintf("%d,%s", obj.Size(), hash)
-}
+// isUsingRenameStrategy checks if a strategy is included in a list of strategies
+func isUsingRenameStrategy(strategy string, strategies []string) bool {
+    for _, s := range strategies {
+        if s == strategy {
+            return true
+        }
+    }
+    return false
+}
+
+// renameID makes a string with the size and the other identifiers of the requested rename strategies
+//
+// it may return an empty string in which case no hash could be made
+func (s *syncCopyMove) renameID(obj fs.Object, renameStrategy []string, precision time.Duration) string {
+    var builder strings.Builder
+
+    fmt.Fprintf(&builder, "%d", obj.Size())
+
+    if isUsingRenameStrategy("hash", renameStrategy) {
+        var err error
+        hash, err := obj.Hash(s.ctx, s.commonHash)
+
+        if err != nil {
+            fs.Debugf(obj, "Hash failed: %v", err)
+            return ""
+        }
+        if hash == "" {
+            return ""
+        }
+
+        fmt.Fprintf(&builder, ",%s", hash)
+    }
+
+    if isUsingRenameStrategy("modtime", renameStrategy) {
+        modTime := obj.ModTime(s.ctx)
+        divisor := precision.Nanoseconds() / 2
+        rounding := divisor / 2
+
+        if rounding > 0 {
+            rounding--
+        }
+
+        timeHash := (modTime.Unix()*time.Nanosecond.Nanoseconds() + int64(modTime.Nanosecond()) + rounding)
+        fmt.Fprintf(&builder, ",%d", timeHash)
+    }
+
+    return builder.String()
+}
 
 // pushRenameMap adds the object with hash to the rename map
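A simplified, stand-alone sketch of the idea behind `renameID` may help: the key always starts with the object size, optionally appends the hash, and optionally appends the modification time coarsened to the remote's precision window, so that timestamps which agree within that window generally collapse to the same key. This sketch uses `time.Time.Truncate` instead of the commit's manual rounding and is not the function above.

    package main

    import (
        "fmt"
        "strings"
        "time"
    )

    // renameIDSketch builds a rename-detection key from size, an optional hash
    // and an optional modtime rounded down to the given precision. It is a
    // simplified illustration, not the renameID function from the commit.
    func renameIDSketch(size int64, hash string, modTime time.Time, precision time.Duration, useHash, useModtime bool) string {
        var b strings.Builder
        fmt.Fprintf(&b, "%d", size)
        if useHash {
            if hash == "" {
                return "" // no usable hash, so rename detection is skipped for this object
            }
            fmt.Fprintf(&b, ",%s", hash)
        }
        if useModtime {
            // Coarsen the timestamp so that times which agree to within the
            // precision window map to the same key.
            fmt.Fprintf(&b, ",%d", modTime.Truncate(precision).UnixNano())
        }
        return b.String()
    }

    func main() {
        t1 := time.Date(2020, 2, 1, 12, 0, 0, 100, time.UTC)
        t2 := t1.Add(200 * time.Nanosecond) // within a 1s precision window
        fmt.Println(renameIDSketch(14, "", t1, time.Second, false, true))
        fmt.Println(renameIDSketch(14, "", t2, time.Second, false, true)) // same key
    }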
@@ -626,10 +668,12 @@ func (s *syncCopyMove) makeRenameMap() {
             // only create hash for dst fs.Object if its size could match
             if _, found := possibleSizes[obj.Size()]; found {
                 tr := accounting.Stats(s.ctx).NewCheckingTransfer(obj)
-                hash := s.renameHash(obj)
+                hash := s.renameID(obj, s.trackRenamesStrategy, fs.GetModifyWindow(s.fsrc, s.fdst))
+
                 if hash != "" {
                     s.pushRenameMap(hash, obj)
                 }
+
                 tr.Done(nil)
             }
         }
@@ -643,7 +687,8 @@
 // possible, it returns true if the object was renamed.
 func (s *syncCopyMove) tryRename(src fs.Object) bool {
     // Calculate the hash of the src object
-    hash := s.renameHash(src)
+    hash := s.renameID(src, s.trackRenamesStrategy, fs.GetModifyWindow(s.fsrc, s.fdst))
+
     if hash == "" {
         return false
     }
@@ -1127,6 +1127,98 @@ func TestSyncWithTrackRenames(t *testing.T) {
     }
 }
 
+func TestSyncWithTrackRenamesStrategyModtime(t *testing.T) {
+    r := fstest.NewRun(t)
+    defer r.Finalise()
+
+    fs.Config.TrackRenames = true
+    fs.Config.TrackRenamesStrategy = "hash,modtime"
+    defer func() {
+        fs.Config.TrackRenames = false
+        fs.Config.TrackRenamesStrategy = "hash"
+    }()
+
+    haveHash := r.Fremote.Hashes().Overlap(r.Flocal.Hashes()).GetOne() != hash.None
+    canTrackRenames := haveHash && operations.CanServerSideMove(r.Fremote)
+    t.Logf("Can track renames: %v", canTrackRenames)
+
+    f1 := r.WriteFile("potato", "Potato Content", t1)
+    f2 := r.WriteFile("yam", "Yam Content", t2)
+
+    accounting.GlobalStats().ResetCounters()
+    require.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false))
+
+    fstest.CheckItems(t, r.Fremote, f1, f2)
+    fstest.CheckItems(t, r.Flocal, f1, f2)
+
+    // Now rename locally.
+    f2 = r.RenameFile(f2, "yaml")
+
+    accounting.GlobalStats().ResetCounters()
+    require.NoError(t, Sync(context.Background(), r.Fremote, r.Flocal, false))
+
+    fstest.CheckItems(t, r.Fremote, f1, f2)
+
+    // As currently there is no Fs interface providing number of chunks
+    // in a file, this test depends on the well-known names of test remotes.
+    remote := r.Fremote.Name()
+
+    // Union remote can Move but returns CantMove error.
+    moveAsCopyDelete := r.Fremote.Features().Move == nil || remote == "TestUnion"
+
+    chunker := strings.HasPrefix(remote, "TestChunker")
+    wrappedMoveAsCopyDelete := chunker && strings.HasSuffix(remote, "S3")
+
+    chunk3b := chunker && strings.Contains(remote, "Chunk3b")            // chunker with 3 byte chunks
+    chunk50b := chunker && strings.Contains(remote, "Chunk50b")          // chunker with 50 byte chunks
+    chunkDefault := chunker && !strings.Contains(remote, "ChunkerChunk") // default big chunk size
+    chunkBig := chunk50b || chunkDefault                                 // file is smaller than chunk size
+
+    // Verify number of checks for a toy 14 byte file.
+    // The order of cases matters!
+    var checks int
+    switch {
+    case canTrackRenames && chunk3b:
+        checks = 8 // chunker makes extra checks for each small chunk
+    case canTrackRenames && chunkBig:
+        checks = 4 // chunker makes 1 extra check for a single big chunk
+    case canTrackRenames && moveAsCopyDelete:
+        checks = 4 // 2 file checks + 1 move + 1 delete
+    case canTrackRenames:
+        checks = 3 // 2 file checks + 1 move
+    case !chunker:
+        checks = 2 // 2 file checks on a generic non-chunking remote
+    case chunk3b:
+        checks = 6 // chunker makes extra checks for each small chunk
+    case chunkBig && wrappedMoveAsCopyDelete:
+        checks = 4 // one more extra check because S3 emulates Move as Copy+Delete
+    case chunkBig:
+        checks = 3 // chunker makes 1 extra check for a single big chunk
+    default:
+        checks = -1 // skip verification for chunker with unknown chunk size
+    }
+    if checks != -1 { // "-1" allows remotes to bypass this check
+        assert.Equal(t, int64(checks), accounting.GlobalStats().GetChecks())
+    }
+
+    // Verify number of copy operations for a toy 14 byte file.
+    // The order of cases matters!
+    var copies int64
+    switch {
+    case canTrackRenames && moveAsCopyDelete:
+        copies = 1 // 1 copy
+    case canTrackRenames:
+        copies = 0 // 0 copy
+    case chunkBig && wrappedMoveAsCopyDelete:
+        copies = 2 // extra Copy because S3 emulates Move as Copy+Delete.
+    default:
+        copies = 1
+    }
+    if copies != -1 { // "-1" allows remotes to bypass this check
+        assert.Equal(t, copies, accounting.GlobalStats().GetTransfers())
+    }
+}
+
 func toyFileTransfers(r *fstest.Run) int64 {
     remote := r.Fremote.Name()
     transfers := 1