sync: add a buffer for checks, uploads and renames #379

--max-backlog controls the queue length.

Add statistics for the check/upload/rename queues.

This means that checking can complete before the uploads which will
give rclone the ability to show exactly what is outstanding.
This commit is contained in:
Nick Craig-Wood 2018-07-19 22:41:34 +01:00
parent eb84b58d3c
commit cb7a461287
8 changed files with 353 additions and 117 deletions

View file

@ -535,6 +535,22 @@ to reduce the value so rclone moves on to a high level retry (see the
Disable low level retries with `--low-level-retries 1`.
### --max-backlog=N ###
This is the maximum allowable backlog of files in a sync/copy/move
queued for being checked or transferred.
This can be set arbitrarily large. It will only use memory when the
queue is in use. Note that it will use in the order of N kB of memory
when the backlog is in use.
Setting this large allows rclone to calculate how many files are
pending more accurately and give a more accurate estimated finish
time.
Setting this small will make rclone more synchronous to the listings
of the remote which may be desirable.
### --max-delete=N ###
This tells rclone not to delete more than N files. If that limit is

View file

@ -65,17 +65,23 @@ The value for "eta" is null if an eta cannot be determined.
// StatsInfo accounts all transfers
type StatsInfo struct {
mu sync.RWMutex
bytes int64
errors int64
lastError error
checks int64
checking *stringSet
transfers int64
transferring *stringSet
deletes int64
start time.Time
inProgress *inProgress
mu sync.RWMutex
bytes int64
errors int64
lastError error
checks int64
checking *stringSet
checkQueue int
checkQueueSize int64
transfers int64
transferring *stringSet
transferQueue int
transferQueueSize int64
renameQueue int
renameQueueSize int64
deletes int64
start time.Time
inProgress *inProgress
}
// NewStats cretates an initialised StatsInfo
@ -294,3 +300,27 @@ func (s *StatsInfo) DoneTransferring(remote string, ok bool) {
s.mu.Unlock()
}
}
// SetCheckQueue sets the number of queued checks
func (s *StatsInfo) SetCheckQueue(n int, size int64) {
s.mu.Lock()
s.checkQueue = n
s.checkQueueSize = size
s.mu.Unlock()
}
// SetTransferQueue sets the number of queued transfers
func (s *StatsInfo) SetTransferQueue(n int, size int64) {
s.mu.Lock()
s.transferQueue = n
s.transferQueueSize = size
s.mu.Unlock()
}
// SetRenameQueue sets the number of queued transfers
func (s *StatsInfo) SetRenameQueue(n int, size int64) {
s.mu.Lock()
s.renameQueue = n
s.renameQueueSize = size
s.mu.Unlock()
}

View file

@ -81,6 +81,7 @@ type ConfigInfo struct {
AskPassword bool
UseServerModTime bool
MaxTransfer SizeSuffix
MaxBacklog int
}
// NewConfig creates a new config with everything set to the default
@ -109,6 +110,7 @@ func NewConfig() *ConfigInfo {
c.AskPassword = true
c.TPSLimitBurst = 1
c.MaxTransfer = -1
c.MaxBacklog = 10000
return c
}

View file

@ -83,6 +83,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.FVarP(flagSet, &fs.Config.StreamingUploadCutoff, "streaming-upload-cutoff", "", "Cutoff for switching to chunked upload if file size is unknown. Upload starts after reaching cutoff or when file ends.")
flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList)
flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.")
flags.IntVarP(flagSet, &fs.Config.MaxBacklog, "max-backlog", "", fs.Config.MaxBacklog, "Maximum number of objects in sync or check backlog.")
}
// SetFlags converts any flags into config which weren't straight foward

View file

@ -827,9 +827,6 @@ type ObjectPair struct {
Src, Dst Object
}
// ObjectPairChan is a channel of ObjectPair
type ObjectPairChan chan ObjectPair
// Find looks for an Info object for the name passed in
//
// Services are looked up in the config file

