[#1621] treesvc: Cancel background sync on failure
All checks were successful
DCO action / DCO (pull_request) Successful in 31s
Tests and linters / Run gofumpt (pull_request) Successful in 40s
Vulncheck / Vulncheck (pull_request) Successful in 52s
Build / Build Components (pull_request) Successful in 1m40s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m45s
Tests and linters / Tests (pull_request) Successful in 2m13s
Tests and linters / Staticcheck (pull_request) Successful in 2m12s
Tests and linters / Lint (pull_request) Successful in 2m49s
Tests and linters / gopls check (pull_request) Successful in 3m0s
Tests and linters / Tests with -race (pull_request) Successful in 3m23s

If applyOperationStream() exits prematurely, other goroutines will block
on send and errgroup will never finish waiting. In this commit we also
check whether context is cancelled.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2025-01-30 10:36:02 +03:00
parent 03f754f19b
commit 1580188e59
Signed by: fyrchik
SSH key fingerprint: SHA256:m/TTwCzjnRkXgnzEx9X92ccxy1CcVeinOgDb3NPWWmg
2 changed files with 26 additions and 12 deletions

View file

@ -131,7 +131,7 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
} }
// mergeOperationStreams performs merge sort for node operation streams to one stream. // mergeOperationStreams performs merge sort for node operation streams to one stream.
func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
defer close(merged) defer close(merged)
// Merging different node streams shuffles incoming operations like that: // Merging different node streams shuffles incoming operations like that:
@ -147,7 +147,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
ms := make([]*pilorama.Move, len(streams)) ms := make([]*pilorama.Move, len(streams))
for i := range streams { for i := range streams {
ms[i] = <-streams[i] select {
case ms[i] = <-streams[i]:
case <-ctx.Done():
return minStreamedLastHeight
}
} }
for { for {
@ -164,7 +168,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
break break
} }
merged <- ms[minTimeMoveIndex] select {
case merged <- ms[minTimeMoveIndex]:
case <-ctx.Done():
return minStreamedLastHeight
}
height := ms[minTimeMoveIndex].Time height := ms[minTimeMoveIndex].Time
if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil { if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
minStreamedLastHeight = min(minStreamedLastHeight, height) minStreamedLastHeight = min(minStreamedLastHeight, height)
@ -176,7 +184,7 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string, func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
operationStream <-chan *pilorama.Move, operationStream <-chan *pilorama.Move,
) uint64 { ) (uint64, error) {
var prev *pilorama.Move var prev *pilorama.Move
var batch []*pilorama.Move var batch []*pilorama.Move
for m := range operationStream { for m := range operationStream {
@ -189,17 +197,17 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
if len(batch) == s.syncBatchSize { if len(batch) == s.syncBatchSize {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time return batch[0].Time, err
} }
batch = batch[:0] batch = batch[:0]
} }
} }
if len(batch) > 0 { if len(batch) > 0 {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time return batch[0].Time, err
} }
} }
return math.MaxUint64 return math.MaxUint64, nil
} }
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
@ -235,7 +243,11 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil { if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
return err return err
} }
opsCh <- m select {
case opsCh <- m:
case <-ctx.Done():
return ctx.Err()
}
} }
if !errors.Is(err, io.EOF) { if !errors.Is(err, io.EOF) {
return err return err
@ -264,13 +276,14 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
merged := make(chan *pilorama.Move) merged := make(chan *pilorama.Move)
var minStreamedLastHeight uint64 var minStreamedLastHeight uint64
errGroup.Go(func() error { errGroup.Go(func() error {
minStreamedLastHeight = mergeOperationStreams(nodeOperationStreams, merged) minStreamedLastHeight = mergeOperationStreams(egCtx, nodeOperationStreams, merged)
return nil return nil
}) })
var minUnappliedHeight uint64 var minUnappliedHeight uint64
errGroup.Go(func() error { errGroup.Go(func() error {
minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged) var err error
return nil minUnappliedHeight, err = s.applyOperationStream(egCtx, cid, treeID, merged)
return err
}) })
var allNodesSynced atomic.Bool var allNodesSynced atomic.Bool

View file

@ -1,6 +1,7 @@
package tree package tree
import ( import (
"context"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
@ -64,7 +65,7 @@ func Test_mergeOperationStreams(t *testing.T) {
merged := make(chan *pilorama.Move, 1) merged := make(chan *pilorama.Move, 1)
min := make(chan uint64) min := make(chan uint64)
go func() { go func() {
min <- mergeOperationStreams(nodeOpChans, merged) min <- mergeOperationStreams(context.Background(), nodeOpChans, merged)
}() }()
var res []uint64 var res []uint64