[#166] node: Parallelize background tree service sync

* Run sync task for nodes in parallel within errgroup worker pool

Signed-off-by: Airat Arifullin a.arifullin@yadro.com
This commit is contained in:
Airat Arifullin 2023-04-05 14:56:15 +03:00 committed by Gitea
parent 9027695371
commit 56282edf02

View file

@ -17,6 +17,7 @@ import (
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
) )
@ -129,8 +130,15 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
zap.String("tree", treeID), zap.String("tree", treeID),
zap.Uint64("from", from)) zap.Uint64("from", from))
newHeight := uint64(math.MaxUint64) errGroup, egCtx := errgroup.WithContext(ctx)
for _, n := range nodes { const workersCount = 4
errGroup.SetLimit(workersCount)
heights := make([]uint64, len(nodes))
for i, n := range nodes {
i := i
n := n
errGroup.Go(func() error {
height := from height := from
n.IterateNetworkEndpoints(func(addr string) bool { n.IterateNetworkEndpoints(func(addr string) bool {
var a network.Address var a network.Address
@ -138,7 +146,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
return false return false
} }
cc, err := grpc.DialContext(ctx, a.URIAddr(), grpc.WithTransportCredentials(insecure.NewCredentials())) cc, err := grpc.DialContext(egCtx, a.URIAddr(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil { if err != nil {
// Failed to connect, try the next address. // Failed to connect, try the next address.
return false return false
@ -147,7 +155,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
treeClient := NewTreeServiceClient(cc) treeClient := NewTreeServiceClient(cc)
for { for {
h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient) h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
if height < h { if height < h {
height = h height = h
} }
@ -157,9 +165,23 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
} }
} }
}) })
if height <= from { // do not increase starting height on fail if height <= from { // do not increase starting height on fail
newHeight = from heights[i] = from
} else if height < newHeight { // take minimum across all clients return nil
}
heights[i] = height
return nil
})
}
if err := errGroup.Wait(); err != nil {
s.log.Warn("failed to run tree synchronization over all nodes", zap.Error(err))
}
newHeight := uint64(math.MaxUint64)
for _, height := range heights { // take minimum across all clients
if height < newHeight {
newHeight = height newHeight = height
} }
} }