parallelize more List usages

This commit is contained in:
Michael Eischer 2022-10-15 17:25:45 +02:00
parent ae45f3b04f
commit 738b2a0445
3 changed files with 15 additions and 6 deletions
cmd/restic
internal/restic

View file

@ -13,6 +13,7 @@ import (
"os" "os"
"runtime" "runtime"
"sort" "sort"
"sync"
"time" "time"
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
@ -105,7 +106,8 @@ type Blob struct {
func printPacks(ctx context.Context, repo *repository.Repository, wr io.Writer) error { func printPacks(ctx context.Context, repo *repository.Repository, wr io.Writer) error {
return repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { var m sync.Mutex
return restic.ParallelList(ctx, repo.Backend(), restic.PackFile, repo.Connections(), func(ctx context.Context, id restic.ID, size int64) error {
blobs, _, err := repo.ListPack(ctx, id, size) blobs, _, err := repo.ListPack(ctx, id, size)
if err != nil { if err != nil {
Warnf("error for pack %v: %v\n", id.Str(), err) Warnf("error for pack %v: %v\n", id.Str(), err)
@ -125,6 +127,8 @@ func printPacks(ctx context.Context, repo *repository.Repository, wr io.Writer)
} }
} }
m.Lock()
defer m.Unlock()
return prettyPrintJSON(wr, p) return prettyPrintJSON(wr, p)
}) })
} }

View file

@ -6,6 +6,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"strings" "strings"
"sync"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/repository"
@ -56,9 +57,10 @@ func listKeys(ctx context.Context, s *repository.Repository, gopts GlobalOptions
Created string `json:"created"` Created string `json:"created"`
} }
var m sync.Mutex
var keys []keyInfo var keys []keyInfo
err := s.List(ctx, restic.KeyFile, func(id restic.ID, size int64) error { err := restic.ParallelList(ctx, s.Backend(), restic.KeyFile, s.Connections(), func(ctx context.Context, id restic.ID, size int64) error {
k, err := repository.LoadKey(ctx, s, id) k, err := repository.LoadKey(ctx, s, id)
if err != nil { if err != nil {
Warnf("LoadKey() failed: %v\n", err) Warnf("LoadKey() failed: %v\n", err)
@ -73,6 +75,8 @@ func listKeys(ctx context.Context, s *repository.Repository, gopts GlobalOptions
Created: k.Created.Local().Format(TimeFormat), Created: k.Created.Local().Format(TimeFormat),
} }
m.Lock()
defer m.Unlock()
keys = append(keys, key) keys = append(keys, key)
return nil return nil
}) })

View file

@ -7,6 +7,7 @@ import (
"os/signal" "os/signal"
"os/user" "os/user"
"sync" "sync"
"sync/atomic"
"syscall" "syscall"
"testing" "testing"
"time" "time"
@ -301,15 +302,15 @@ func RemoveStaleLocks(ctx context.Context, repo Repository) (uint, error) {
// RemoveAllLocks removes all locks forcefully. // RemoveAllLocks removes all locks forcefully.
func RemoveAllLocks(ctx context.Context, repo Repository) (uint, error) { func RemoveAllLocks(ctx context.Context, repo Repository) (uint, error) {
var processed uint var processed uint32
err := repo.List(ctx, LockFile, func(id ID, size int64) error { err := ParallelList(ctx, repo.Backend(), LockFile, repo.Connections(), func(ctx context.Context, id ID, size int64) error {
err := repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()}) err := repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()})
if err == nil { if err == nil {
processed++ atomic.AddUint32(&processed, 1)
} }
return err return err
}) })
return processed, err return uint(processed), err
} }
// ForAllLocks reads all locks in parallel and calls the given callback. // ForAllLocks reads all locks in parallel and calls the given callback.