forked from TrueCloudLab/frostfs-node
[#166] node: Parallelize background tree service sync
* Run sync task for nodes in parallel within errgroup worker pool Signed-off-by: Airat Arifullin a.arifullin@yadro.com
This commit is contained in:
parent
a69c6d1ec9
commit
8b2885f242
1 changed files with 47 additions and 25 deletions
|
@ -18,6 +18,7 @@ import (
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
)
|
)
|
||||||
|
@ -130,8 +131,15 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint6
|
||||||
zap.String("tree", treeID),
|
zap.String("tree", treeID),
|
||||||
zap.Uint64("from", from))
|
zap.Uint64("from", from))
|
||||||
|
|
||||||
newHeight := uint64(math.MaxUint64)
|
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||||
for _, n := range nodes {
|
const workersCount = 4
|
||||||
|
errGroup.SetLimit(workersCount)
|
||||||
|
|
||||||
|
heights := make([]uint64, len(nodes))
|
||||||
|
for i, n := range nodes {
|
||||||
|
i := i
|
||||||
|
n := n
|
||||||
|
errGroup.Go(func() error {
|
||||||
height := from
|
height := from
|
||||||
n.IterateNetworkEndpoints(func(addr string) bool {
|
n.IterateNetworkEndpoints(func(addr string) bool {
|
||||||
var a network.Address
|
var a network.Address
|
||||||
|
@ -139,7 +147,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint6
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err := grpc.DialContext(ctx, a.URIAddr(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
cc, err := grpc.DialContext(egCtx, a.URIAddr(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Failed to connect, try the next address.
|
// Failed to connect, try the next address.
|
||||||
return false
|
return false
|
||||||
|
@ -148,7 +156,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint6
|
||||||
|
|
||||||
treeClient := NewTreeServiceClient(cc)
|
treeClient := NewTreeServiceClient(cc)
|
||||||
for {
|
for {
|
||||||
h, err := s.synchronizeSingle(ctx, cid, treeID, height, treeClient)
|
h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
|
||||||
if height < h {
|
if height < h {
|
||||||
height = h
|
height = h
|
||||||
}
|
}
|
||||||
|
@ -158,9 +166,23 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cidSDK.ID, from uint6
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
if height <= from { // do not increase starting height on fail
|
if height <= from { // do not increase starting height on fail
|
||||||
newHeight = from
|
heights[i] = from
|
||||||
} else if height < newHeight { // take minimum across all clients
|
return nil
|
||||||
|
}
|
||||||
|
heights[i] = height
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := errGroup.Wait(); err != nil {
|
||||||
|
s.log.Warn("failed to run tree synchronization over all nodes", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
newHeight := uint64(math.MaxUint64)
|
||||||
|
for _, height := range heights { // take minimum across all clients
|
||||||
|
if height < newHeight {
|
||||||
newHeight = height
|
newHeight = height
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue