From b8a3f177598c7e5c5df4cb4e439bf443e475dda6 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 28 Nov 2023 10:12:39 +0300 Subject: [PATCH 1/3] [#741] treesvc: Remove unused height variables Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 1 + pkg/services/tree/sync.go | 80 ++++++++++++++++++--------------------- 2 files changed, 37 insertions(+), 44 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index e8472357c..fad5f464d 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -64,6 +64,7 @@ const ( TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree" TreeSynchronizeTree = "synchronize tree" TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes" + TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node" TreeSyncingTrees = "syncing trees..." TreeCouldNotFetchContainers = "could not fetch containers" TreeTreesHaveBeenSynchronized = "trees have been synchronized" diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index f5a7fbce6..6dc3ffb29 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -217,44 +217,41 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move, -) (uint64, error) { +) error { rawCID := make([]byte, sha256.Size) cid.Encode(rawCID) - for { - newHeight := height - req := &GetOpLogRequest{ - Body: &GetOpLogRequest_Body{ - ContainerId: rawCID, - TreeId: treeID, - Height: newHeight, - }, - } - if err := SignMessage(req, s.key); err != nil { - return 0, err - } - - c, err := treeClient.GetOpLog(ctx, req) - if err != nil { - return 0, fmt.Errorf("can't initialize client: %w", err) - } - res, err := c.Recv() - for ; err == nil; res, err = c.Recv() { - lm := res.GetBody().GetOperation() - m := &pilorama.Move{ - Parent: lm.ParentId, - Child: lm.ChildId, - } - if err := m.Meta.FromBytes(lm.Meta); err != nil { - return 0, err - } - opsCh <- m - } - if height == newHeight || err != nil && !errors.Is(err, io.EOF) { - return newHeight, err - } - height = newHeight + req := &GetOpLogRequest{ + Body: &GetOpLogRequest_Body{ + ContainerId: rawCID, + TreeId: treeID, + Height: height, + }, } + if err := SignMessage(req, s.key); err != nil { + return err + } + + c, err := treeClient.GetOpLog(ctx, req) + if err != nil { + return fmt.Errorf("can't initialize client: %w", err) + } + res, err := c.Recv() + for ; err == nil; res, err = c.Recv() { + lm := res.GetBody().GetOperation() + m := &pilorama.Move{ + Parent: lm.ParentId, + Child: lm.ChildId, + } + if err := m.Meta.FromBytes(lm.Meta); err != nil { + return err + } + opsCh <- m + } + if err != nil && !errors.Is(err, io.EOF) { + return err + } + return nil } // synchronizeTree synchronizes operations getting them from different nodes. @@ -291,7 +288,6 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, i := i n := n errGroup.Go(func() error { - height := from n.IterateNetworkEndpoints(func(addr string) bool { var a network.Address if err := a.FromString(addr); err != nil { @@ -315,16 +311,12 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, defer cc.Close() treeClient := NewTreeServiceClient(cc) - for { - h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i]) - if height < h { - height = h - } - if err != nil || h <= height { - // Error with the response, try the next node. - return true - } + err = s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i]) + if err != nil { + // Error with the response, try the next node. + s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr)) } + return true }) close(nodeOperationStreams[i]) return nil -- 2.45.2 From 1c735efedd363738f2a8e3a5526404ae0d559f3a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 28 Nov 2023 10:51:15 +0300 Subject: [PATCH 2/3] [#741] treesvc: Do not update sync height if some node is unavailable Signed-off-by: Dmitrii Stepanov --- pkg/services/tree/sync.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 6dc3ffb29..65920fdc0 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -9,6 +9,7 @@ import ( "math" "math/rand" "sync" + "sync/atomic" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -284,10 +285,14 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, return nil }) + var allNodesSynced atomic.Bool + allNodesSynced.Store(true) + for i, n := range nodes { i := i n := n errGroup.Go(func() error { + var nodeSynced bool n.IterateNetworkEndpoints(func(addr string) bool { var a network.Address if err := a.FromString(addr); err != nil { @@ -316,13 +321,18 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, // Error with the response, try the next node. s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr)) } + nodeSynced = err == nil return true }) close(nodeOperationStreams[i]) + if !nodeSynced { + allNodesSynced.Store(false) + } return nil }) } if err := errGroup.Wait(); err != nil { + allNodesSynced.Store(false) s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err)) } @@ -332,7 +342,10 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, } else { newHeight++ } - return newHeight + if allNodesSynced.Load() { + return newHeight + } + return from } // ErrAlreadySyncing is returned when a service synchronization has already -- 2.45.2 From b20298007ed7d3a36f7303466bcb123f631e25e9 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 28 Nov 2023 17:04:02 +0300 Subject: [PATCH 3/3] [#741] treesvc: Refactor tree sync Fix linter issues. Add error logging. Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 2 ++ pkg/services/tree/sync.go | 35 ++++++++++++++++++++--------------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index fad5f464d..a72609b41 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -65,6 +65,8 @@ const ( TreeSynchronizeTree = "synchronize tree" TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes" TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node" + TreeFailedToParseAddressForTreeSynchronization = "failed to parse address for tree synchronization" + TreeFailedToConnectForTreeSynchronization = "failed to connect for tree synchronization" TreeSyncingTrees = "syncing trees..." TreeCouldNotFetchContainers = "could not fetch containers" TreeTreesHaveBeenSynchronized = "trees have been synchronized" diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 65920fdc0..485278adf 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -217,8 +217,10 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s } func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, - height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move, + height uint64, cc *grpc.ClientConn, opsCh chan<- *pilorama.Move, ) error { + treeClient := NewTreeServiceClient(cc) + rawCID := make([]byte, sha256.Size) cid.Encode(rawCID) @@ -296,29 +298,19 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, n.IterateNetworkEndpoints(func(addr string) bool { var a network.Address if err := a.FromString(addr); err != nil { + s.log.Warn(logs.TreeFailedToParseAddressForTreeSynchronization, zap.Error(err), zap.String("address", addr)) return false } - cc, err := grpc.DialContext(egCtx, a.URIAddr(), - grpc.WithChainUnaryInterceptor( - metrics.NewUnaryClientInterceptor(), - tracing_grpc.NewUnaryClientInteceptor(), - ), - grpc.WithChainStreamInterceptor( - metrics.NewStreamClientInterceptor(), - tracing_grpc.NewStreamClientInterceptor(), - ), - grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := s.dialCtx(egCtx, a) if err != nil { - // Failed to connect, try the next address. + s.log.Warn(logs.TreeFailedToConnectForTreeSynchronization, zap.Error(err), zap.String("address", addr)) return false } defer cc.Close() - treeClient := NewTreeServiceClient(cc) - err = s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i]) + err = s.startStream(egCtx, cid, treeID, from, cc, nodeOperationStreams[i]) if err != nil { - // Error with the response, try the next node. s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr)) } nodeSynced = err == nil @@ -348,6 +340,19 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, return from } +func (*Service) dialCtx(egCtx context.Context, a network.Address) (*grpc.ClientConn, error) { + return grpc.DialContext(egCtx, a.URIAddr(), + grpc.WithChainUnaryInterceptor( + metrics.NewUnaryClientInterceptor(), + tracing_grpc.NewUnaryClientInteceptor(), + ), + grpc.WithChainStreamInterceptor( + metrics.NewStreamClientInterceptor(), + tracing_grpc.NewStreamClientInterceptor(), + ), + grpc.WithTransportCredentials(insecure.NewCredentials())) +} + // ErrAlreadySyncing is returned when a service synchronization has already // been started. var ErrAlreadySyncing = errors.New("service is being synchronized") -- 2.45.2