From 89dd9660f71e21e6d3af730761eaf246ed6a7f0b Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 30 Jan 2025 10:36:02 +0300 Subject: [PATCH] [#1621] treesvc: Cancel background sync on failure If applyOperationStream() exits prematurely, other goroutines will block on send and errgroup will never finish waiting. In this commit we also check whether context is cancelled. Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/sync.go | 35 +++++++++++++++++++++++----------- pkg/services/tree/sync_test.go | 3 ++- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 4f3262113..1a455def9 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -131,7 +131,7 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string } // mergeOperationStreams performs merge sort for node operation streams to one stream. -func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { +func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { defer close(merged) // Merging different node streams shuffles incoming operations like that: @@ -147,7 +147,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram ms := make([]*pilorama.Move, len(streams)) for i := range streams { - ms[i] = <-streams[i] + select { + case ms[i] = <-streams[i]: + case <-ctx.Done(): + return minStreamedLastHeight + } } for { @@ -164,7 +168,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram break } - merged <- ms[minTimeMoveIndex] + select { + case merged <- ms[minTimeMoveIndex]: + case <-ctx.Done(): + return minStreamedLastHeight + } height := ms[minTimeMoveIndex].Time if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil { minStreamedLastHeight = min(minStreamedLastHeight, height) @@ -176,7 +184,7 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string, operationStream <-chan *pilorama.Move, -) uint64 { +) (uint64, error) { var prev *pilorama.Move var batch []*pilorama.Move for m := range operationStream { @@ -189,17 +197,17 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s if len(batch) == s.syncBatchSize { if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { - return batch[0].Time + return batch[0].Time, err } batch = batch[:0] } } if len(batch) > 0 { if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { - return batch[0].Time + return batch[0].Time, err } } - return math.MaxUint64 + return math.MaxUint64, nil } func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, @@ -235,7 +243,11 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, if err := m.Meta.FromBytes(lm.GetMeta()); err != nil { return err } - opsCh <- m + select { + case opsCh <- m: + case <-ctx.Done(): + return ctx.Err() + } } if !errors.Is(err, io.EOF) { return err @@ -264,13 +276,14 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, merged := make(chan *pilorama.Move) var minStreamedLastHeight uint64 errGroup.Go(func() error { - minStreamedLastHeight = mergeOperationStreams(nodeOperationStreams, merged) + minStreamedLastHeight = mergeOperationStreams(egCtx, nodeOperationStreams, merged) return nil }) var minUnappliedHeight uint64 errGroup.Go(func() error { - minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged) - return nil + var err error + minUnappliedHeight, err = s.applyOperationStream(egCtx, cid, treeID, merged) + return err }) var allNodesSynced atomic.Bool diff --git a/pkg/services/tree/sync_test.go b/pkg/services/tree/sync_test.go index 497d90554..87d419408 100644 --- a/pkg/services/tree/sync_test.go +++ b/pkg/services/tree/sync_test.go @@ -1,6 +1,7 @@ package tree import ( + "context" "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" @@ -64,7 +65,7 @@ func Test_mergeOperationStreams(t *testing.T) { merged := make(chan *pilorama.Move, 1) min := make(chan uint64) go func() { - min <- mergeOperationStreams(nodeOpChans, merged) + min <- mergeOperationStreams(context.Background(), nodeOpChans, merged) }() var res []uint64