[#166] node: Parallelize background tree service sync by operation batching #235

@@ -125,6 +125,137 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
	return nil
}

// mergeOperationStreams performs a merge sort over the node operation streams,
// merging them into one stream.
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
	defer close(merged)

	ms := make([]*pilorama.Move, len(streams))
	for i := range streams {
		ms[i] = <-streams[i]
	}

	// Merging different node streams shuffles incoming operations like this:
	//
	// x - operation from stream A
	// o - operation from stream B
	//
	// --o---o--x--x--x--o---x--x------> t
	//                   ^
	// If all ops have been applied successfully, we must start from the last
	// operation height of stream B. This height is stored in minStreamedLastHeight.
	var minStreamedLastHeight uint64 = math.MaxUint64

	for {
		var minTimeMoveTime uint64 = math.MaxUint64

fyrchik commented
Do we still need it?

		minTimeMoveIndex := -1
		for i, m := range ms {
			if m != nil && minTimeMoveTime > m.Time {
				minTimeMoveTime = m.Time
				minTimeMoveIndex = i
			}
		}

fyrchik commented
Can we just use the `[0]` element as the first minimum and store `index` only?

aarifullin commented
Here `-1` is like C++'s `std::find_if(ms.begin(), ms.end(), []{...}) == ms.end()` (sorry, only this idea comes to my mind).
You asked [here](https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/235/files#issuecomment-6981) if we check for `-1` when all streams are closed - it's correct, I need this logic.

		if minTimeMoveIndex == -1 {
			break
		}

		merged <- ms[minTimeMoveIndex]
		height := ms[minTimeMoveIndex].Time

fyrchik commented
This is possible only if we have no streams or all of them are empty, right?

aarifullin commented
Yes, you're right.

		if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
			if minStreamedLastHeight > height {
				minStreamedLastHeight = height
			}
		}
	}

	return minStreamedLastHeight
}
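
For context, a minimal sketch of how the merge behaves, mirroring the unit test added below. `mergeOperationStreams` is unexported, so this would live in package tree; the feeding goroutines and printed output are illustrative only:

```go
streams := []chan *pilorama.Move{make(chan *pilorama.Move), make(chan *pilorama.Move)}
merged := make(chan *pilorama.Move)

// Each node must send its operations in ascending Time order.
feed := func(ch chan *pilorama.Move, times []uint64) {
	for _, t := range times {
		op := &pilorama.Move{}
		op.Time = t
		ch <- op
	}
	close(ch)
}
go feed(streams[0], []uint64{250, 251, 255})           // node A
go feed(streams[1], []uint64{252, 253, 254, 256, 257}) // node B

go func() {
	// Returns 255: node A's stream ends first, and 255 is the last
	// height it streamed, so the next sync may start after it.
	_ = mergeOperationStreams(context.Background(), streams, merged)
}()

for op := range merged {
	fmt.Println(op.Time) // 250, 251, 252, 253, 254, 255, 256, 257
}
```
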
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
	operationStream <-chan *pilorama.Move) uint64 {
	errGroup, _ := errgroup.WithContext(ctx)
	const workersCount = 1024
	errGroup.SetLimit(workersCount)

	// We run TreeApply concurrently for the operation batch. Let's consider two operations
	// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
	// on m1. That means the service must start sync from m1.Time in the next iteration and
	// this height is stored in unappliedOperationHeight.
	var unappliedOperationHeight uint64 = math.MaxUint64
	var heightMtx sync.Mutex

fyrchik commented
Just a question: we use `notApplied` only to maintain some height.
Wouldn't it be easier to return this height from the function and avoid providing a channel as an argument? (Locally we could use either a mutex or a channel; a mutex looks simpler to me.)

aarifullin commented
You're right. That's really possible and will make my code easier.

aarifullin commented
Fixed.

	var prev *pilorama.Move
	for m := range operationStream {
		m := m

		// skip already applied op
		if prev != nil && prev.Time == m.Time {
			continue
		}

acid-ant commented
Can you merge it in one if?

aarifullin commented
Done.

		prev = m

		errGroup.Go(func() error {
			if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
				heightMtx.Lock()

carpawell commented
Magic number?

aarifullin commented
Have made it a *local* named constant.

fyrchik commented
By the way, it was a constant in the hotfix; we can make it a parameter here: the difference between 1024 and 16k is significant, and we may want to increase this parameter on high-end servers.
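
A sketch of what that parameterization might look like, assuming the service's existing functional-option pattern; the option name, config field, and defaults are invented for illustration:

```go
// Hypothetical option; not part of this PR.
func WithSyncBatchWorkersCount(n int) Option {
	return func(c *cfg) {
		c.syncBatchWorkersCount = n
	}
}

// applyOperationStream would then use the configured value instead of
// a local constant:
//   errGroup.SetLimit(s.syncBatchWorkersCount) // e.g. 1024 by default, 16384 on high-end servers
```
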

fyrchik commented
How can I easily be sure that this operation would not block forever?

aarifullin commented
No longer actual.

				if m.Time < unappliedOperationHeight {
					unappliedOperationHeight = m.Time
				}
				heightMtx.Unlock()
				return err
			}
			return nil
		})
	}
	_ = errGroup.Wait()
	return unappliedOperationHeight
}
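
The batching itself comes from errgroup's limit: once workersCount `TreeApply` calls are in flight, the next `errGroup.Go` blocks, which naturally throttles reads from the merged stream. A standalone sketch of that backpressure, with the limit and timings made tiny so the effect is visible:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	var eg errgroup.Group
	eg.SetLimit(4) // plays the role of workersCount above

	var inFlight atomic.Int32
	for i := 0; i < 16; i++ {
		// Go blocks here whenever 4 workers are already busy,
		// so the producing loop is throttled automatically.
		eg.Go(func() error {
			defer inFlight.Add(-1)
			fmt.Println("in flight:", inFlight.Add(1)) // never exceeds 4
			time.Sleep(10 * time.Millisecond)
			return nil
		})
	}
	_ = eg.Wait()
}
```
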
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
	height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move) (uint64, error) {
	rawCID := make([]byte, sha256.Size)
	cid.Encode(rawCID)

	for {
		newHeight := height
		req := &GetOpLogRequest{
			Body: &GetOpLogRequest_Body{
				ContainerId: rawCID,
				TreeId:      treeID,
				Height:      newHeight,
			},
		}
		if err := SignMessage(req, s.key); err != nil {
			return 0, err
		}

		c, err := treeClient.GetOpLog(ctx, req)

aarifullin commented
@fyrchik Should we add some comments here? The batching happens implicitly, so...

fyrchik commented
No, I think it is clear what we are doing.

		if err != nil {
			return 0, fmt.Errorf("can't initialize client: %w", err)
		}

fyrchik commented
But can we do better in this case? I mean we already parallelize operations by nodes; it would be nice not to try applying the same operation from different nodes.

		res, err := c.Recv()
		for ; err == nil; res, err = c.Recv() {
			lm := res.GetBody().GetOperation()
			m := &pilorama.Move{
				Parent: lm.ParentId,
				Child:  lm.ChildId,
			}
			if err := m.Meta.FromBytes(lm.Meta); err != nil {
				return 0, err
			}
			opsCh <- m

aarifullin commented
I am sure the error check must NOT be ignored here: we'll never figure out if `TreeApply` failed in `Go`.

		}
		if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
			return newHeight, err
		}
		height = newHeight
	}
}

// synchronizeTree synchronizes operations, fetching them from different nodes.
// Each available node streams its operations into a separate channel. These streams
// are merged into one big stream ordered by operation time. This way we can skip
// already applied operations and keep good batching.
// The method returns the height that the service should start sync from next time.
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
	treeID string, nodes []netmapSDK.NodeInfo) uint64 {
	s.log.Debug(logs.TreeSynchronizeTree,

@@ -133,10 +264,25 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
		zap.Uint64("from", from))

	errGroup, egCtx := errgroup.WithContext(ctx)
-	const workersCount = 4
+	const workersCount = 1024
	errGroup.SetLimit(workersCount)

	heights := make([]uint64, len(nodes))
	nodeOperationStreams := make([]chan *pilorama.Move, len(nodes))
	for i := range nodeOperationStreams {
		nodeOperationStreams[i] = make(chan *pilorama.Move)
	}
	merged := make(chan *pilorama.Move)
	var minStreamedLastHeight uint64
	errGroup.Go(func() error {

fyrchik commented
We already use `errgroup`; do we have any benefits when using a channel instead of a simple variable?

aarifullin commented
Fair point, fixed.

		minStreamedLastHeight = mergeOperationStreams(ctx, nodeOperationStreams, merged)
		return nil
	})
	var minUnappliedHeight uint64
	errGroup.Go(func() error {
		minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged)
		return nil
	})

	for i, n := range nodes {
		i := i
		n := n

@@ -164,7 +310,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
			treeClient := NewTreeServiceClient(cc)
			for {
-				h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
+				h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
				if height < h {
					height = h
				}

@@ -174,80 +320,23 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
				}
			}
		})

			if height <= from { // do not increase starting height on fail
				heights[i] = from
				return nil
			}
			heights[i] = height
			close(nodeOperationStreams[i])
			return nil
		})
	}

	if err := errGroup.Wait(); err != nil {
		s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
	}

-	newHeight := uint64(math.MaxUint64)
-	for _, height := range heights { // take minimum across all clients
-		if height < newHeight {
-			newHeight = height
-		}
-	}
-	if newHeight == math.MaxUint64 {
-		newHeight = from
-	}
+	newHeight := minStreamedLastHeight
+	if newHeight > minUnappliedHeight {
+		newHeight = minUnappliedHeight
+	} else {
+		newHeight++
+	}
	return newHeight
}
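
To make the return value concrete, the new logic above is equivalent to this hypothetical helper (a restatement for illustration, not part of the change):

```go
func nextSyncHeight(minStreamedLastHeight, minUnappliedHeight uint64) uint64 {
	if minStreamedLastHeight > minUnappliedHeight {
		// Some op failed to apply: retry from the lowest failed height.
		return minUnappliedHeight
	}
	// Everything applied: resume just past the lowest last-streamed height.
	return minStreamedLastHeight + 1
}

// nextSyncHeight(255, math.MaxUint64) == 256 (all ops applied)
// nextSyncHeight(255, 253) == 253 (the op at height 253 failed)
```
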
-func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
-	rawCID := make([]byte, sha256.Size)
-	cid.Encode(rawCID)
-
-	for {
-		newHeight := height
-		req := &GetOpLogRequest{
-			Body: &GetOpLogRequest_Body{
-				ContainerId: rawCID,
-				TreeId:      treeID,
-				Height:      newHeight,
-			},
-		}
-		if err := SignMessage(req, s.key); err != nil {
-			return newHeight, err
-		}
-
-		c, err := treeClient.GetOpLog(ctx, req)
-		if err != nil {
-			return newHeight, fmt.Errorf("can't initialize client: %w", err)
-		}
-
-		res, err := c.Recv()
-		for ; err == nil; res, err = c.Recv() {
-			lm := res.GetBody().GetOperation()
-			m := &pilorama.Move{
-				Parent: lm.ParentId,
-				Child:  lm.ChildId,
-			}
-			if err := m.Meta.FromBytes(lm.Meta); err != nil {
-				return newHeight, err
-			}
-			if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
-				return newHeight, err
-			}
-			if m.Time > newHeight {
-				newHeight = m.Time + 1
-			} else {
-				newHeight++
-			}
-		}
-		if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
-			return newHeight, err
-		}
-		height = newHeight
-	}
-}

// ErrAlreadySyncing is returned when a service synchronization has already
// been started.
var ErrAlreadySyncing = errors.New("service is being synchronized")
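
For reference, a caller-side sketch of checking this sentinel; the call site is illustrative only:

```go
if err := s.SynchronizeTree(ctx, cnr, treeID); errors.Is(err, ErrAlreadySyncing) {
	// Another synchronization of this tree is already in progress; skip this round.
}
```
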

pkg/services/tree/sync_test.go (new file, 85 lines)
@@ -0,0 +1,85 @@
package tree

import (
	"context"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"github.com/stretchr/testify/require"
)

func Test_mergeOperationStreams(t *testing.T) {
	tests := []struct {
		name          string
		ctx           context.Context
		opTimes       [][]uint64
		wantValues    []uint64
		wantMinHeight uint64
	}{
		{
			name: "1",
			ctx:  context.Background(),
			opTimes: [][]uint64{
				{250, 251, 255},
				{252, 253, 254, 256, 257},
			},
			wantValues:    []uint64{250, 251, 252, 253, 254, 255, 256, 257},
			wantMinHeight: 255,
		},
		{
			name: "2",
			ctx:  context.Background(),
			opTimes: [][]uint64{
				{250, 251, 255, 259},
				{252, 253, 254, 256, 257},
			},
			wantValues:    []uint64{250, 251, 252, 253, 254, 255, 256, 257, 259},
			wantMinHeight: 257,
		},
		{
			name: "3",
			ctx:  context.Background(),
			opTimes: [][]uint64{
				{250, 251, 255},
				{249, 250, 251, 253, 254, 256, 257},
			},
			wantValues:    []uint64{249, 250, 250, 251, 251, 253, 254, 255, 256, 257},
			wantMinHeight: 255,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			nodeOpChans := make([]chan *pilorama.Move, len(tt.opTimes))
			for i := range nodeOpChans {
				nodeOpChans[i] = make(chan *pilorama.Move)
			}

			// generate and put values to all chans
			for i, ch := range nodeOpChans {
				i := i
				ch := ch
				go func() {
					for _, tm := range tt.opTimes[i] {
						op := &pilorama.Move{}
						op.Time = tm
						ch <- op
					}
					close(nodeOpChans[i])
				}()
			}

			merged := make(chan *pilorama.Move, 1)
			min := make(chan uint64)
			go func() {
				min <- mergeOperationStreams(tt.ctx, nodeOpChans, merged)
			}()

			var res []uint64
			for op := range merged {
				res = append(res, op.Time)
			}
			require.Equal(t, tt.wantValues, res)
			require.Equal(t, tt.wantMinHeight, <-min)
		})
	}
}

Closed channel will just return default value, do we need `, ok` and `if` here?

My bad. `ok`, `if` are unnecessary. Fixed.