[#166] node: Parallelize background tree service sync by operation batching #235
2 changed files with 240 additions and 66 deletions
|
@ -125,6 +125,137 @@ func (s *Service) SynchronizeTree(ctx context.Context, cid cid.ID, treeID string
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// mergeOperationStreams performs merge sort for node operation streams to one stream.
|
||||||
|
func mergeOperationStreams(ctx context.Context, streams []chan *pilorama.Move, merged chan<- *pilorama.Move) uint64 {
|
||||||
|
defer close(merged)
|
||||||
|
|
||||||
|
ms := make([]*pilorama.Move, len(streams))
|
||||||
|
for i := range streams {
|
||||||
|
ms[i] = <-streams[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merging different node streams shuffles incoming operations like that:
|
||||||
|
//
|
||||||
|
// x - operation from the stream A
|
||||||
|
// o - operation from the stream B
|
||||||
|
//
|
||||||
|
// --o---o--x--x--x--o---x--x------> t
|
||||||
|
// ^
|
||||||
|
// If all ops have been successfully applied, we must start from the last
|
||||||
|
// operation height from the stream B. This height is stored in minStreamedLastHeight.
|
||||||
|
var minStreamedLastHeight uint64 = math.MaxUint64
|
||||||
|
|
||||||
|
for {
|
||||||
|
var minTimeMoveTime uint64 = math.MaxUint64
|
||||||
|
minTimeMoveIndex := -1
|
||||||
|
for i, m := range ms {
|
||||||
|
if m != nil && minTimeMoveTime > m.Time {
|
||||||
|
minTimeMoveTime = m.Time
|
||||||
|
minTimeMoveIndex = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if minTimeMoveIndex == -1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
merged <- ms[minTimeMoveIndex]
|
||||||
|
height := ms[minTimeMoveIndex].Time
|
||||||
|
if ms[minTimeMoveIndex] = <-streams[minTimeMoveIndex]; ms[minTimeMoveIndex] == nil {
|
||||||
|
if minStreamedLastHeight > height {
|
||||||
|
minStreamedLastHeight = height
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return minStreamedLastHeight
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
|
operationStream <-chan *pilorama.Move) uint64 {
|
||||||
|
errGroup, _ := errgroup.WithContext(ctx)
|
||||||
|
const workersCount = 1024
|
||||||
|
errGroup.SetLimit(workersCount)
|
||||||
|
|
||||||
|
// We run TreeApply concurrently for the operation batch. Let's consider two operations
|
||||||
|
// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
|
||||||
|
// on m1. That means the service must start sync from m1.Time in the next iteration and
|
||||||
|
// this height is stored in unappliedOperationHeight.
|
||||||
|
var unappliedOperationHeight uint64 = math.MaxUint64
|
||||||
|
var heightMtx sync.Mutex
|
||||||
|
|
||||||
|
var prev *pilorama.Move
|
||||||
|
for m := range operationStream {
|
||||||
|
m := m
|
||||||
|
|
||||||
|
// skip already applied op
|
||||||
|
if prev != nil && prev.Time == m.Time {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
prev = m
|
||||||
|
|
||||||
|
errGroup.Go(func() error {
|
||||||
|
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
|
||||||
|
heightMtx.Lock()
|
||||||
|
if m.Time < unappliedOperationHeight {
|
||||||
|
unappliedOperationHeight = m.Time
|
||||||
|
}
|
||||||
|
heightMtx.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
_ = errGroup.Wait()
|
||||||
|
return unappliedOperationHeight
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||||
|
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move) (uint64, error) {
|
||||||
|
rawCID := make([]byte, sha256.Size)
|
||||||
|
cid.Encode(rawCID)
|
||||||
|
|
||||||
|
for {
|
||||||
|
newHeight := height
|
||||||
|
req := &GetOpLogRequest{
|
||||||
|
Body: &GetOpLogRequest_Body{
|
||||||
|
ContainerId: rawCID,
|
||||||
|
TreeId: treeID,
|
||||||
|
Height: newHeight,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := SignMessage(req, s.key); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := treeClient.GetOpLog(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("can't initialize client: %w", err)
|
||||||
|
}
|
||||||
|
res, err := c.Recv()
|
||||||
|
for ; err == nil; res, err = c.Recv() {
|
||||||
|
lm := res.GetBody().GetOperation()
|
||||||
|
m := &pilorama.Move{
|
||||||
|
Parent: lm.ParentId,
|
||||||
|
Child: lm.ChildId,
|
||||||
|
}
|
||||||
|
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
opsCh <- m
|
||||||
|
}
|
||||||
|
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
||||||
|
return newHeight, err
|
||||||
|
}
|
||||||
|
height = newHeight
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// synchronizeTree synchronizes operations getting them from different nodes.
|
||||||
|
// Each available node does stream operations to a separate stream. These streams
|
||||||
|
// are merged into one big stream ordered by operation time. This way allows to skip
|
||||||
|
// already applied operation and keep good batching.
|
||||||
|
// The method returns a height that service should start sync from in the next time.
|
||||||
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
||||||
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
|
treeID string, nodes []netmapSDK.NodeInfo) uint64 {
|
||||||
s.log.Debug(logs.TreeSynchronizeTree,
|
s.log.Debug(logs.TreeSynchronizeTree,
|
||||||
|
@ -133,10 +264,25 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
||||||
zap.Uint64("from", from))
|
zap.Uint64("from", from))
|
||||||
|
|
||||||
errGroup, egCtx := errgroup.WithContext(ctx)
|
errGroup, egCtx := errgroup.WithContext(ctx)
|
||||||
const workersCount = 4
|
const workersCount = 1024
|
||||||
errGroup.SetLimit(workersCount)
|
errGroup.SetLimit(workersCount)
|
||||||
|
|
||||||
heights := make([]uint64, len(nodes))
|
nodeOperationStreams := make([]chan *pilorama.Move, len(nodes))
|
||||||
|
for i := range nodeOperationStreams {
|
||||||
|
nodeOperationStreams[i] = make(chan *pilorama.Move)
|
||||||
|
}
|
||||||
|
merged := make(chan *pilorama.Move)
|
||||||
|
var minStreamedLastHeight uint64
|
||||||
|
errGroup.Go(func() error {
|
||||||
|
minStreamedLastHeight = mergeOperationStreams(ctx, nodeOperationStreams, merged)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
var minUnappliedHeight uint64
|
||||||
|
errGroup.Go(func() error {
|
||||||
|
minUnappliedHeight = s.applyOperationStream(ctx, cid, treeID, merged)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
for i, n := range nodes {
|
for i, n := range nodes {
|
||||||
i := i
|
i := i
|
||||||
n := n
|
n := n
|
||||||
|
@ -164,7 +310,7 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
||||||
|
|
||||||
treeClient := NewTreeServiceClient(cc)
|
treeClient := NewTreeServiceClient(cc)
|
||||||
for {
|
for {
|
||||||
h, err := s.synchronizeSingle(egCtx, cid, treeID, height, treeClient)
|
h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
|
||||||
if height < h {
|
if height < h {
|
||||||
height = h
|
height = h
|
||||||
}
|
}
|
||||||
|
@ -174,80 +320,23 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
close(nodeOperationStreams[i])
|
||||||
if height <= from { // do not increase starting height on fail
|
|
||||||
heights[i] = from
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
heights[i] = height
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := errGroup.Wait(); err != nil {
|
if err := errGroup.Wait(); err != nil {
|
||||||
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
|
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationOverAllNodes, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
newHeight := uint64(math.MaxUint64)
|
newHeight := minStreamedLastHeight
|
||||||
for _, height := range heights { // take minimum across all clients
|
if newHeight > minUnappliedHeight {
|
||||||
if height < newHeight {
|
newHeight = minUnappliedHeight
|
||||||
newHeight = height
|
} else {
|
||||||
}
|
newHeight++
|
||||||
}
|
|
||||||
if newHeight == math.MaxUint64 {
|
|
||||||
newHeight = from
|
|
||||||
}
|
}
|
||||||
return newHeight
|
return newHeight
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID string, height uint64, treeClient TreeServiceClient) (uint64, error) {
|
|
||||||
rawCID := make([]byte, sha256.Size)
|
|
||||||
cid.Encode(rawCID)
|
|
||||||
|
|
||||||
for {
|
|
||||||
newHeight := height
|
|
||||||
req := &GetOpLogRequest{
|
|
||||||
Body: &GetOpLogRequest_Body{
|
|
||||||
ContainerId: rawCID,
|
|
||||||
TreeId: treeID,
|
|
||||||
Height: newHeight,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if err := SignMessage(req, s.key); err != nil {
|
|
||||||
return newHeight, err
|
|
||||||
}
|
|
||||||
|
|
||||||
c, err := treeClient.GetOpLog(ctx, req)
|
|
||||||
if err != nil {
|
|
||||||
return newHeight, fmt.Errorf("can't initialize client: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := c.Recv()
|
|
||||||
for ; err == nil; res, err = c.Recv() {
|
|
||||||
lm := res.GetBody().GetOperation()
|
|
||||||
m := &pilorama.Move{
|
|
||||||
Parent: lm.ParentId,
|
|
||||||
Child: lm.ChildId,
|
|
||||||
}
|
|
||||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
|
||||||
return newHeight, err
|
|
||||||
}
|
|
||||||
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
|
|
||||||
return newHeight, err
|
|
||||||
}
|
|
||||||
if m.Time > newHeight {
|
|
||||||
newHeight = m.Time + 1
|
|
||||||
} else {
|
|
||||||
newHeight++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
|
||||||
return newHeight, err
|
|
||||||
}
|
|
||||||
height = newHeight
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ErrAlreadySyncing is returned when a service synchronization has already
|
// ErrAlreadySyncing is returned when a service synchronization has already
|
||||||
// been started.
|
// been started.
|
||||||
var ErrAlreadySyncing = errors.New("service is being synchronized")
|
var ErrAlreadySyncing = errors.New("service is being synchronized")
|
||||||
|
|
85
pkg/services/tree/sync_test.go
Normal file
85
pkg/services/tree/sync_test.go
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_mergeOperationStreams(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
ctx context.Context
|
||||||
|
opTimes [][]uint64
|
||||||
|
wantValues []uint64
|
||||||
|
wantMinHeight uint64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "1",
|
||||||
|
ctx: context.Background(),
|
||||||
|
opTimes: [][]uint64{
|
||||||
|
{250, 251, 255},
|
||||||
|
{252, 253, 254, 256, 257},
|
||||||
|
},
|
||||||
|
wantValues: []uint64{250, 251, 252, 253, 254, 255, 256, 257},
|
||||||
|
wantMinHeight: 255,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "2",
|
||||||
|
ctx: context.Background(),
|
||||||
|
opTimes: [][]uint64{
|
||||||
|
{250, 251, 255, 259},
|
||||||
|
{252, 253, 254, 256, 257},
|
||||||
|
},
|
||||||
|
wantValues: []uint64{250, 251, 252, 253, 254, 255, 256, 257, 259},
|
||||||
|
wantMinHeight: 257,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "3",
|
||||||
|
ctx: context.Background(),
|
||||||
|
opTimes: [][]uint64{
|
||||||
|
{250, 251, 255},
|
||||||
|
{249, 250, 251, 253, 254, 256, 257},
|
||||||
|
},
|
||||||
|
wantValues: []uint64{249, 250, 250, 251, 251, 253, 254, 255, 256, 257},
|
||||||
|
wantMinHeight: 255,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
nodeOpChans := make([]chan *pilorama.Move, len(tt.opTimes))
|
||||||
|
for i := range nodeOpChans {
|
||||||
|
nodeOpChans[i] = make(chan *pilorama.Move)
|
||||||
|
}
|
||||||
|
|
||||||
|
// generate and put values to all chans
|
||||||
|
for i, ch := range nodeOpChans {
|
||||||
|
i := i
|
||||||
|
ch := ch
|
||||||
|
go func() {
|
||||||
|
for _, tm := range tt.opTimes[i] {
|
||||||
|
op := &pilorama.Move{}
|
||||||
|
op.Time = tm
|
||||||
|
ch <- op
|
||||||
|
}
|
||||||
|
close(nodeOpChans[i])
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
merged := make(chan *pilorama.Move, 1)
|
||||||
|
min := make(chan uint64)
|
||||||
|
go func() {
|
||||||
|
min <- mergeOperationStreams(tt.ctx, nodeOpChans, merged)
|
||||||
|
}()
|
||||||
|
|
||||||
|
var res []uint64
|
||||||
|
for op := range merged {
|
||||||
|
res = append(res, op.Time)
|
||||||
|
}
|
||||||
|
require.Equal(t, tt.wantValues, res)
|
||||||
|
require.Equal(t, tt.wantMinHeight, <-min)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue