Cancel tree sync on errors (support) #1625
2 changed files with 30 additions and 16 deletions
|
@ -131,14 +131,9 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
|
|||
}
|
||||
|
||||
// mergeOperationStreams performs merge sort for node operation streams to one stream.
|
||||
func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
|
||||
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
|
||||
defer close(merged)
|
||||
|
||||
ms := make([]*pilorama.Move, len(streams))
|
||||
for i := range streams {
|
||||
ms[i] = <-streams[i]
|
||||
}
|
||||
|
||||
// Merging different node streams shuffles incoming operations like that:
|
||||
//
|
||||
// x - operation from the stream A
|
||||
|
@ -150,6 +145,15 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
|
|||
// operation height from the stream B. This height is stored in minStreamedLastHeight.
|
||||
var minStreamedLastHeight uint64 = math.MaxUint64
|
||||
|
||||
ms := make([]*pilorama.Move, len(streams))
|
||||
for i := range streams {
|
||||
select {
|
||||
case ms[i] = <-streams[i]:
|
||||
case <-ctx.Done():
|
||||
return minStreamedLastHeight
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
var minTimeMoveTime uint64 = math.MaxUint64
|
||||
minTimeMoveIndex := -1
|
||||
|
@ -164,7 +168,11 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
|
|||
break
|
||||
}
|
||||
|
||||
merged <- ms[minTimeMoveIndex]
|
||||
select {
|
||||
case merged <- ms[minTimeMoveIndex]:
|
||||
case <-ctx.Done():
|
||||
return minStreamedLastHeight
|
||||
}
|
||||
height := ms[minTimeMoveIndex].Time
|
||||
if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
|
||||
minStreamedLastHeight = min(minStreamedLastHeight, height)
|
||||
|
@ -176,7 +184,7 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
|
|||
|
||||
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
|
||||
operationStream <-chan *pilorama.Move,
|
||||
) uint64 {
|
||||
) (uint64, error) {
|
||||
var prev *pilorama.Move
|
||||
var batch []*pilorama.Move
|
||||
for m := range operationStream {
|
||||
|
@ -189,17 +197,17 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
|
|||
|
||||
if len(batch) == s.syncBatchSize {
|
||||
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||
return batch[0].Time
|
||||
return batch[0].Time, err
|
||||
}
|
||||
batch = batch[:0]
|
||||
}
|
||||
}
|
||||
if len(batch) > 0 {
|
||||
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||
return batch[0].Time
|
||||
return batch[0].Time, err
|
||||
}
|
||||
}
|
||||
return math.MaxUint64
|
||||
return math.MaxUint64, nil
|
||||
}
|
||||
|
||||
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||
|
@ -235,7 +243,11 @@ func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
|||
if err := m.Meta.FromBytes(lm.GetMeta()); err != nil {
|
||||
return err
|
||||
}
|
||||
opsCh <- m
|
||||
select {
|
||||
case opsCh <- m:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
if !errors.Is(err, io.EOF) {
|
||||
return err
|
||||
|
@ -264,13 +276,14 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
|||
merged := make(chan *pilorama.Move)
|
||||
var minStreamedLastHeight uint64
|
||||
errGroup.Go(func() error {
|
||||
minStreamedLastHeight = mergeOperationStreams(nodeOperationStreams, merged)
|
||||
minStreamedLastHeight = mergeOperationStreams(egCtx, nodeOperationStreams, merged)
|
||||
return nil
|
||||
})
|
||||
var minUnappliedHeight uint64
|
||||
errGroup.Go(func() error {
|
||||
minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged)
|
||||
return nil
|
||||
var err error
|
||||
minUnappliedHeight, err = s.applyOperationStream(egCtx, cid, treeID, merged)
|
||||
return err
|
||||
})
|
||||
|
||||
var allNodesSynced atomic.Bool
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package tree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
|
@ -64,7 +65,7 @@ func Test_mergeOperationStreams(t *testing.T) {
|
|||
merged := make(chan *pilorama.Move, 1)
|
||||
min := make(chan uint64)
|
||||
go func() {
|
||||
min <- mergeOperationStreams(nodeOpChans, merged)
|
||||
min <- mergeOperationStreams(context.Background(), nodeOpChans, merged)
|
||||
}()
|
||||
|
||||
var res []uint64
|
||||
|
|
Loading…
Add table
Reference in a new issue