Cancel tree sync on errors (support) #1625

Merged
fyrchik merged 2 commits from fyrchik/frostfs-node:tree-sync-error-suppor into support/v0.44 2025-01-31 06:36:14 +00:00
2 changed files with 30 additions and 16 deletions

View file

@ -131,14 +131,9 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
}
// mergeOperationStreams merge-sorts the node operation streams into a single stream.
func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
defer close(merged)
ms := make([]*pilorama.Move, len(streams))
for i := range streams {
ms[i] = <-streams[i]
}
// Merging different node streams shuffles incoming operations like this:
//
// x - operation from the stream A
@ -150,6 +145,15 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
// operation height from the stream B. This height is stored in minStreamedLastHeight.
var minStreamedLastHeight uint64 = math.MaxUint64
ms := make([]*pilorama.Move, len(streams))
for i := range streams {
select {
case ms[i] = <-streams[i]:
case <-ctx.Done():
return minStreamedLastHeight
}
}
for {
var minTimeMoveTime uint64 = math.MaxUint64
minTimeMoveIndex := -1
@ -164,7 +168,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
break
}
merged <- ms[minTimeMoveIndex]
select {
case merged <- ms[minTimeMoveIndex]:
case <-ctx.Done():
return minStreamedLastHeight
}
height := ms[minTimeMoveIndex].Time
if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
minStreamedLastHeight = min(minStreamedLastHeight, height)
@ -176,7 +184,7 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
operationStream <-chan *pilorama.Move,
) uint64 {
) (uint64, error) {
var prev *pilorama.Move
var batch []*pilorama.Move
for m := range operationStream {
@ -189,17 +197,17 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
if len(batch) == s.syncBatchSize {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time
return batch[0].Time, err
}
batch = batch[:0]
}
}
if len(batch) > 0 {
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time
return batch[0].Time, err
}
}
return math.MaxUint64
return math.MaxUint64, nil
}
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
@ -235,7 +243,11 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
return err
}
opsCh <- m
select {
case opsCh <- m:
case <-ctx.Done():
return ctx.Err()
}
}
if !errors.Is(err, io.EOF) {
return err
@ -264,13 +276,14 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
merged := make(chan *pilorama.Move)
var minStreamedLastHeight uint64
errGroup.Go(func() error {
minStreamedLastHeight = mergeOperationStreams(nodeOperationStreams, merged)
minStreamedLastHeight = mergeOperationStreams(egCtx, nodeOperationStreams, merged)
return nil
})
var minUnappliedHeight uint64
errGroup.Go(func() error {
minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged)
return nil
var err error
minUnappliedHeight, err = s.applyOperationStream(egCtx, cid, treeID, merged)
return err
})
var allNodesSynced atomic.Bool

View file

@ -1,6 +1,7 @@
package tree
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
@ -64,7 +65,7 @@ func Test_mergeOperationStreams(t *testing.T) {
merged := make(chan *pilorama.Move, 1)
min := make(chan uint64)
go func() {
min <- mergeOperationStreams(nodeOpChans, merged)
min <- mergeOperationStreams(context.Background(), nodeOpChans, merged)
}()
var res []uint64