diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 868d6ea4fd..5c0300c143 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -125,6 +125,137 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string return nil } +// mergeOperationStreams performs merge sort for node operation streams to one stream. +func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 { + defer close(merged) + + ms := make([]*pilorama.Move, len(streams)) + for i := range streams { + ms[i] = <-streams[i] + } + + // Merging different node streams shuffles incoming operations like that: + // + // x - operation from the stream A + // o - operation from the stream B + // + // --o---o--x--x--x--o---x--x------> t + // ^ + // If all ops have been successfully applied, we must start from the last + // operation height from the stream B. This height is stored in minStreamedLastHeight. + var minStreamedLastHeight uint64 = math.MaxUint64 + + for { + var minTimeMoveTime uint64 = math.MaxUint64 + minTimeMoveIndex := -1 + for i, m := range ms { + if m != nil && minTimeMoveTime > m.Time { + minTimeMoveTime = m.Time + minTimeMoveIndex = i + } + } + + if minTimeMoveIndex == -1 { + break + } + + merged <- ms[minTimeMoveIndex] + height := ms[minTimeMoveIndex].Time + if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil { + if minStreamedLastHeight > height { + minStreamedLastHeight = height + } + } + } + + return minStreamedLastHeight +} + +func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string, + operationStream <-chan *pilorama.Move) uint64 { + errGroup, _ := errgroup.WithContext(ctx) + const workersCount = 1024 + errGroup.SetLimit(workersCount) + + // We run TreeApply concurrently for the operation batch. Let's consider two operations + // in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail + // on m1. That means the service must start sync from m1.Time in the next iteration and + // this height is stored in unappliedOperationHeight. + var unappliedOperationHeight uint64 = math.MaxUint64 + var heightMtx sync.Mutex + + var prev *pilorama.Move + for m := range operationStream { + m := m + + // skip already applied op + if prev != nil && prev.Time == m.Time { + continue + } + prev = m + + errGroup.Go(func() error { + if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil { + heightMtx.Lock() + if m.Time < unappliedOperationHeight { + unappliedOperationHeight = m.Time + } + heightMtx.Unlock() + return err + } + return nil + }) + } + _ = errGroup.Wait() + return unappliedOperationHeight +} + +func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, + height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move) (uint64, error) { + rawCID := make([]byte, sha256.Size) + cid.Encode(rawCID) + + for { + newHeight := height + req := &GetOpLogRequest{ + Body: &GetOpLogRequest_Body{ + ContainerId: rawCID, + TreeId: treeID, + Height: newHeight, + }, + } + if err := SignMessage(req, s.key); err != nil { + return 0, err + } + + c, err := treeClient.GetOpLog(ctx, req) + if err != nil { + return 0, fmt.Errorf("can't initialize client: %w", err) + } + res, err := c.Recv() + for ; err == nil; res, err = c.Recv() { + lm := res.GetBody().GetOperation() + m := &pilorama.Move{ + Parent: lm.ParentId, + Child: lm.ChildId, + } + if err := m.Meta.FromBytes(lm.Meta); err != nil { + return 0, err + } + opsCh <- m + } + if height == newHeight || err != nil && !errors.Is(err, io.EOF) { + return newHeight, err + } + height = newHeight + } +} + +// synchronizeTree synchronizes operations getting them from different nodes. +// Each available node does stream operations to a separate stream. These streams +// are merged into one big stream ordered by operation time. This way allows to skip +// already applied operation and keep good batching. +// The method returns a height that service should start sync from in the next time. func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, treeID string, nodes []netmapSDK.NodeInfo) uint64 { s.log.Debug(logs.TreeSynchronizeTree, @@ -133,10 +264,25 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, zap.Uint64("from", from)) errGroup, egCtx := errgroup.WithContext(ctx) - const workersCount = 4 + const workersCount = 1024 errGroup.SetLimit(workersCount) - heights := make([]uint64, len(nodes)) + nodeOperationStreams := make([]chan *pilorama.Move, len(nodes)) + for i := range nodeOperationStreams { + nodeOperationStreams[i] = make(chan *pilorama.Move) + } + merged := make(chan *pilorama.Move) + var minStreamedLastHeight uint64 + errGroup.Go(func() error { + minStreamedLastHeight = mergeOperationStreams(ctx, nodeOperationStreams, merged) + return nil + }) + var minUnappliedHeight uint64 + errGroup.Go(func() error { + minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged) + return nil + }) + for i, n := range nodes { i := i n := n @@ -164,7 +310,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, treeClient := NewTreeServiceClient(cc) for { - h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient) + h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i]) if height < h { height = h } @@ -174,94 +320,23 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64, } } }) - - if height <= from { // do not increase starting height on fail - heights[i] = from - return nil - } - heights[i] = height + close(nodeOperationStreams[i]) return nil }) } - if err := errGroup.Wait(); err != nil { s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err)) } - newHeight := uint64(math.MaxUint64) - for _, height := range heights { // take minimum across all clients - if height < newHeight { - newHeight = height - } - } - if newHeight == math.MaxUint64 { - newHeight = from + newHeight := minStreamedLastHeight + if newHeight > minUnappliedHeight { + newHeight = minUnappliedHeight + } else { + newHeight++ } return newHeight } -func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) { - rawCID := make([]byte, sha256.Size) - cid.Encode(rawCID) - - const treeApplyWorkersCount = 1024 - errGroup, egCtx := errgroup.WithContext(ctx) - errGroup.SetLimit(treeApplyWorkersCount) - var heightMtx sync.Mutex - - for { - newHeight := height - req := &GetOpLogRequest{ - Body: &GetOpLogRequest_Body{ - ContainerId: rawCID, - TreeId: treeID, - Height: newHeight, - }, - } - if err := SignMessage(req, s.key); err != nil { - return newHeight, err - } - - c, err := treeClient.GetOpLog(egCtx, req) - if err != nil { - return newHeight, fmt.Errorf("can't initialize client: %w", err) - } - - res, err := c.Recv() - for ; err == nil; res, err = c.Recv() { - lm := res.GetBody().GetOperation() - m := &pilorama.Move{ - Parent: lm.ParentId, - Child: lm.ChildId, - } - if err := m.Meta.FromBytes(lm.Meta); err != nil { - _ = errGroup.Wait() - return newHeight, err - } - errGroup.Go(func() error { - if err := s.forest.TreeApply(egCtx, cid, treeID, m, true); err != nil { - return err - } - heightMtx.Lock() - if m.Time > newHeight { - newHeight = m.Time + 1 - } else { - newHeight++ - } - heightMtx.Unlock() - return nil - }) - } - if errGroupErr := errGroup.Wait(); errGroupErr != nil { - return newHeight, err - } - if height == newHeight || err != nil && !errors.Is(err, io.EOF) { - return newHeight, err - } - height = newHeight - } -} - // ErrAlreadySyncing is returned when a service synchronization has already // been started. var ErrAlreadySyncing = errors.New("service is being synchronized") diff --git a/pkg/services/tree/sync_test.go b/pkg/services/tree/sync_test.go new file mode 100644 index 0000000000..eb51bc3f8d --- /dev/null +++ b/pkg/services/tree/sync_test.go @@ -0,0 +1,85 @@ +package tree + +import ( + "context" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" + "github.com/stretchr/testify/require" +) + +func Test_mergeOperationStreams(t *testing.T) { + tests := []struct { + name string + ctx context.Context + opTimes [][]uint64 + wantValues []uint64 + wantMinHeight uint64 + }{ + { + name: "1", + ctx: context.Background(), + opTimes: [][]uint64{ + {250, 251, 255}, + {252, 253, 254, 256, 257}, + }, + wantValues: []uint64{250, 251, 252, 253, 254, 255, 256, 257}, + wantMinHeight: 255, + }, + { + name: "2", + ctx: context.Background(), + opTimes: [][]uint64{ + {250, 251, 255, 259}, + {252, 253, 254, 256, 257}, + }, + wantValues: []uint64{250, 251, 252, 253, 254, 255, 256, 257, 259}, + wantMinHeight: 257, + }, + { + name: "3", + ctx: context.Background(), + opTimes: [][]uint64{ + {250, 251, 255}, + {249, 250, 251, 253, 254, 256, 257}, + }, + wantValues: []uint64{249, 250, 250, 251, 251, 253, 254, 255, 256, 257}, + wantMinHeight: 255, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + nodeOpChans := make([]chan *pilorama.Move, len(tt.opTimes)) + for i := range nodeOpChans { + nodeOpChans[i] = make(chan *pilorama.Move) + } + + // generate and put values to all chans + for i, ch := range nodeOpChans { + i := i + ch := ch + go func() { + for _, tm := range tt.opTimes[i] { + op := &pilorama.Move{} + op.Time = tm + ch <- op + } + close(nodeOpChans[i]) + }() + } + + merged := make(chan *pilorama.Move, 1) + min := make(chan uint64) + go func() { + min <- mergeOperationStreams(tt.ctx, nodeOpChans, merged) + }() + + var res []uint64 + for op := range merged { + res = append(res, op.Time) + } + require.Equal(t, tt.wantValues, res) + require.Equal(t, tt.wantMinHeight, <-min) + }) + } +}