[#166] node: Parallelize background tree service sync by batching

* Merge operations

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
Airat Arifullin authored on 2023-04-20 15:58:40 +03:00, committed by Evgenii Stratonikov
parent 299b24b974
commit 9d01029733
2 changed files with 240 additions and 80 deletions


@@ -125,6 +125,137 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
    return nil
}
// mergeOperationStreams merges the per-node operation streams into a single
// stream ordered by operation time (a k-way merge sort over channels).
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
    defer close(merged)

    ms := make([]*pilorama.Move, len(streams))
    for i := range streams {
        ms[i] = <-streams[i]
    }

    // Merging different node streams shuffles incoming operations like that:
    //
    // x - operation from the stream A
    // o - operation from the stream B
    //
    // --o---o--x--x--x--o---x--x------> t
    //                   ^
    // If all ops have been successfully applied, we must start from the last
    // operation height of stream B. This height is stored in minStreamedLastHeight.
    var minStreamedLastHeight uint64 = math.MaxUint64

    for {
        // Pick the stream whose head operation has the smallest time.
        var minTimeMoveTime uint64 = math.MaxUint64
        minTimeMoveIndex := -1
        for i, m := range ms {
            if m != nil && minTimeMoveTime > m.Time {
                minTimeMoveTime = m.Time
                minTimeMoveIndex = i
            }
        }

        if minTimeMoveIndex == -1 {
            break // all streams are exhausted
        }

        merged <- ms[minTimeMoveIndex]
        height := ms[minTimeMoveIndex].Time
        if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
            if minStreamedLastHeight > height {
                minStreamedLastHeight = height
            }
        }
    }

    return minStreamedLastHeight
}
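mergeOperationStreams is a k-way merge over channels: it keeps the current head of each stream and repeatedly emits the smallest one. Below is a minimal standalone sketch of the same pattern over plain uint64 timestamps; the names (mergeUint64Streams and friends) are illustrative and not part of this patch.

package main

import (
    "fmt"
    "math"
)

// mergeUint64Streams repeatedly picks the smallest head value among the
// still-open channels, mirroring mergeOperationStreams above.
func mergeUint64Streams(streams []chan uint64, merged chan<- uint64) {
    defer close(merged)
    heads := make([]*uint64, len(streams))
    refill := func(i int) {
        if v, ok := <-streams[i]; ok {
            heads[i] = &v
        } else {
            heads[i] = nil // stream i is exhausted
        }
    }
    for i := range streams {
        refill(i)
    }
    for {
        minIdx := -1
        minVal := uint64(math.MaxUint64)
        for i, h := range heads {
            if h != nil && *h < minVal {
                minVal, minIdx = *h, i
            }
        }
        if minIdx == -1 {
            return // all streams drained
        }
        merged <- minVal
        refill(minIdx)
    }
}

func main() {
    a, b := make(chan uint64), make(chan uint64)
    go func() {
        for _, v := range []uint64{250, 251, 255} {
            a <- v
        }
        close(a)
    }()
    go func() {
        for _, v := range []uint64{252, 253, 254} {
            b <- v
        }
        close(b)
    }()
    merged := make(chan uint64, 8)
    mergeUint64Streams([]chan uint64{a, b}, merged)
    for v := range merged {
        fmt.Println(v) // 250 251 252 253 254 255
    }
}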
// applyOperationStream reads operations from the merged stream and applies
// them to the local forest with a bounded pool of workers. It returns the
// lowest height that failed to apply, or math.MaxUint64 if every operation
// was applied successfully.
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
    operationStream <-chan *pilorama.Move) uint64 {
    errGroup, _ := errgroup.WithContext(ctx)
    const workersCount = 1024
    errGroup.SetLimit(workersCount)

    // We run TreeApply concurrently for the operation batch. Let's consider two operations
    // in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
    // on m1. That means the service must restart the sync from m1.Time in the next iteration;
    // this height is stored in unappliedOperationHeight.
    var unappliedOperationHeight uint64 = math.MaxUint64
    var heightMtx sync.Mutex

    var prev *pilorama.Move
    for m := range operationStream {
        m := m

        // Skip a duplicate of the previous op: the merged stream may carry
        // the same operation from several nodes, with equal times.
        if prev != nil && prev.Time == m.Time {
            continue
        }
        prev = m

        errGroup.Go(func() error {
            if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
                heightMtx.Lock()
                if m.Time < unappliedOperationHeight {
                    unappliedOperationHeight = m.Time
                }
                heightMtx.Unlock()
                return err
            }
            return nil
        })
    }
    _ = errGroup.Wait()
    return unappliedOperationHeight
}
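applyOperationStream combines errgroup.SetLimit for bounded parallelism with a mutex-guarded minimum that records where the next sync must restart. A compact sketch of that pattern, assuming only golang.org/x/sync/errgroup; applyAll and failAt are hypothetical names.

package main

import (
    "context"
    "fmt"
    "math"
    "sync"

    "golang.org/x/sync/errgroup"
)

// applyAll applies items concurrently with a bounded worker pool and records
// the smallest item that failed, mirroring applyOperationStream above.
func applyAll(ctx context.Context, items []uint64, apply func(uint64) error) uint64 {
    eg, _ := errgroup.WithContext(ctx)
    eg.SetLimit(4) // at most four apply calls in flight

    var minFailed uint64 = math.MaxUint64
    var mtx sync.Mutex

    for _, it := range items {
        it := it
        eg.Go(func() error {
            if err := apply(it); err != nil {
                mtx.Lock()
                if it < minFailed {
                    minFailed = it
                }
                mtx.Unlock()
                return err
            }
            return nil
        })
    }
    _ = eg.Wait() // failures only influence the returned restart point
    return minFailed
}

func main() {
    failAt := map[uint64]bool{3: true, 7: true}
    restart := applyAll(context.Background(), []uint64{1, 2, 3, 4, 5, 6, 7}, func(v uint64) error {
        if failAt[v] {
            return fmt.Errorf("apply %d failed", v)
        }
        return nil
    })
    fmt.Println(restart) // 3: the next iteration restarts at the lowest failed height
}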
// startStream requests the tree operation log from a remote node starting at
// the given height and pushes the received operations into opsCh. It returns
// the height the next request should start from.
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
    height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move) (uint64, error) {
    rawCID := make([]byte, sha256.Size)
    cid.Encode(rawCID)

    for {
        newHeight := height
        req := &GetOpLogRequest{
            Body: &GetOpLogRequest_Body{
                ContainerId: rawCID,
                TreeId:      treeID,
                Height:      newHeight,
            },
        }
        if err := SignMessage(req, s.key); err != nil {
            return 0, err
        }

        c, err := treeClient.GetOpLog(ctx, req)
        if err != nil {
            return 0, fmt.Errorf("can't initialize client: %w", err)
        }
        res, err := c.Recv()
        for ; err == nil; res, err = c.Recv() {
            lm := res.GetBody().GetOperation()
            m := &pilorama.Move{
                Parent: lm.ParentId,
                Child:  lm.ChildId,
            }
            if err := m.Meta.FromBytes(lm.Meta); err != nil {
                return 0, err
            }
            opsCh <- m
            // Advance past the newest operation we have seen; without this
            // the outer loop would stop after the first response.
            if m.Time > newHeight {
                newHeight = m.Time + 1
            }
        }
        if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
            return newHeight, err
        }
        height = newHeight
    }
}
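The outer loop of startStream keeps re-requesting the operation log from the last height it saw until a request yields nothing new. The same resume logic, reduced to a pure sketch; pullAll and fetchChunk are hypothetical stand-ins for the gRPC stream.

package main

import "fmt"

// pullAll keeps re-requesting operations from the last seen height until a
// request yields nothing new, mirroring the outer for-loop of startStream.
func pullAll(height uint64, fetchChunk func(from uint64) []uint64, out chan<- uint64) uint64 {
    for {
        newHeight := height
        for _, t := range fetchChunk(newHeight) {
            out <- t
            if t > newHeight {
                newHeight = t + 1 // resume just past the newest op we saw
            }
        }
        if height == newHeight {
            return newHeight // the node has nothing further for us
        }
        height = newHeight
    }
}

func main() {
    ops := []uint64{5, 6, 9, 12}
    fetch := func(from uint64) []uint64 { // serves at most two ops per request
        var res []uint64
        for _, t := range ops {
            if t >= from && len(res) < 2 {
                res = append(res, t)
            }
        }
        return res
    }
    out := make(chan uint64, len(ops))
    next := pullAll(0, fetch, out)
    close(out)
    for t := range out {
        fmt.Print(t, " ") // 5 6 9 12
    }
    fmt.Println("next:", next) // next: 13
}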
// synchronizeTree synchronizes operations fetched from different nodes.
// Each available node streams its operations into a separate channel, and these
// streams are merged into one stream ordered by operation time. This way the
// service can skip operations that have already been applied while keeping good
// batching. The method returns the height the next synchronization should start from.
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
    treeID string, nodes []netmapSDK.NodeInfo) uint64 {
    s.log.Debug(logs.TreeSynchronizeTree,
@@ -133,10 +264,25 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
        zap.Uint64("from", from))

    errGroup, egCtx := errgroup.WithContext(ctx)
-   const workersCount = 4
+   const workersCount = 1024
    errGroup.SetLimit(workersCount)

-   heights := make([]uint64, len(nodes))
+   nodeOperationStreams := make([]chan *pilorama.Move, len(nodes))
+   for i := range nodeOperationStreams {
+       nodeOperationStreams[i] = make(chan *pilorama.Move)
+   }
+   merged := make(chan *pilorama.Move)
+   var minStreamedLastHeight uint64
+   errGroup.Go(func() error {
+       minStreamedLastHeight = mergeOperationStreams(ctx, nodeOperationStreams, merged)
+       return nil
+   })
+
+   var minUnappliedHeight uint64
+   errGroup.Go(func() error {
+       minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged)
+       return nil
+   })

    for i, n := range nodes {
        i := i
        n := n
@@ -164,7 +310,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
                treeClient := NewTreeServiceClient(cc)
                for {
-                   h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
+                   h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
                    if height < h {
                        height = h
                    }
@@ -174,94 +320,23 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
                    }
                }
            })
+           close(nodeOperationStreams[i])
-           if height <= from { // do not increase starting height on fail
-               heights[i] = from
-               return nil
-           }
-           heights[i] = height
            return nil
        })
    }

    if err := errGroup.Wait(); err != nil {
        s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
    }

-   newHeight := uint64(math.MaxUint64)
-   for _, height := range heights { // take minimum across all clients
-       if height < newHeight {
-           newHeight = height
-       }
-   }
-   if newHeight == math.MaxUint64 {
-       newHeight = from
-   }
+   newHeight := minStreamedLastHeight
+   if newHeight > minUnappliedHeight {
+       newHeight = minUnappliedHeight
+   } else {
+       newHeight++
+   }

    return newHeight
}
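The restart-height rule at the end of synchronizeTree combines the two minima: if any operation failed to apply, the next iteration retries from the lowest failed height; otherwise it continues one past the lowest last-streamed height. The same rule as a pure function; nextSyncHeight is an illustration, not code from the patch.

package main

import (
    "fmt"
    "math"
)

// nextSyncHeight picks where the next background sync iteration should start.
func nextSyncHeight(minStreamedLast, minUnapplied uint64) uint64 {
    if minStreamedLast > minUnapplied {
        return minUnapplied // some op failed: retry from the first unapplied height
    }
    return minStreamedLast + 1 // all applied: continue past the shortest stream
}

func main() {
    fmt.Println(nextSyncHeight(255, math.MaxUint64)) // 256: clean run
    fmt.Println(nextSyncHeight(255, 252))            // 252: op at height 252 must be retried
}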
-func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
-   rawCID := make([]byte, sha256.Size)
-   cid.Encode(rawCID)
-
-   const treeApplyWorkersCount = 1024
-   errGroup, egCtx := errgroup.WithContext(ctx)
-   errGroup.SetLimit(treeApplyWorkersCount)
-
-   var heightMtx sync.Mutex
-
-   for {
-       newHeight := height
-       req := &GetOpLogRequest{
-           Body: &GetOpLogRequest_Body{
-               ContainerId: rawCID,
-               TreeId:      treeID,
-               Height:      newHeight,
-           },
-       }
-       if err := SignMessage(req, s.key); err != nil {
-           return newHeight, err
-       }
-
-       c, err := treeClient.GetOpLog(egCtx, req)
-       if err != nil {
-           return newHeight, fmt.Errorf("can't initialize client: %w", err)
-       }
-       res, err := c.Recv()
-       for ; err == nil; res, err = c.Recv() {
-           lm := res.GetBody().GetOperation()
-           m := &pilorama.Move{
-               Parent: lm.ParentId,
-               Child:  lm.ChildId,
-           }
-           if err := m.Meta.FromBytes(lm.Meta); err != nil {
-               _ = errGroup.Wait()
-               return newHeight, err
-           }
-           errGroup.Go(func() error {
-               if err := s.forest.TreeApply(egCtx, cid, treeID, m, true); err != nil {
-                   return err
-               }
-               heightMtx.Lock()
-               if m.Time > newHeight {
-                   newHeight = m.Time + 1
-               } else {
-                   newHeight++
-               }
-               heightMtx.Unlock()
-               return nil
-           })
-       }
-       if errGroupErr := errGroup.Wait(); errGroupErr != nil {
-           return newHeight, err
-       }
-       if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
-           return newHeight, err
-       }
-       height = newHeight
-   }
-}
// ErrAlreadySyncing is returned when a service synchronization has already
// been started.
var ErrAlreadySyncing = errors.New("service is being synchronized")
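For completeness, a hypothetical caller-side sketch of how such a sentinel error is usually consumed with errors.Is; whether a particular entry point returns ErrAlreadySyncing is an assumption here, not something this diff shows.

// trySync is a hypothetical helper inside this package: it treats a
// concurrent synchronization attempt as a no-op instead of an error.
func trySync(ctx context.Context, s *Service, cnr cid.ID, treeID string) error {
    err := s.SynchronizeTree(ctx, cnr, treeID)
    if errors.Is(err, ErrAlreadySyncing) {
        return nil // another sync is already in flight for this tree
    }
    return err
}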


@@ -0,0 +1,85 @@
package tree

import (
    "context"
    "testing"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
    "github.com/stretchr/testify/require"
)
func Test_mergeOperationStreams(t *testing.T) {
    tests := []struct {
        name          string
        ctx           context.Context
        opTimes       [][]uint64
        wantValues    []uint64
        wantMinHeight uint64
    }{
        {
            name: "1",
            ctx:  context.Background(),
            opTimes: [][]uint64{
                {250, 251, 255},
                {252, 253, 254, 256, 257},
            },
            wantValues:    []uint64{250, 251, 252, 253, 254, 255, 256, 257},
            wantMinHeight: 255,
        },
        {
            name: "2",
            ctx:  context.Background(),
            opTimes: [][]uint64{
                {250, 251, 255, 259},
                {252, 253, 254, 256, 257},
            },
            wantValues:    []uint64{250, 251, 252, 253, 254, 255, 256, 257, 259},
            wantMinHeight: 257,
        },
        {
            name: "3",
            ctx:  context.Background(),
            opTimes: [][]uint64{
                {250, 251, 255},
                {249, 250, 251, 253, 254, 256, 257},
            },
            wantValues:    []uint64{249, 250, 250, 251, 251, 253, 254, 255, 256, 257},
            wantMinHeight: 255,
        },
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            nodeOpChans := make([]chan *pilorama.Move, len(tt.opTimes))
            for i := range nodeOpChans {
                nodeOpChans[i] = make(chan *pilorama.Move)
            }

            // Generate the operations and push them into all channels.
            for i, ch := range nodeOpChans {
                i := i
                ch := ch
                go func() {
                    for _, tm := range tt.opTimes[i] {
                        op := &pilorama.Move{}
                        op.Time = tm
                        ch <- op
                    }
                    close(nodeOpChans[i])
                }()
            }

            merged := make(chan *pilorama.Move, 1)
            min := make(chan uint64)
            go func() {
                min <- mergeOperationStreams(tt.ctx, nodeOpChans, merged)
            }()

            var res []uint64
            for op := range merged {
                res = append(res, op.Time)
            }
            require.Equal(t, tt.wantValues, res)
            require.Equal(t, tt.wantMinHeight, <-min)
        })
    }
}
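The wantMinHeight values follow from the merge contract: the reported height is the minimum, across input streams, of each stream's final operation time. A hypothetical helper that derives the expectation from opTimes (it would need a "math" import if added to this file):

// expectedMinHeight mirrors the invariant encoded by the cases above.
func expectedMinHeight(opTimes [][]uint64) uint64 {
    minLast := uint64(math.MaxUint64)
    for _, times := range opTimes {
        if len(times) == 0 {
            continue
        }
        if last := times[len(times)-1]; last < minLast {
            minLast = last
        }
    }
    return minLast
}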