[#166] node: Parallelize background tree service sync by operation batching #235

Merged
fyrchik merged 2 commits from aarifullin/frostfs-node:feature/166-batch_tree_apply into master 2023-04-26 10:17:59 +00:00
2 changed files with 240 additions and 66 deletions


@@ -125,6 +125,137 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
return nil
}
// mergeOperationStreams merge-sorts the per-node operation streams into a single stream ordered by operation time.
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
defer close(merged)
ms := make([]*pilorama.Move, len(streams))
for i := range streams {
ms[i] = <-streams[i]
}
// Merging the per-node streams interleaves incoming operations like this:
//
// x - operation from the stream A
// o - operation from the stream B
//
// --o---o--x--x--x--o---x--x------> t
// ^
// If all ops have been applied successfully, the next sync must start from the
// height of the last operation in stream B (the stream exhausted first). This
// height is stored in minStreamedLastHeight.
var minStreamedLastHeight uint64 = math.MaxUint64
for {
var minTimeMoveTime uint64 = math.MaxUint64
minTimeMoveIndex := -1
for i, m := range ms {
if m != nil && minTimeMoveTime > m.Time {
minTimeMoveTime = m.Time
minTimeMoveIndex = i
}
}
if minTimeMoveIndex == -1 {
break
}
merged <- ms[minTimeMoveIndex]
height := ms[minTimeMoveIndex].Time
if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
if minStreamedLastHeight > height {
minStreamedLastHeight = height
}
}
}
return minStreamedLastHeight
}
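
For illustration only (this sketch is not part of the change): the same k-way merge over bare uint64 heights instead of pilorama.Move values, runnable on its own. The names mergeTimes and minLast are hypothetical; the input heights are borrowed from test case "1" below.

    package main

    import (
        "fmt"
        "math"
    )

    // mergeTimes mirrors mergeOperationStreams on plain heights: it merge-sorts
    // several ascending streams into one and returns the smallest "height of the
    // last operation" across all input streams.
    func mergeTimes(streams []chan uint64, merged chan<- uint64) uint64 {
        defer close(merged)
        heads := make([]*uint64, len(streams))
        for i := range streams {
            if v, ok := <-streams[i]; ok {
                heads[i] = &v
            }
        }
        var minLast uint64 = math.MaxUint64
        for {
            minIdx := -1
            for i, h := range heads {
                if h != nil && (minIdx == -1 || *h < *heads[minIdx]) {
                    minIdx = i
                }
            }
            if minIdx == -1 {
                return minLast
            }
            height := *heads[minIdx]
            merged <- height
            if v, ok := <-streams[minIdx]; ok {
                heads[minIdx] = &v
            } else {
                heads[minIdx] = nil // this stream is exhausted
                if height < minLast {
                    minLast = height
                }
            }
        }
    }

    func main() {
        streams := []chan uint64{make(chan uint64), make(chan uint64)}
        go func() {
            for _, t := range []uint64{250, 251, 255} {
                streams[0] <- t
            }
            close(streams[0])
        }()
        go func() {
            for _, t := range []uint64{252, 253, 254, 256, 257} {
                streams[1] <- t
            }
            close(streams[1])
        }()
        merged := make(chan uint64)
        minLast := make(chan uint64)
        go func() { minLast <- mergeTimes(streams, merged) }()
        for t := range merged {
            fmt.Print(t, " ") // 250 251 252 253 254 255 256 257
        }
        fmt.Println("\nmin last height:", <-minLast) // 255
    }

Stream 0 ends at height 255 while stream 1 continues to 257, so the merge reports 255: only below that height is the merged stream guaranteed to be complete.
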
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
operationStream <-chan *pilorama.Move) uint64 {
errGroup, _ := errgroup.WithContext(ctx)
const workersCount = 1024
errGroup.SetLimit(workersCount)
// We run TreeApply concurrently for the operation batch. Consider two operations
// in the batch, m1 and m2, such that m1.Time < m2.Time. The engine may apply m2
// and then fail on m1. In that case the service must restart sync from m1.Time
// on the next iteration; this height is stored in unappliedOperationHeight.
var unappliedOperationHeight uint64 = math.MaxUint64
var heightMtx sync.Mutex
var prev *pilorama.Move
for m := range operationStream {
m := m
// skip a duplicate op: an op with the same Time as the previous one is the same operation received from another node's stream
if prev != nil && prev.Time == m.Time {
continue
}
prev = m
errGroup.Go(func() error {
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
heightMtx.Lock()
if m.Time < unappliedOperationHeight {
unappliedOperationHeight = m.Time
}
heightMtx.Unlock()
return err
}
return nil
})
}
// Wait for the whole batch; a failure, if any, is reflected in unappliedOperationHeight.
_ = errGroup.Wait()
return unappliedOperationHeight
}
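
Also illustrative only: the bounded-concurrency apply pattern above, reduced to a self-contained program. applyAll and its apply callback are hypothetical stand-ins for the batch loop and s.forest.TreeApply; the simulated failure shows why the minimum failed height must be tracked under a mutex.

    package main

    import (
        "errors"
        "fmt"
        "math"
        "sync"

        "golang.org/x/sync/errgroup"
    )

    // applyAll applies operation heights concurrently (at most 1024 at a time)
    // and returns the lowest height whose apply failed, or math.MaxUint64 if
    // every operation was applied.
    func applyAll(ops []uint64, apply func(uint64) error) uint64 {
        var eg errgroup.Group
        eg.SetLimit(1024)

        var unapplied uint64 = math.MaxUint64
        var mtx sync.Mutex
        for _, op := range ops {
            op := op // capture the loop variable (pre-Go 1.22 semantics)
            eg.Go(func() error {
                if err := apply(op); err != nil {
                    mtx.Lock()
                    if op < unapplied {
                        unapplied = op
                    }
                    mtx.Unlock()
                    return err
                }
                return nil
            })
        }
        _ = eg.Wait() // the failure, if any, is reflected in the returned height
        return unapplied
    }

    func main() {
        h := applyAll([]uint64{250, 251, 252, 253, 254}, func(t uint64) error {
            if t == 253 {
                return errors.New("simulated TreeApply failure")
            }
            return nil
        })
        fmt.Println(h) // 253: the next sync iteration must restart from here
    }
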
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move) (uint64, error) {
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
for {
newHeight := height
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: newHeight,
},
}
if err := SignMessage(req, s.key); err != nil {
return 0, err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return 0, fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.ParentId,
Child: lm.ChildId,
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
return 0, err
}
opsCh <- m
// Advance newHeight past the operation that has just been forwarded,
// mirroring the height bookkeeping of the removed synchronizeSingle;
// otherwise the enclosing retry loop could never make progress.
if m.Time > newHeight {
newHeight = m.Time + 1
} else {
newHeight++
}
}
if height == newHeight || (err != nil && !errors.Is(err, io.EOF)) {
return newHeight, err
}
height = newHeight
}
}
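
To restate the control flow of startStream (a sketch under assumptions, with a hypothetical fetch standing in for one signed GetOpLog request plus its Recv loop): request the log from a height, forward every received operation, then re-request from the height reached, stopping once a pass makes no progress or fails.

    // pageOps drains an operation log page by page. fetch returns the ascending
    // operation times at or after the given height (empty once the log is drained).
    func pageOps(fetch func(height uint64) ([]uint64, error), from uint64, out chan<- uint64) (uint64, error) {
        height := from
        for {
            ops, err := fetch(height)
            if err != nil {
                return height, err
            }
            newHeight := height
            for _, t := range ops {
                out <- t
                if t >= newHeight {
                    newHeight = t + 1 // resume after the last op seen
                }
            }
            if newHeight == height {
                return newHeight, nil // no progress: the log is exhausted
            }
            height = newHeight
        }
    }
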
// synchronizeTree synchronizes operations by fetching them from different nodes.
// Each available node streams its operations into a separate channel. These
// streams are merged into one stream ordered by operation time, which allows
// skipping already applied operations while keeping good batching.
// The method returns the height from which the service should start the next sync.
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
s.log.Debug(logs.TreeSynchronizeTree,
@@ -133,10 +264,25 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
zap.Uint64("from", from))
errGroup, egCtx := errgroup.WithContext(ctx)
- const workersCount = 4
+ const workersCount = 1024
errGroup.SetLimit(workersCount)
- heights := make([]uint64, len(nodes))
+ nodeOperationStreams := make([]chan *pilorama.Move, len(nodes))
+ for i := range nodeOperationStreams {
+ nodeOperationStreams[i] = make(chan *pilorama.Move)
+ }
+ merged := make(chan *pilorama.Move)
+ var minStreamedLastHeight uint64
+ errGroup.Go(func() error {
+ minStreamedLastHeight = mergeOperationStreams(ctx, nodeOperationStreams, merged)
+ return nil
+ })
+ var minUnappliedHeight uint64
+ errGroup.Go(func() error {
+ minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged)
+ return nil
+ })
for i, n := range nodes {
i := i
n := n
@@ -164,7 +310,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
treeClient := NewTreeServiceClient(cc)
for {
- h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
+ h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
if height < h {
height = h
}
@@ -174,80 +320,23 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
}
}
})
- if height <= from { // do not increase starting height on fail
- heights[i] = from
- return nil
- }
- heights[i] = height
+ close(nodeOperationStreams[i])
return nil
})
}
if err := errGroup.Wait(); err != nil {
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
}
- newHeight := uint64(math.MaxUint64)
- for _, height := range heights { // take minimum across all clients
- if height < newHeight {
- newHeight = height
- }
- }
- if newHeight == math.MaxUint64 {
- newHeight = from
- }
+ newHeight := minStreamedLastHeight
+ if newHeight > minUnappliedHeight {
+ newHeight = minUnappliedHeight
+ } else {
+ newHeight++
+ }
return newHeight
}
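
A worked restatement of the final height selection (illustrative only; nextSyncHeight is a hypothetical name): the merge reports minStreamedLastHeight, the applier reports minUnappliedHeight, and the smaller of the two wins.

    // nextSyncHeight restates the selection made at the end of synchronizeTree.
    func nextSyncHeight(minStreamedLastHeight, minUnappliedHeight uint64) uint64 {
        newHeight := minStreamedLastHeight
        if newHeight > minUnappliedHeight {
            // An operation below the fully streamed height failed to apply;
            // the next sync must retry from it.
            newHeight = minUnappliedHeight
        } else {
            // Everything up to minStreamedLastHeight was applied; resume after it.
            newHeight++
        }
        return newHeight
    }

For example, with minStreamedLastHeight = 255 and a failed apply at height 253, the next sync starts from 253; if every operation applied (minUnappliedHeight stays math.MaxUint64), it starts from 256.
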
- func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
- rawCID := make([]byte, sha256.Size)
- cid.Encode(rawCID)
- for {
- newHeight := height
- req := &GetOpLogRequest{
- Body: &GetOpLogRequest_Body{
- ContainerId: rawCID,
- TreeId: treeID,
- Height: newHeight,
- },
- }
- if err := SignMessage(req, s.key); err != nil {
- return newHeight, err
- }
- c, err := treeClient.GetOpLog(ctx, req)
- if err != nil {
- return newHeight, fmt.Errorf("can't initialize client: %w", err)
- }
- res, err := c.Recv()
- for ; err == nil; res, err = c.Recv() {
- lm := res.GetBody().GetOperation()
- m := &pilorama.Move{
- Parent: lm.ParentId,
- Child: lm.ChildId,
- }
- if err := m.Meta.FromBytes(lm.Meta); err != nil {
- return newHeight, err
- }
- if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
- return newHeight, err
- }
- if m.Time > newHeight {
- newHeight = m.Time + 1
- } else {
- newHeight++
- }
- }
- if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
- return newHeight, err
- }
- height = newHeight
- }
- }
// ErrAlreadySyncing is returned when a service synchronization has already
// been started.
var ErrAlreadySyncing = errors.New("service is being synchronized")


@@ -0,0 +1,85 @@
package tree
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"github.com/stretchr/testify/require"
)
func Test_mergeOperationStreams(t *testing.T) {
tests := []struct {
name string
ctx context.Context
opTimes [][]uint64
wantValues []uint64
wantMinHeight uint64
}{
{
name: "1",
ctx: context.Background(),
opTimes: [][]uint64{
{250, 251, 255},
{252, 253, 254, 256, 257},
},
wantValues: []uint64{250, 251, 252, 253, 254, 255, 256, 257},
wantMinHeight: 255,
},
{
name: "2",
ctx: context.Background(),
opTimes: [][]uint64{
{250, 251, 255, 259},
{252, 253, 254, 256, 257},
},
wantValues: []uint64{250, 251, 252, 253, 254, 255, 256, 257, 259},
wantMinHeight: 257,
},
{
name: "3",
ctx: context.Background(),
opTimes: [][]uint64{
{250, 251, 255},
{249, 250, 251, 253, 254, 256, 257},
},
wantValues: []uint64{249, 250, 250, 251, 251, 253, 254, 255, 256, 257},
wantMinHeight: 255,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nodeOpChans := make([]chan *pilorama.Move, len(tt.opTimes))
for i := range nodeOpChans {
nodeOpChans[i] = make(chan *pilorama.Move)
}
// generate operations and feed them into every channel
for i, ch := range nodeOpChans {
i := i
ch := ch
go func() {
for _, tm := range tt.opTimes[i] {
op := &pilorama.Move{}
op.Time = tm
ch <- op
}
close(nodeOpChans[i])
}()
}
merged := make(chan *pilorama.Move, 1)
min := make(chan uint64)
go func() {
min <- mergeOperationStreams(tt.ctx, nodeOpChans, merged)
}()
var res []uint64
for op := range merged {
res = append(res, op.Time)
}
require.Equal(t, tt.wantValues, res)
require.Equal(t, tt.wantMinHeight, <-min)
})
}
}