[#166] node: Parallelize background tree service sync by operation batching #235

Merged
fyrchik merged 2 commits from aarifullin/frostfs-node:feature/166-batch_tree_apply into master 2023-04-26 10:17:59 +00:00
2 changed files with 240 additions and 66 deletions

View file

@@ -125,6 +125,137 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
return nil
}
// mergeOperationStreams performs a merge sort of the node operation streams into a single stream.
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
defer close(merged)
ms := make([]*pilorama.Move, len(streams))
for i := range streams {
ms[i] = <-streams[i]
fyrchik marked this conversation as resolved (Outdated)

Closed channel will just return default value, do we need `, ok` and `if` here?

My bad. `, ok` and `if` are unnecessary.

Fixed
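A minimal standalone sketch (illustrative only, not part of the patch) of the Go behaviour the thread above relies on: a receive from a closed channel yields the element type's zero value, `nil` for a channel of pointers, so no extra `, ok` check is needed to notice that a stream is exhausted.

```go
package main

import "fmt"

func main() {
	ch := make(chan *int)
	close(ch)

	v := <-ch      // receive on a closed channel returns the zero value, here nil
	fmt.Println(v) // prints <nil>
}
```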
}
// Merging different node streams shuffles incoming operations like this:
//
// x - operation from stream A
// o - operation from stream B
//
// --o---o--x--x--x--o---x--x------> t
//                   ^
// If all ops have been applied successfully, we must start from the height of the
// last operation of stream B. This height is stored in minStreamedLastHeight.
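// For example, with stream A producing operations at heights {250, 251, 255} and
// stream B producing operations at heights {252, 253, 254, 256, 257}, the last heights
// seen per stream are 255 and 257, so minStreamedLastHeight becomes 255
// (cf. Test_mergeOperationStreams added by this PR).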
var minStreamedLastHeight uint64 = math.MaxUint64
for {
var minTimeMoveTime uint64 = math.MaxUint64

Do we still need it?
minTimeMoveIndex := -1
for i, m := range ms {
if m != nil && minTimeMoveTime > m.Time {
minTimeMoveTime = m.Time
minTimeMoveIndex = i
}
fyrchik marked this conversation as resolved (Outdated)

Can we just use the `[0]` element as the first minimum and store only the `index`?

Here `-1` is like C++'s `std::find_if(ms.begin(), ms.end(), []{...}) == ms.end()` (sorry, only this idea comes to my mind). You asked [here](https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/235/files#issuecomment-6981) whether we check for `-1` when all streams are closed - it's correct, I need this logic.
}
if minTimeMoveIndex == -1 {
break
}
merged <- ms[minTimeMoveIndex]
height := ms[minTimeMoveIndex].Time
fyrchik marked this conversation as resolved (Outdated)

This is possible only if we have no streams or all of them are empty, right?

Yes, you're right
if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
if minStreamedLastHeight > height {
minStreamedLastHeight = height
}
}
}
return minStreamedLastHeight
}
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
operationStream <-chan *pilorama.Move) uint64 {
errGroup, _ := errgroup.WithContext(ctx)
const workersCount = 1024
errGroup.SetLimit(workersCount)
// We run TreeApply concurrently for the operation batch. Let's consider two operations
fyrchik marked this conversation as resolved (Outdated)

Just a question: we use `notApplied` only to maintain some height. Wouldn't it be easier to return this height from the function and avoid providing a channel as an argument? (locally we could use either mutex or channel, mutex looks simpler to me).

You're right. That's really possible and will make my code easier

Fixed
// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
// on m1. That means the service must start sync from m1.Time in the next iteration and
// this height is stored in unappliedOperationHeight.
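// For example, if the batch contains m1 with Time 5 and m2 with Time 7 and only m2 is
// applied successfully, unappliedOperationHeight becomes 5, so the next synchronization
// iteration starts requesting the operation log from height 5 again.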
var unappliedOperationHeight uint64 = math.MaxUint64
var heightMtx sync.Mutex
fyrchik marked this conversation as resolved (Outdated)

Do we need `prev` or only `prev.Time`?

Only its `.Time`. Do you think I should introduce a `uint64` variable?

I would use `uint64`, but nothing wrong with your approach.
var prev *pilorama.Move
for m := range operationStream {
m := m
// skip already applied op
if prev != nil && prev.Time == m.Time {
continue
}
acid-ant marked this conversation as resolved (Outdated)

Can you merge it into one `if`?

Done
prev = m
errGroup.Go(func() error {
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
heightMtx.Lock()

magic number?

I have made it a *local* named constant

By the way, it was a constant in the hotfix; we can make it a parameter here: the difference between 1024 and 16k is significant, we may want to increase this parameter on high-end servers.
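A rough sketch of the follow-up suggested above; every name here is hypothetical and nothing like it is part of the merged change. The idea is only that the batch worker count could come from service configuration instead of a hard-coded constant.

```go
// Hypothetical option; the struct, field and default below are illustrative only.
type syncOptions struct {
	batchWorkersCount int // e.g. 1024 by default, possibly 16k on high-end servers
}

func (o syncOptions) workers() int {
	if o.batchWorkersCount > 0 {
		return o.batchWorkersCount
	}
	return 1024 // fall back to the current constant
}
```

applyOperationStream could then call `errGroup.SetLimit(o.workers())` instead of using the local constant.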
fyrchik marked this conversation as resolved (Outdated)

How can I easily be sure that this operation would not block forever?

No longer actual
if m.Time < unappliedOperationHeight {
unappliedOperationHeight = m.Time
}
heightMtx.Unlock()
return err
}
return nil
})
}
_ = errGroup.Wait()
return unappliedOperationHeight
}
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move) (uint64, error) {
rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID)
for {
newHeight := height
req := &GetOpLogRequest{
Body: &GetOpLogRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Height: newHeight,
},
}
if err := SignMessage(req, s.key); err != nil {
return 0, err
}
c, err := treeClient.GetOpLog(ctx, req)
fyrchik marked this conversation as resolved (Outdated)

@fyrchik Should we add some comments here? The batching happens implicitly, so...

No, I think it is clear what we are doing.
if err != nil {
fyrchik marked this conversation as resolved (Outdated)

But can we do better in this case? I mean we already parallelize operations by nodes, it would be nice not to try applying the same operation from different nodes.
return 0, fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.ParentId,
Child: lm.ChildId,
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
return 0, err
}
opsCh <- m

I am sure the error check must NOT be ignored here: we'll never figure out whether `TreeApply` has failed inside `Go`
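For reference, a minimal self-contained sketch (not part of the patch) of the `errgroup` behaviour this remark is about: an error returned from a `Go` closure is reported by `Wait`, so it is only lost if the caller discards `Wait`'s result.

```go
package main

import (
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group

	eg.Go(func() error {
		return errors.New("apply failed") // simulated failure inside a worker
	})
	eg.Go(func() error {
		return nil
	})

	// Wait returns the first non-nil error produced by the closures above.
	if err := eg.Wait(); err != nil {
		fmt.Println("sync worker failed:", err) // prints: sync worker failed: apply failed
	}
}
```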
}
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
return newHeight, err
}
height = newHeight
}
}
// synchronizeTree synchronizes operations by fetching them from different nodes.
// Each available node streams its operations into a separate channel. These streams
// are merged into one big stream ordered by operation time. This allows to skip
// already applied operations and to keep good batching.
// The method returns the height from which the service should start synchronization next time.
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
s.log.Debug(logs.TreeSynchronizeTree,
@@ -133,10 +264,25 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
zap.Uint64("from", from))
errGroup, egCtx := errgroup.WithContext(ctx)
-const workersCount = 4
const workersCount = 1024
errGroup.SetLimit(workersCount)
-heights := make([]uint64, len(nodes))
nodeOperationStreams := make([]chan *pilorama.Move, len(nodes))
for i := range nodeOperationStreams {
nodeOperationStreams[i] = make(chan *pilorama.Move)
}
merged := make(chan *pilorama.Move)
var minStreamedLastHeight uint64
errGroup.Go(func() error {
fyrchik marked this conversation as resolved (Outdated)

We already use `errgroup`, do we have any benefits when using a channel instead of a simple variable?

Fair point, fixed
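A tiny sketch (illustrative, not part of the patch) of why a plain variable is enough here: `Wait` returns only after every `Go` closure has finished, so a value assigned inside a closure can be read safely after `Wait` without a channel.

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group
	var result uint64

	eg.Go(func() error {
		result = 42 // written inside the goroutine
		return nil
	})

	_ = eg.Wait()       // returns after the closure above has finished
	fmt.Println(result) // safe to read here: prints 42
}
```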
minStreamedLastHeight = mergeOperationStreams(ctx, nodeOperationStreams, merged)
return nil
})
var minUnappliedHeight uint64
errGroup.Go(func() error {
minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged)
return nil
})
for i, n := range nodes {
i := i
n := n
@@ -164,7 +310,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
treeClient := NewTreeServiceClient(cc)
for {
-h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
if height < h {
height = h
}
@@ -174,78 +320,21 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
}
}
})
close(nodeOperationStreams[i])
-if height <= from { // do not increase starting height on fail
-heights[i] = from
-return nil
-}
-heights[i] = height
return nil
})
}
if err := errGroup.Wait(); err != nil {
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
}
-newHeight := uint64(math.MaxUint64)
-for _, height := range heights { // take minimum across all clients
-if height < newHeight {
-newHeight = height
-}
-}
-if newHeight == math.MaxUint64 {
-newHeight = from
-}
-return newHeight
-}
newHeight := minStreamedLastHeight
if newHeight > minUnappliedHeight {
newHeight = minUnappliedHeight
} else {
newHeight++
}
return newHeight
}
-func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
-rawCID := make([]byte, sha256.Size)
-cid.Encode(rawCID)
-for {
-newHeight := height
-req := &GetOpLogRequest{
-Body: &GetOpLogRequest_Body{
-ContainerId: rawCID,
-TreeId: treeID,
-Height: newHeight,
-},
-}
-if err := SignMessage(req, s.key); err != nil {
-return newHeight, err
-}
-c, err := treeClient.GetOpLog(ctx, req)
-if err != nil {
-return newHeight, fmt.Errorf("can't initialize client: %w", err)
-}
-res, err := c.Recv()
-for ; err == nil; res, err = c.Recv() {
-lm := res.GetBody().GetOperation()
-m := &pilorama.Move{
-Parent: lm.ParentId,
-Child: lm.ChildId,
-}
-if err := m.Meta.FromBytes(lm.Meta); err != nil {
-return newHeight, err
-}
-if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
-return newHeight, err
-}
-if m.Time > newHeight {
-newHeight = m.Time + 1
-} else {
-newHeight++
-}
-}
-if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
-return newHeight, err
-}
-height = newHeight
-}
-}
// ErrAlreadySyncing is returned when a service synchronization has already

View file

@@ -0,0 +1,85 @@
package tree
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"github.com/stretchr/testify/require"
)
func Test_mergeOperationStreams(t *testing.T) {
tests := []struct {
name string
ctx context.Context
opTimes [][]uint64
wantValues []uint64
wantMinHeight uint64
}{
{
name: "1",
ctx: context.Background(),
opTimes: [][]uint64{
{250, 251, 255},
{252, 253, 254, 256, 257},
},
wantValues: []uint64{250, 251, 252, 253, 254, 255, 256, 257},
wantMinHeight: 255,
},
{
name: "2",
ctx: context.Background(),
opTimes: [][]uint64{
{250, 251, 255, 259},
{252, 253, 254, 256, 257},
},
wantValues: []uint64{250, 251, 252, 253, 254, 255, 256, 257, 259},
wantMinHeight: 257,
},
{
name: "3",
ctx: context.Background(),
opTimes: [][]uint64{
{250, 251, 255},
{249, 250, 251, 253, 254, 256, 257},
},
wantValues: []uint64{249, 250, 250, 251, 251, 253, 254, 255, 256, 257},
wantMinHeight: 255,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nodeOpChans := make([]chan *pilorama.Move, len(tt.opTimes))
for i := range nodeOpChans {
nodeOpChans[i] = make(chan *pilorama.Move)
}
// generate and put values to all chans
for i, ch := range nodeOpChans {
i := i
ch := ch
go func() {
for _, tm := range tt.opTimes[i] {
op := &pilorama.Move{}
op.Time = tm
ch <- op
}
close(nodeOpChans[i])
}()
}
merged := make(chan *pilorama.Move, 1)
min := make(chan uint64)
go func() {
min <- mergeOperationStreams(tt.ctx, nodeOpChans, merged)
}()
var res []uint64
for op := range merged {
res = append(res, op.Time)
}
require.Equal(t, tt.wantValues, res)
require.Equal(t, tt.wantMinHeight, <-min)
})
}
}