[#166] node: Parallelize background tree service sync by operation batching #235

Merged
fyrchik merged 2 commits from aarifullin/frostfs-node:feature/166-batch_tree_apply into master 2023-04-26 10:17:59 +00:00
Showing only changes of commit 38f0a0a718 - Show all commits

View file

@ -204,6 +204,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
rawCID := make([]byte, sha256.Size) rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID) cid.Encode(rawCID)
const treeApplyWorkersCount = 1024
errGroup, egCtx := errgroup.WithContext(ctx)
errGroup.SetLimit(treeApplyWorkersCount)
var heightMtx sync.Mutex
for { for {
newHeight := height newHeight := height
req := &GetOpLogRequest{ req := &GetOpLogRequest{
@ -217,7 +222,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
return newHeight, err return newHeight, err
} }
c, err := treeClient.GetOpLog(ctx, req) c, err := treeClient.GetOpLog(egCtx, req)
if err != nil { if err != nil {
return newHeight, fmt.Errorf("can't initialize client: %w", err) return newHeight, fmt.Errorf("can't initialize client: %w", err)
} }
@ -230,16 +235,25 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
Child: lm.ChildId, Child: lm.ChildId,
} }
if err := m.Meta.FromBytes(lm.Meta); err != nil { if err := m.Meta.FromBytes(lm.Meta); err != nil {
_ = errGroup.Wait()
return newHeight, err return newHeight, err
} }
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil { errGroup.Go(func() error {
return newHeight, err if err := s.forest.TreeApply(egCtx, cid, treeID, m, true); err != nil {
return err
} }
heightMtx.Lock()

I am sure the error check must NOT be ignored here: we'll never figure out if TreeApply is failed in Go

I am sure the error check must NOT be ignored here: we'll never figure out if `TreeApply` is failed in `Go`
if m.Time > newHeight { if m.Time > newHeight {
newHeight = m.Time + 1 newHeight = m.Time + 1
} else { } else {
newHeight++ newHeight++
} }
heightMtx.Unlock()
return nil
})
}
if errGroupErr := errGroup.Wait(); errGroupErr != nil {
return newHeight, err
} }
if height == newHeight || err != nil && !errors.Is(err, io.EOF) { if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
return newHeight, err return newHeight, err