100
fs/sync/pipe.go Normal file
View file

@ -0,0 +1,100 @@
package sync
import (
"context"
"sync"
"github.com/ncw/rclone/fs"
)
// pipe provides an unbounded channel like experience
//
// Note unlike channels these aren't strictly ordered.
type pipe struct {
mu sync.Mutex
c chan struct{}
queue []fs.ObjectPair
closed bool
totalSize int64
stats func(items int, totalSize int64)
}
func newPipe(stats func(items int, totalSize int64), maxBacklog int) *pipe {
return &pipe{
c: make(chan struct{}, maxBacklog),
stats: stats,
}
}
// Put an pair into the pipe
//
// It returns ok = false if the context was cancelled
//
// It will panic if you call it after Close()
func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
if ctx.Err() != nil {
return false
}
p.mu.Lock()
p.queue = append(p.queue, pair)
size := pair.Src.Size()
if size > 0 {
p.totalSize += size
}
p.stats(len(p.queue), p.totalSize)
p.mu.Unlock()
select {
case <-ctx.Done():
return false
case p.c <- struct{}{}:
}
return true
}
// Get a pair from the pipe
//
// It returns ok = false if the context was cancelled or Close() has
// been called.
func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
if ctx.Err() != nil {
return
}
select {
case <-ctx.Done():
return
case _, ok = <-p.c:
if !ok {
return
}
}
p.mu.Lock()
pair, p.queue = p.queue[0], p.queue[1:]
size := pair.Src.Size()
if size > 0 {
p.totalSize -= size
}
if p.totalSize < 0 {
p.totalSize = 0
}
p.stats(len(p.queue), p.totalSize)
p.mu.Unlock()
return pair, true
}
// Stats reads the number of items in the queue and the totalSize
func (p *pipe) Stats() (items int, totalSize int64) {
p.mu.Lock()
items, totalSize = len(p.queue), p.totalSize
p.mu.Unlock()
return items, totalSize
}
// Close the pipe
//
// Writes to a closed pipe will panic as will double closing a pipe
func (p *pipe) Close() {
p.mu.Lock()
close(p.c)
p.closed = true
p.mu.Unlock()
}

122
fs/sync/pipe_test.go Normal file
View file

@ -0,0 +1,122 @@
package sync
import (
"context"
"sync"
"sync/atomic"
"testing"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fstest/mockobject"
"github.com/stretchr/testify/assert"
)
func TestPipe(t *testing.T) {
var queueLength int
var queueSize int64
stats := func(n int, size int64) {
queueLength, queueSize = n, size
}
// Make a new pipe
p := newPipe(stats, 10)
checkStats := func(expectedN int, expectedSize int64) {
n, size := p.Stats()
assert.Equal(t, expectedN, n)
assert.Equal(t, expectedSize, size)
assert.Equal(t, expectedN, queueLength)
assert.Equal(t, expectedSize, queueSize)
}
checkStats(0, 0)
ctx := context.Background()
obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
// Put an object
ok := p.Put(ctx, pair1)
assert.Equal(t, true, ok)
checkStats(1, 5)
// Close the pipe showing reading on closed pipe is OK
p.Close()
// Read from pipe
pair2, ok := p.Get(ctx)
assert.Equal(t, pair1, pair2)
assert.Equal(t, true, ok)
checkStats(0, 0)
// Check read on closed pipe
pair2, ok = p.Get(ctx)
assert.Equal(t, fs.ObjectPair{}, pair2)
assert.Equal(t, false, ok)
// Check panic on write to closed pipe
assert.Panics(t, func() { p.Put(ctx, pair1) })
// Make a new pipe
p = newPipe(stats, 10)
ctx2, cancel := context.WithCancel(ctx)
// cancel it in the background - check read ceases
go cancel()
pair2, ok = p.Get(ctx2)
assert.Equal(t, fs.ObjectPair{}, pair2)
assert.Equal(t, false, ok)
// check we can't write
ok = p.Put(ctx2, pair1)
assert.Equal(t, false, ok)
}
// TestPipeConcurrent runs concurrent Get and Put to flush out any
// race conditions and concurrency problems.
func TestPipeConcurrent(t *testing.T) {
const (
N = 1000
readWriters = 10
)
stats := func(n int, size int64) {}
// Make a new pipe
p := newPipe(stats, 10)
var wg sync.WaitGroup
obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone)
pair1 := fs.ObjectPair{Src: obj1, Dst: nil}
ctx := context.Background()
var count int64
for j := 0; j < readWriters; j++ {
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
// Read from pipe
pair2, ok := p.Get(ctx)
assert.Equal(t, pair1, pair2)
assert.Equal(t, true, ok)
atomic.AddInt64(&count, -1)
}
}()
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
// Put an object
ok := p.Put(ctx, pair1)
assert.Equal(t, true, ok)
atomic.AddInt64(&count, 1)
}
}()
}
wg.Wait()
assert.Equal(t, int64(0), count)
}

