Fix tree service sync #829

Merged
fyrchik merged 3 commits from dstepanov-yadro/frostfs-node:fix/tree_service_from into master 2023-11-30 12:45:04 +00:00
2 changed files with 71 additions and 58 deletions

View file

@ -64,6 +64,9 @@ const (
TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree" TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree"
TreeSynchronizeTree = "synchronize tree" TreeSynchronizeTree = "synchronize tree"
TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes" TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes"
TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node"
TreeFailedToParseAddressForTreeSynchronization = "failed to parse address for tree synchronization"
TreeFailedToConnectForTreeSynchronization = "failed to connect for tree synchronization"
TreeSyncingTrees = "syncing trees..." TreeSyncingTrees = "syncing trees..."
TreeCouldNotFetchContainers = "could not fetch containers" TreeCouldNotFetchContainers = "could not fetch containers"
TreeTreesHaveBeenSynchronized = "trees have been synchronized" TreeTreesHaveBeenSynchronized = "trees have been synchronized"

View file

@ -9,6 +9,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -216,27 +217,27 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
} }
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move, height uint64, cc *grpc.ClientConn, opsCh chan<- *pilorama.Move,
) (uint64, error) { ) error {
treeClient := NewTreeServiceClient(cc)
rawCID := make([]byte, sha256.Size) rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID) cid.Encode(rawCID)
for {
newHeight := height
req := &GetOpLogRequest{ req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{ Body: &GetOpLogRequest_Body{
ContainerId: rawCID, ContainerId: rawCID,
TreeId: treeID, TreeId: treeID,
Height: newHeight, Height: height,
}, },
} }
if err := SignMessage(req, s.key); err != nil { if err := SignMessage(req, s.key); err != nil {
return 0, err return err
} }
c, err := treeClient.GetOpLog(ctx, req) c, err := treeClient.GetOpLog(ctx, req)
if err != nil { if err != nil {
return 0, fmt.Errorf("can't initialize client: %w", err) return fmt.Errorf("can't initialize client: %w", err)
} }
res, err := c.Recv() res, err := c.Recv()
for ; err == nil; res, err = c.Recv() { for ; err == nil; res, err = c.Recv() {
@ -246,15 +247,14 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
Child: lm.ChildId, Child: lm.ChildId,
} }
if err := m.Meta.FromBytes(lm.Meta); err != nil { if err := m.Meta.FromBytes(lm.Meta); err != nil {
return 0, err return err
} }
opsCh <- m opsCh <- m
} }
if height == newHeight || err != nil && !errors.Is(err, io.EOF) { if err != nil && !errors.Is(err, io.EOF) {
return newHeight, err return err
}
height = newHeight
} }
return nil
} }
// synchronizeTree synchronizes operations getting them from different nodes. // synchronizeTree synchronizes operations getting them from different nodes.
@ -287,50 +287,44 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
return nil return nil
}) })
var allNodesSynced atomic.Bool
allNodesSynced.Store(true)
for i, n := range nodes { for i, n := range nodes {
i := i i := i
n := n n := n
errGroup.Go(func() error { errGroup.Go(func() error {
height := from var nodeSynced bool
n.IterateNetworkEndpoints(func(addr string) bool { n.IterateNetworkEndpoints(func(addr string) bool {
var a network.Address var a network.Address
if err := a.FromString(addr); err != nil { if err := a.FromString(addr); err != nil {
s.log.Warn(logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false return false
} }
cc, err := grpc.DialContext(egCtx, a.URIAddr(), cc, err := s.dialCtx(egCtx, a)
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil { if err != nil {
// Failed to connect, try the next address. s.log.Warn(logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr))
return false return false
} }
defer cc.Close() defer cc.Close()
treeClient := NewTreeServiceClient(cc) err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i])
for { if err != nil {
h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i]) s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
if height < h {
height = h
} }
if err != nil || h <= height { nodeSynced = err == nil
// Error with the response, try the next node.
return true return true
}
}
}) })
close(nodeOperationStreams[i]) close(nodeOperationStreams[i])
if !nodeSynced {
allNodesSynced.Store(false)
}
return nil return nil
}) })
} }
if err := errGroup.Wait(); err != nil { if err := errGroup.Wait(); err != nil {
allNodesSynced.Store(false)
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err)) s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
} }
@ -340,8 +334,24 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
} else { } else {
newHeight++ newHeight++
} }
if allNodesSynced.Load() {
return newHeight return newHeight
} }
return from
}
func (*Service) dialCtx(egCtx context.Context, a network.Address) (*grpc.ClientConn, error) {
return grpc.DialContext(egCtx, a.URIAddr(),
grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(),
tracing_grpc.NewUnaryClientInteceptor(),
),
grpc.WithChainStreamInterceptor(
metrics.NewStreamClientInterceptor(),
tracing_grpc.NewStreamClientInterceptor(),
),
grpc.WithTransportCredentials(insecure.NewCredentials()))
}
// ErrAlreadySyncing is returned when a service synchronization has already // ErrAlreadySyncing is returned when a service synchronization has already
// been started. // been started.