From 03f754f19ba0e74f5876037e1aead2d0a6834da7 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 30 Jan 2025 10:23:30 +0300 Subject: [PATCH 1/2] [#1621] treesvc: Move variable initialization to top in mergeOperationStreams() Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/sync.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index c48a312fb..4f3262113 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -134,11 +134,6 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { defer close(merged) - ms := make([]*pilorama.Move, len(streams)) - for i := range streams { - ms[i] = <-streams[i] - } - // Merging different node streams shuffles incoming operations like that: // // x - operation from the stream A @@ -150,6 +145,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram // operation height from the stream B. This height is stored in minStreamedLastHeight. var minStreamedLastHeight uint64 = math.MaxUint64 + ms := make([]*pilorama.Move, len(streams)) + for i := range streams { + ms[i] = <-streams[i] + } + for { var minTimeMoveTime uint64 = math.MaxUint64 minTimeMoveIndex := -1 -- 2.45.3 From 1580188e59fd7aa9f48cbdb75f13116f640dacf9 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 30 Jan 2025 10:36:02 +0300 Subject: [PATCH 2/2] [#1621] treesvc: Cancel background sync on failure If applyOperationStream() exits prematurely, other goroutines will block on send and errgroup will never finish waiting. In this commit we also check whether context is cancelled. Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/sync.go | 35 +++++++++++++++++++++++----------- pkg/services/tree/sync_test.go | 3 ++- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 4f3262113..1a455def9 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -131,7 +131,7 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string } // mergeOperationStreams performs merge sort for node operation streams to one stream. -func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { +func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { defer close(merged) // Merging different node streams shuffles incoming operations like that: @@ -147,7 +147,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram ms := make([]*pilorama.Move, len(streams)) for i := range streams { - ms[i] = <-streams[i] + select { + case ms[i] = <-streams[i]: + case <-ctx.Done(): + return minStreamedLastHeight + } } for { @@ -164,7 +168,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram break } - merged <- ms[minTimeMoveIndex] + select { + case merged <- ms[minTimeMoveIndex]: + case <-ctx.Done(): + return minStreamedLastHeight + } height := ms[minTimeMoveIndex].Time if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil { minStreamedLastHeight = min(minStreamedLastHeight, height) @@ -176,7 +184,7 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string, operationStream <-chan *pilorama.Move, -) uint64 { +) (uint64, error) { var prev *pilorama.Move var batch []*pilorama.Move for m := range operationStream { @@ -189,17 +197,17 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s if len(batch) == s.syncBatchSize { if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { - return batch[0].Time + return batch[0].Time, err } batch = batch[:0] } } if len(batch) > 0 { if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { - return batch[0].Time + return batch[0].Time, err } } - return math.MaxUint64 + return math.MaxUint64, nil } func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, @@ -235,7 +243,11 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, if err := m.Meta.FromBytes(lm.GetMeta()); err != nil { return err } - opsCh <- m + select { + case opsCh <- m: + case <-ctx.Done(): + return ctx.Err() + } } if !errors.Is(err, io.EOF) { return err @@ -264,13 +276,14 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, merged := make(chan *pilorama.Move) var minStreamedLastHeight uint64 errGroup.Go(func() error { - minStreamedLastHeight = mergeOperationStreams(nodeOperationStreams, merged) + minStreamedLastHeight = mergeOperationStreams(egCtx, nodeOperationStreams, merged) return nil }) var minUnappliedHeight uint64 errGroup.Go(func() error { - minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged) - return nil + var err error + minUnappliedHeight, err = s.applyOperationStream(egCtx, cid, treeID, merged) + return err }) var allNodesSynced atomic.Bool diff --git a/pkg/services/tree/sync_test.go b/pkg/services/tree/sync_test.go index 497d90554..87d419408 100644 --- a/pkg/services/tree/sync_test.go +++ b/pkg/services/tree/sync_test.go @@ -1,6 +1,7 @@ package tree import ( + "context" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" @@ -64,7 +65,7 @@ func Test_mergeOperationStreams(t *testing.T) { merged := make(chan *pilorama.Move, 1) min := make(chan uint64) go func() { - min <- mergeOperationStreams(nodeOpChans, merged) + min <- mergeOperationStreams(context.Background(), nodeOpChans, merged) }() var res []uint64 -- 2.45.3