View file

@ -43,9 +43,9 @@ type syncCopyMove struct {
srcEmptyDirsMu sync.Mutex // protect srcEmptyDirs
srcEmptyDirs map[string]fs.DirEntry // potentially empty directories
checkerWg sync.WaitGroup // wait for checkers
toBeChecked fs.ObjectPairChan // checkers channel
toBeChecked *pipe // checkers channel
transfersWg sync.WaitGroup // wait for transfers
toBeUploaded fs.ObjectPairChan // copiers channel
toBeUploaded *pipe // copiers channel
errorMu sync.Mutex // Mutex covering the errors variables
err error // normal error from copy process
noRetryErr error // error with NoRetry set
@ -54,7 +54,7 @@ type syncCopyMove struct {
renameMapMu sync.Mutex // mutex to protect the below
renameMap map[string][]fs.Object // dst files by hash - only used by trackRenames
renamerWg sync.WaitGroup // wait for renamers
toBeRenamed fs.ObjectPairChan // renamers channel
toBeRenamed *pipe // renamers channel
trackRenamesWg sync.WaitGroup // wg for background track renames
trackRenamesCh chan fs.Object // objects are pumped in here
renameCheck []fs.Object // accumulate files to check for rename here
@ -75,12 +75,12 @@ func newSyncCopyMove(fdst, fsrc fs.Fs, deleteMode fs.DeleteMode, DoMove bool, de
dstFilesResult: make(chan error, 1),
dstEmptyDirs: make(map[string]fs.DirEntry),
srcEmptyDirs: make(map[string]fs.DirEntry),
toBeChecked: make(fs.ObjectPairChan, fs.Config.Transfers),
toBeUploaded: make(fs.ObjectPairChan, fs.Config.Transfers),
toBeChecked: newPipe(accounting.Stats.SetCheckQueue, fs.Config.MaxBacklog),
toBeUploaded: newPipe(accounting.Stats.SetTransferQueue, fs.Config.MaxBacklog),
deleteFilesCh: make(chan fs.Object, fs.Config.Checkers),
trackRenames: fs.Config.TrackRenames,
commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(),
toBeRenamed: make(fs.ObjectPairChan, fs.Config.Transfers),
toBeRenamed: newPipe(accounting.Stats.SetRenameQueue, fs.Config.MaxBacklog),
trackRenamesCh: make(chan fs.Object, fs.Config.Checkers),
}
s.ctx, s.cancel = context.WithCancel(context.Background())
@ -131,12 +131,7 @@ func newSyncCopyMove(fdst, fsrc fs.Fs, deleteMode fs.DeleteMode, DoMove bool, de
// Check to see if the context has been cancelled
func (s *syncCopyMove) aborting() bool {
select {
case <-s.ctx.Done():
return true
default:
}
return false
return s.ctx.Err() != nil
}
// This reads the map and pumps it into the channel passed in, closing
@ -197,119 +192,95 @@ func (s *syncCopyMove) currentError() error {
// pairChecker reads Objects~s on in send to out if they need transferring.
//
// FIXME potentially doing lots of hashes at once
func (s *syncCopyMove) pairChecker(in fs.ObjectPairChan, out fs.ObjectPairChan, wg *sync.WaitGroup) {
func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, wg *sync.WaitGroup) {
defer wg.Done()
for {
if s.aborting() {
pair, ok := in.Get(s.ctx)
if !ok {
return
}
select {
case pair, ok := <-in:
if !ok {
return
}
src := pair.Src
accounting.Stats.Checking(src.Remote())
// Check to see if can store this
if src.Storable() {
if operations.NeedTransfer(pair.Dst, pair.Src) {
// If files are treated as immutable, fail if destination exists and does not match
if fs.Config.Immutable && pair.Dst != nil {
fs.Errorf(pair.Dst, "Source and destination exist but do not match: immutable file modified")
s.processError(fs.ErrorImmutableModified)
} else {
// If destination already exists, then we must move it into --backup-dir if required
if pair.Dst != nil && s.backupDir != nil {
remoteWithSuffix := pair.Dst.Remote() + s.suffix
overwritten, _ := s.backupDir.NewObject(remoteWithSuffix)
_, err := operations.Move(s.backupDir, overwritten, remoteWithSuffix, pair.Dst)
if err != nil {
s.processError(err)
} else {
// If successful zero out the dst as it is no longer there and copy the file
pair.Dst = nil
select {
case <-s.ctx.Done():
return
case out <- pair:
}
}
src := pair.Src
accounting.Stats.Checking(src.Remote())
// Check to see if can store this
if src.Storable() {
if operations.NeedTransfer(pair.Dst, pair.Src) {
// If files are treated as immutable, fail if destination exists and does not match
if fs.Config.Immutable && pair.Dst != nil {
fs.Errorf(pair.Dst, "Source and destination exist but do not match: immutable file modified")
s.processError(fs.ErrorImmutableModified)
} else {
// If destination already exists, then we must move it into --backup-dir if required
if pair.Dst != nil && s.backupDir != nil {
remoteWithSuffix := pair.Dst.Remote() + s.suffix
overwritten, _ := s.backupDir.NewObject(remoteWithSuffix)
_, err := operations.Move(s.backupDir, overwritten, remoteWithSuffix, pair.Dst)
if err != nil {
s.processError(err)
} else {
select {
case <-s.ctx.Done():
// If successful zero out the dst as it is no longer there and copy the file
pair.Dst = nil
ok = out.Put(s.ctx, pair)
if !ok {
return
case out <- pair:
}
}
}
} else {
// If moving need to delete the files we don't need to copy
if s.DoMove {
// Delete src if no error on copy
s.processError(operations.DeleteFile(src))
} else {
ok = out.Put(s.ctx, pair)
if !ok {
return
}
}
}
} else {
// If moving need to delete the files we don't need to copy
if s.DoMove {
// Delete src if no error on copy
s.processError(operations.DeleteFile(src))
}
}
accounting.Stats.DoneChecking(src.Remote())
case <-s.ctx.Done():
return
}
accounting.Stats.DoneChecking(src.Remote())
}
}
// pairRenamer reads Objects~s on in and attempts to rename them,
// otherwise it sends them out if they need transferring.
func (s *syncCopyMove) pairRenamer(in fs.ObjectPairChan, out fs.ObjectPairChan, wg *sync.WaitGroup) {
func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, wg *sync.WaitGroup) {
defer wg.Done()
for {
if s.aborting() {
pair, ok := in.Get(s.ctx)
if !ok {
return
}
select {
case pair, ok := <-in:
src := pair.Src
if !s.tryRename(src) {
// pass on if not renamed
ok = out.Put(s.ctx, pair)
if !ok {
return
}
src := pair.Src
if !s.tryRename(src) {
// pass on if not renamed
select {
case <-s.ctx.Done():
return
case out <- pair:
}
}
case <-s.ctx.Done():
return
}
}
}
// pairCopyOrMove reads Objects on in and moves or copies them.
func (s *syncCopyMove) pairCopyOrMove(in fs.ObjectPairChan, fdst fs.Fs, wg *sync.WaitGroup) {
func (s *syncCopyMove) pairCopyOrMove(in *pipe, fdst fs.Fs, wg *sync.WaitGroup) {
defer wg.Done()
var err error
for {
if s.aborting() {
pair, ok := in.Get(s.ctx)
if !ok {
return
}
select {
case pair, ok := <-in:
if !ok {
return
}
src := pair.Src
accounting.Stats.Transferring(src.Remote())
if s.DoMove {
_, err = operations.Move(fdst, pair.Dst, src.Remote(), src)
} else {
_, err = operations.Copy(fdst, pair.Dst, src.Remote(), src)
}
s.processError(err)
accounting.Stats.DoneTransferring(src.Remote(), err == nil)
case <-s.ctx.Done():
return
src := pair.Src
accounting.Stats.Transferring(src.Remote())
if s.DoMove {
_, err = operations.Move(fdst, pair.Dst, src.Remote(), src)
} else {
_, err = operations.Copy(fdst, pair.Dst, src.Remote(), src)
}
s.processError(err)
accounting.Stats.DoneTransferring(src.Remote(), err == nil)
}
}
@ -323,7 +294,7 @@ func (s *syncCopyMove) startCheckers() {
// This stops the background checkers
func (s *syncCopyMove) stopCheckers() {
close(s.toBeChecked)
s.toBeChecked.Close()
fs.Infof(s.fdst, "Waiting for checks to finish")
s.checkerWg.Wait()
}
@ -338,7 +309,7 @@ func (s *syncCopyMove) startTransfers() {
// This stops the background transfers
func (s *syncCopyMove) stopTransfers() {
close(s.toBeUploaded)
s.toBeUploaded.Close()
fs.Infof(s.fdst, "Waiting for transfers to finish")
s.transfersWg.Wait()
}
@ -359,7 +330,7 @@ func (s *syncCopyMove) stopRenamers() {
if !s.trackRenames {
return
}
close(s.toBeRenamed)
s.toBeRenamed.Close()
fs.Infof(s.fdst, "Waiting for renames to finish")
s.renamerWg.Wait()
}
@ -685,10 +656,9 @@ func (s *syncCopyMove) run() error {
s.makeRenameMap()
// Attempt renames for all the files which don't have a matching dst
for _, src := range s.renameCheck {
select {
case <-s.ctx.Done():
ok := s.toBeRenamed.Put(s.ctx, fs.ObjectPair{Src: src, Dst: nil})
if !ok {
break
case s.toBeRenamed <- fs.ObjectPair{Src: src, Dst: nil}:
}
}
}
@ -792,10 +762,9 @@ func (s *syncCopyMove) SrcOnly(src fs.DirEntry) (recurse bool) {
}
} else {
// No need to check since doesn't exist
select {
case <-s.ctx.Done():
ok := s.toBeUploaded.Put(s.ctx, fs.ObjectPair{Src: x, Dst: nil})
if !ok {
return
case s.toBeUploaded <- fs.ObjectPair{Src: x, Dst: nil}:
}
}
case fs.Directory:
@ -825,10 +794,9 @@ func (s *syncCopyMove) Match(dst, src fs.DirEntry) (recurse bool) {
}
dstX, ok := dst.(fs.Object)
if ok {
select {
case <-s.ctx.Done():
return
case s.toBeChecked <- fs.ObjectPair{Src: srcX, Dst: dstX}:
ok = s.toBeChecked.Put(s.ctx, fs.ObjectPair{Src: srcX, Dst: dstX})
if !ok {
return false
}
} else {
// FIXME src is file, dst is directory