move: if --check-first and --order-by are set then delete with perfect ordering
If using rclone move and --check-first and --order-by then rclone uses the transfer routine to delete files to ensure perfect ordering. This will cause the transfer stats to have a larger than expected number of items in it so we don't enable this by default. Fixes #6033
This commit is contained in:
parent
4edcd16f5f
commit
dd6e229327
4 changed files with 38 additions and 4 deletions
|
@ -789,6 +789,12 @@ interfere with checking.
|
||||||
It can also be useful to ensure perfect ordering when using
|
It can also be useful to ensure perfect ordering when using
|
||||||
`--order-by`.
|
`--order-by`.
|
||||||
|
|
||||||
|
If both `--check-first` and `--order-by` are set when doing `rclone move`
|
||||||
|
then rclone will use the transfer thread to delete source files which
|
||||||
|
don't need transferring. This will enable perfect ordering of the
|
||||||
|
transfers and deletes but will cause the transfer stats to have more
|
||||||
|
items in than expected.
|
||||||
|
|
||||||
Using this flag can use more memory as it effectively sets
|
Using this flag can use more memory as it effectively sets
|
||||||
`--max-backlog` to infinite. This means that all the info on the
|
`--max-backlog` to infinite. This means that all the info on the
|
||||||
objects to transfer is held in memory before the transfers start.
|
objects to transfer is held in memory before the transfers start.
|
||||||
|
|
|
@ -85,6 +85,8 @@ func (p *pipe) Pop() interface{} {
|
||||||
// It returns ok = false if the context was cancelled
|
// It returns ok = false if the context was cancelled
|
||||||
//
|
//
|
||||||
// It will panic if you call it after Close()
|
// It will panic if you call it after Close()
|
||||||
|
//
|
||||||
|
// Note that pairs where src==dst aren't counted for stats
|
||||||
func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
|
func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return false
|
return false
|
||||||
|
@ -97,7 +99,7 @@ func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
|
||||||
deheap.Push(p, pair)
|
deheap.Push(p, pair)
|
||||||
}
|
}
|
||||||
size := pair.Src.Size()
|
size := pair.Src.Size()
|
||||||
if size > 0 {
|
if size > 0 && pair.Src != pair.Dst {
|
||||||
p.totalSize += size
|
p.totalSize += size
|
||||||
}
|
}
|
||||||
p.stats(len(p.queue), p.totalSize)
|
p.stats(len(p.queue), p.totalSize)
|
||||||
|
@ -141,7 +143,7 @@ func (p *pipe) GetMax(ctx context.Context, fraction int) (pair fs.ObjectPair, ok
|
||||||
pair = deheap.PopMax(p).(fs.ObjectPair)
|
pair = deheap.PopMax(p).(fs.ObjectPair)
|
||||||
}
|
}
|
||||||
size := pair.Src.Size()
|
size := pair.Src.Size()
|
||||||
if size > 0 {
|
if size > 0 && pair.Src != pair.Dst {
|
||||||
p.totalSize -= size
|
p.totalSize -= size
|
||||||
}
|
}
|
||||||
if p.totalSize < 0 {
|
if p.totalSize < 0 {
|
||||||
|
|
|
@ -42,12 +42,18 @@ func TestPipe(t *testing.T) {
|
||||||
obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
|
obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
|
||||||
|
|
||||||
pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
|
pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
|
||||||
|
pairD := fs.ObjectPair{Src: obj1, Dst: obj1} // this object should not count to the stats
|
||||||
|
|
||||||
// Put an object
|
// Put an object
|
||||||
ok := p.Put(ctx, pair1)
|
ok := p.Put(ctx, pair1)
|
||||||
assert.Equal(t, true, ok)
|
assert.Equal(t, true, ok)
|
||||||
checkStats(1, 5)
|
checkStats(1, 5)
|
||||||
|
|
||||||
|
// Put an object to be deleted
|
||||||
|
ok = p.Put(ctx, pairD)
|
||||||
|
assert.Equal(t, true, ok)
|
||||||
|
checkStats(2, 5)
|
||||||
|
|
||||||
// Close the pipe showing reading on closed pipe is OK
|
// Close the pipe showing reading on closed pipe is OK
|
||||||
p.Close()
|
p.Close()
|
||||||
|
|
||||||
|
@ -55,6 +61,12 @@ func TestPipe(t *testing.T) {
|
||||||
pair2, ok := p.Get(ctx)
|
pair2, ok := p.Get(ctx)
|
||||||
assert.Equal(t, pair1, pair2)
|
assert.Equal(t, pair1, pair2)
|
||||||
assert.Equal(t, true, ok)
|
assert.Equal(t, true, ok)
|
||||||
|
checkStats(1, 0)
|
||||||
|
|
||||||
|
// Read from pipe
|
||||||
|
pair2, ok = p.Get(ctx)
|
||||||
|
assert.Equal(t, pairD, pair2)
|
||||||
|
assert.Equal(t, true, ok)
|
||||||
checkStats(0, 0)
|
checkStats(0, 0)
|
||||||
|
|
||||||
// Check read on closed pipe
|
// Check read on closed pipe
|
||||||
|
|
|
@ -377,6 +377,14 @@ func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.W
|
||||||
fs.Logf(src, "Not removing source file as it is the same file as the destination")
|
fs.Logf(src, "Not removing source file as it is the same file as the destination")
|
||||||
} else if s.ci.IgnoreExisting {
|
} else if s.ci.IgnoreExisting {
|
||||||
fs.Debugf(src, "Not removing source file as destination file exists and --ignore-existing is set")
|
fs.Debugf(src, "Not removing source file as destination file exists and --ignore-existing is set")
|
||||||
|
} else if s.checkFirst && s.ci.OrderBy != "" {
|
||||||
|
// If we want perfect ordering then use the transfers to delete the file
|
||||||
|
//
|
||||||
|
// We send src == dst, to say we want the src deleted
|
||||||
|
ok = out.Put(s.ctx, fs.ObjectPair{Src: src, Dst: src})
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
s.processError(operations.DeleteFile(s.ctx, src))
|
s.processError(operations.DeleteFile(s.ctx, src))
|
||||||
}
|
}
|
||||||
|
@ -417,10 +425,16 @@ func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs,
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
src := pair.Src
|
src := pair.Src
|
||||||
|
dst := pair.Dst
|
||||||
if s.DoMove {
|
if s.DoMove {
|
||||||
_, err = operations.Move(ctx, fdst, pair.Dst, src.Remote(), src)
|
if src != dst {
|
||||||
|
_, err = operations.Move(ctx, fdst, dst, src.Remote(), src)
|
||||||
|
} else {
|
||||||
|
// src == dst signals delete the src
|
||||||
|
err = operations.DeleteFile(ctx, src)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
_, err = operations.Copy(ctx, fdst, pair.Dst, src.Remote(), src)
|
_, err = operations.Copy(ctx, fdst, dst, src.Remote(), src)
|
||||||
}
|
}
|
||||||
s.processError(err)
|
s.processError(err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue