From c299b98afea3cc1314c69210fe5b842e9e315873 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 12 Dec 2022 14:49:40 +0300 Subject: [PATCH] [#2165] services/tree: Parallelize synchronization Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/service.go | 4 ++++ pkg/services/tree/sync.go | 32 ++++++++++++++++++++++++++------ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 941a80d8f..0368e7586 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -11,6 +11,7 @@ import ( "github.com/TrueCloudLab/frostfs-sdk-go/container/acl" cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "github.com/TrueCloudLab/frostfs-sdk-go/netmap" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -26,6 +27,7 @@ type Service struct { containerCache containerCache syncChan chan struct{} + syncPool *ants.Pool cnrMap map[cidSDK.ID]struct{} } @@ -54,6 +56,7 @@ func New(opts ...Option) *Service { s.containerCache.init(s.containerCacheSize) s.cnrMap = make(map[cidSDK.ID]struct{}) s.syncChan = make(chan struct{}) + s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount) return &s } @@ -75,6 +78,7 @@ func (s *Service) Start(ctx context.Context) { // Shutdown shutdowns the service. func (s *Service) Shutdown() { close(s.closeCh) + s.syncPool.Release() } func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index bff4db7ef..a833d3852 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -7,12 +7,14 @@ import ( "fmt" "io" "math/rand" + "sync" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "github.com/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "github.com/TrueCloudLab/frostfs-node/pkg/network" cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id" netmapSDK "github.com/TrueCloudLab/frostfs-sdk-go/netmap" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -22,6 +24,8 @@ import ( // because the node is not included in the container. var ErrNotInContainer = errors.New("node is not in container") +const defaultSyncWorkerCount = 20 + // SynchronizeAllTrees synchronizes all the trees of the container. It fetches // tree IDs from the other container nodes. Returns ErrNotInContainer if the node // is not included in the container. @@ -271,17 +275,33 @@ func (s *Service) syncLoop(ctx context.Context) { } // sync new containers + var wg sync.WaitGroup for _, cnr := range cnrsToSync { - s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr)) + wg.Add(1) + cnr := cnr + err := s.syncPool.Submit(func() { + defer wg.Done() + s.log.Debug("syncing container trees...", zap.Stringer("cid", cnr)) - err = s.SynchronizeAllTrees(ctx, cnr) + err := s.SynchronizeAllTrees(ctx, cnr) + if err != nil { + s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err)) + return + } + + s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr)) + }) if err != nil { - s.log.Error("could not sync trees", zap.Stringer("cid", cnr), zap.Error(err)) - continue + wg.Done() + s.log.Error("could not query trees for synchronization", + zap.Stringer("cid", cnr), + zap.Error(err)) + if errors.Is(err, ants.ErrPoolClosed) { + return + } } - - s.log.Debug("container trees have been synced", zap.Stringer("cid", cnr)) } + wg.Wait() // remove stored redundant trees for cnr := range s.cnrMap {