From 56282edf02ba3494c922fa6133e2671b3f8e59e2 Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Wed, 5 Apr 2023 14:56:15 +0300 Subject: [PATCH] [#166] node: Parallelize background tree service sync * Run sync task for nodes in parallel within errgroup worker pool Signed-off-by: Airat Arifullin <a.arifullin@yadro.com> --- pkg/services/tree/sync.go | 72 +++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 2d5c104b0..32d088c01 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -17,6 +17,7 @@ import ( netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/panjf2000/ants/v2" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -129,37 +130,58 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, zap.String("tree", treeID), zap.Uint64("from", from)) - newHeight := uint64(math.MaxUint64) - for _, n := range nodes { - height := from - n.IterateNetworkEndpoints(func(addr string) bool { - var a network.Address - if err := a.FromString(addr); err != nil { - return false - } + errGroup, egCtx := errgroup.WithContext(ctx) + const workersCount = 4 + errGroup.SetLimit(workersCount) - cc, err := grpc.DialContext(ctx, a.URIAddr(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - // Failed to connect, try the next address. 
- return false - } - defer cc.Close() + heights := make([]uint64, len(nodes)) + for i, n := range nodes { + i := i + n := n + errGroup.Go(func() error { + height := from + n.IterateNetworkEndpoints(func(addr string) bool { + var a network.Address + if err := a.FromString(addr); err != nil { + return false + } - treeClient := NewTreeServiceClient(cc) - for { - h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient) - if height < h { - height = h + cc, err := grpc.DialContext(egCtx, a.URIAddr(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + // Failed to connect, try the next address. + return false } - if err != nil || h <= height { - // Error with the response, try the next node. - return true + defer cc.Close() + + treeClient := NewTreeServiceClient(cc) + for { + h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient) + if height < h { + height = h + } + if err != nil || h <= height { + // Error with the response, try the next node. + return true + } } + }) + + if height <= from { // do not increase starting height on fail + heights[i] = from + return nil } + heights[i] = height + return nil }) - if height <= from { // do not increase starting height on fail - newHeight = from - } else if height < newHeight { // take minimum across all clients + } + + if err := errGroup.Wait(); err != nil { + s.log.Warn("failed to run tree synchronization over all nodes", zap.Error(err)) + } + + newHeight := uint64(math.MaxUint64) + for _, height := range heights { // take minimum across all clients + if height < newHeight { newHeight = height } }