forked from TrueCloudLab/rclone
sync,operations: fix correct concurrency: use --checkers unless transferring files
There were some places (e.g. deleting files) where we were using --transfers instead of --checkers to control the concurrency when files weren't being transferred. These have been updated to use --checkers.
This commit is contained in:
parent
019a486d5b
commit
a5390dbbeb
2 changed files with 14 additions and 9 deletions
|
@ -678,11 +678,11 @@ func DeleteFile(ctx context.Context, dst fs.Object) (err error) {
|
||||||
func DeleteFilesWithBackupDir(ctx context.Context, toBeDeleted fs.ObjectsChan, backupDir fs.Fs) error {
|
func DeleteFilesWithBackupDir(ctx context.Context, toBeDeleted fs.ObjectsChan, backupDir fs.Fs) error {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
ci := fs.GetConfig(ctx)
|
ci := fs.GetConfig(ctx)
|
||||||
wg.Add(ci.Transfers)
|
wg.Add(ci.Checkers)
|
||||||
var errorCount int32
|
var errorCount int32
|
||||||
var fatalErrorCount int32
|
var fatalErrorCount int32
|
||||||
|
|
||||||
for i := 0; i < ci.Transfers; i++ {
|
for i := 0; i < ci.Checkers; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for dst := range toBeDeleted {
|
for dst := range toBeDeleted {
|
||||||
|
@ -1022,7 +1022,12 @@ func hashSum(ctx context.Context, ht hash.Type, base64Encoded bool, downloadFlag
|
||||||
// Updated to perform multiple hashes concurrently
|
// Updated to perform multiple hashes concurrently
|
||||||
func HashLister(ctx context.Context, ht hash.Type, outputBase64 bool, downloadFlag bool, f fs.Fs, w io.Writer) error {
|
func HashLister(ctx context.Context, ht hash.Type, outputBase64 bool, downloadFlag bool, f fs.Fs, w io.Writer) error {
|
||||||
width := hash.Width(ht, outputBase64)
|
width := hash.Width(ht, outputBase64)
|
||||||
concurrencyControl := make(chan struct{}, fs.GetConfig(ctx).Transfers)
|
// Use --checkers concurrency unless downloading in which case use --transfers
|
||||||
|
concurrency := fs.GetConfig(ctx).Checkers
|
||||||
|
if downloadFlag {
|
||||||
|
concurrency = fs.GetConfig(ctx).Transfers
|
||||||
|
}
|
||||||
|
concurrencyControl := make(chan struct{}, concurrency)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
err := ListFn(ctx, f, func(o fs.Object) {
|
err := ListFn(ctx, f, func(o fs.Object) {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
@ -1173,7 +1178,7 @@ func Purge(ctx context.Context, f fs.Fs, dir string) (err error) {
|
||||||
// obeys includes and excludes.
|
// obeys includes and excludes.
|
||||||
func Delete(ctx context.Context, f fs.Fs) error {
|
func Delete(ctx context.Context, f fs.Fs) error {
|
||||||
ci := fs.GetConfig(ctx)
|
ci := fs.GetConfig(ctx)
|
||||||
delChan := make(fs.ObjectsChan, ci.Transfers)
|
delChan := make(fs.ObjectsChan, ci.Checkers)
|
||||||
delErr := make(chan error, 1)
|
delErr := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
delErr <- DeleteFiles(ctx, delChan)
|
delErr <- DeleteFiles(ctx, delChan)
|
||||||
|
@ -2187,9 +2192,9 @@ func DirMove(ctx context.Context, f fs.Fs, srcRemote, dstRemote string) (err err
|
||||||
o fs.Object
|
o fs.Object
|
||||||
newPath string
|
newPath string
|
||||||
}
|
}
|
||||||
renames := make(chan rename, ci.Transfers)
|
renames := make(chan rename, ci.Checkers)
|
||||||
g, gCtx := errgroup.WithContext(context.Background())
|
g, gCtx := errgroup.WithContext(context.Background())
|
||||||
for i := 0; i < ci.Transfers; i++ {
|
for i := 0; i < ci.Checkers; i++ {
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
for job := range renames {
|
for job := range renames {
|
||||||
dstOverwritten, _ := f.NewObject(gCtx, job.newPath)
|
dstOverwritten, _ := f.NewObject(gCtx, job.newPath)
|
||||||
|
|
|
@ -537,7 +537,7 @@ func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete the spare files
|
// Delete the spare files
|
||||||
toDelete := make(fs.ObjectsChan, s.ci.Transfers)
|
toDelete := make(fs.ObjectsChan, s.ci.Checkers)
|
||||||
go func() {
|
go func() {
|
||||||
outer:
|
outer:
|
||||||
for remote, o := range s.dstFiles {
|
for remote, o := range s.dstFiles {
|
||||||
|
@ -772,8 +772,8 @@ func (s *syncCopyMove) makeRenameMap() {
|
||||||
// now make a map of size,hash for all dstFiles
|
// now make a map of size,hash for all dstFiles
|
||||||
s.renameMap = make(map[string][]fs.Object)
|
s.renameMap = make(map[string][]fs.Object)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(s.ci.Transfers)
|
wg.Add(s.ci.Checkers)
|
||||||
for i := 0; i < s.ci.Transfers; i++ {
|
for i := 0; i < s.ci.Checkers; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
for obj := range in {
|
for obj := range in {
|
||||||
|
|
Loading…
Add table
Reference in a new issue