Fix tree service sync #829
2 changed files with 71 additions and 58 deletions
|
@ -64,6 +64,9 @@ const (
|
||||||
TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree"
|
TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree"
|
||||||
TreeSynchronizeTree = "synchronize tree"
|
TreeSynchronizeTree = "synchronize tree"
|
||||||
TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes"
|
TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes"
|
||||||
|
TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node"
|
||||||
|
TreeFailedToParseAddressForTreeSynchronization = "failed to parse address for tree synchronization"
|
||||||
|
TreeFailedToConnectForTreeSynchronization = "failed to connect for tree synchronization"
|
||||||
TreeSyncingTrees = "syncing trees..."
|
TreeSyncingTrees = "syncing trees..."
|
||||||
TreeCouldNotFetchContainers = "could not fetch containers"
|
TreeCouldNotFetchContainers = "could not fetch containers"
|
||||||
TreeTreesHaveBeenSynchronized = "trees have been synchronized"
|
TreeTreesHaveBeenSynchronized = "trees have been synchronized"
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -216,27 +217,27 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move,
|
height uint64, cc *grpc.ClientConn, opsCh chan<- *pilorama.Move,
|
||||||
) (uint64, error) {
|
) error {
|
||||||
|
treeClient := NewTreeServiceClient(cc)
|
||||||
|
|
||||||
rawCID := make([]byte, sha256.Size)
|
rawCID := make([]byte, sha256.Size)
|
||||||
cid.Encode(rawCID)
|
cid.Encode(rawCID)
|
||||||
|
|
||||||
for {
|
|
||||||
newHeight := height
|
|
||||||
req := &GetOpLogRequest{
|
req := &GetOpLogRequest{
|
||||||
Body: &GetOpLogRequest_Body{
|
Body: &GetOpLogRequest_Body{
|
||||||
ContainerId: rawCID,
|
ContainerId: rawCID,
|
||||||
TreeId: treeID,
|
TreeId: treeID,
|
||||||
Height: newHeight,
|
Height: height,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := SignMessage(req, s.key); err != nil {
|
if err := SignMessage(req, s.key); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := treeClient.GetOpLog(ctx, req)
|
c, err := treeClient.GetOpLog(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("can't initialize client: %w", err)
|
return fmt.Errorf("can't initialize client: %w", err)
|
||||||
}
|
}
|
||||||
res, err := c.Recv()
|
res, err := c.Recv()
|
||||||
for ; err == nil; res, err = c.Recv() {
|
for ; err == nil; res, err = c.Recv() {
|
||||||
|
@ -246,15 +247,14 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
Child: lm.ChildId,
|
Child: lm.ChildId,
|
||||||
}
|
}
|
||||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
opsCh <- m
|
opsCh <- m
|
||||||
}
|
}
|
||||||
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
return newHeight, err
|
return err
|
||||||
}
|
|
||||||
height = newHeight
|
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// synchronizeTree synchronizes operations getting them from different nodes.
|
// synchronizeTree synchronizes operations getting them from different nodes.
|
||||||
|
@ -287,50 +287,44 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var allNodesSynced atomic.Bool
|
||||||
|
allNodesSynced.Store(true)
|
||||||
|
|
||||||
for i, n := range nodes {
|
for i, n := range nodes {
|
||||||
i := i
|
i := i
|
||||||
n := n
|
n := n
|
||||||
errGroup.Go(func() error {
|
errGroup.Go(func() error {
|
||||||
height := from
|
var nodeSynced bool
|
||||||
n.IterateNetworkEndpoints(func(addr string) bool {
|
n.IterateNetworkEndpoints(func(addr string) bool {
|
||||||
var a network.Address
|
var a network.Address
|
||||||
if err := a.FromString(addr); err != nil {
|
if err := a.FromString(addr); err != nil {
|
||||||
|
s.log.Warn(logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
cc, err := grpc.DialContext(egCtx, a.URIAddr(),
|
cc, err := s.dialCtx(egCtx, a)
|
||||||
grpc.WithChainUnaryInterceptor(
|
|
||||||
metrics.NewUnaryClientInterceptor(),
|
|
||||||
tracing_grpc.NewUnaryClientInteceptor(),
|
|
||||||
),
|
|
||||||
grpc.WithChainStreamInterceptor(
|
|
||||||
metrics.NewStreamClientInterceptor(),
|
|
||||||
tracing_grpc.NewStreamClientInterceptor(),
|
|
||||||
),
|
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Failed to connect, try the next address.
|
s.log.Warn(logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
defer cc.Close()
|
defer cc.Close()
|
||||||
|
|
||||||
treeClient := NewTreeServiceClient(cc)
|
err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
|
||||||
for {
|
if err != nil {
|
||||||
h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
|
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
|
||||||
if height < h {
|
|
||||||
height = h
|
|
||||||
}
|
}
|
||||||
if err != nil || h <= height {
|
nodeSynced = err == nil
|
||||||
// Error with the response, try the next node.
|
|
||||||
return true
|
return true
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
close(nodeOperationStreams[i])
|
close(nodeOperationStreams[i])
|
||||||
|
if !nodeSynced {
|
||||||
|
allNodesSynced.Store(false)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if err := errGroup.Wait(); err != nil {
|
if err := errGroup.Wait(); err != nil {
|
||||||
|
allNodesSynced.Store(false)
|
||||||
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
|
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -340,8 +334,24 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
||||||
} else {
|
} else {
|
||||||
newHeight++
|
newHeight++
|
||||||
}
|
}
|
||||||
|
if allNodesSynced.Load() {
|
||||||
return newHeight
|
return newHeight
|
||||||
}
|
}
|
||||||
|
return from
|
||||||
|
}
|
||||||
|
|
||||||
|
// dialCtx opens a gRPC client connection to the given network address.
//
// It calls grpc.DialContext with egCtx and a.URIAddr() as the target and
// installs two interceptor chains on the connection:
//   - unary:  metrics.NewUnaryClientInterceptor, tracing_grpc.NewUnaryClientInteceptor
//   - stream: metrics.NewStreamClientInterceptor, tracing_grpc.NewStreamClientInterceptor
//
// Transport credentials are insecure.NewCredentials(), i.e. the connection is
// made without transport security.
//
// The connection and error are returned exactly as grpc.DialContext produced
// them; the method reads no Service state (the receiver is unnamed). The
// visible caller in synchronizeTree closes the returned connection itself
// via defer cc.Close().
//
// NOTE(review): no grpc.WithBlock option is passed, so presumably a nil error
// here does not yet mean the endpoint is reachable — confirm against the
// grpc-go version in use.
func (*Service) dialCtx(egCtx context.Context, a network.Address) (*grpc.ClientConn, error) {
|
||||||

return grpc.DialContext(egCtx, a.URIAddr(),
|
||||||

grpc.WithChainUnaryInterceptor(
|
||||||

metrics.NewUnaryClientInterceptor(),
|
||||||

tracing_grpc.NewUnaryClientInteceptor(),
|
||||||

),
|
||||||

grpc.WithChainStreamInterceptor(
|
||||||

metrics.NewStreamClientInterceptor(),
|
||||||

tracing_grpc.NewStreamClientInterceptor(),
|
||||||

),
|
||||||

grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||

}
|
||||||
|
|
||||||
// ErrAlreadySyncing is returned when a service synchronization has already
|
// ErrAlreadySyncing is returned when a service synchronization has already
|
||||||
// been started.
|
// been started.
|
||||||
|
|
Loading…
Reference in a new issue