From 299b24b9742756fa2ee65d3dcbdf65e4435673dd Mon Sep 17 00:00:00 2001 From: Airat Arifullin Date: Tue, 11 Apr 2023 18:06:34 +0300 Subject: [PATCH] [#166] node: Parallelize background tree service sync by batching * Concurrently dispatch TreeApply operations for batching in forest Signed-off-by: Airat Arifullin a.arifullin@yadro.com --- pkg/services/tree/sync.go | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 6c4f585a..868d6ea4 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -204,6 +204,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri rawCID := make([]byte, sha256.Size) cid.Encode(rawCID) + const treeApplyWorkersCount = 1024 + errGroup, egCtx := errgroup.WithContext(ctx) + errGroup.SetLimit(treeApplyWorkersCount) + var heightMtx sync.Mutex + for { newHeight := height req := &GetOpLogRequest{ @@ -217,7 +222,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri return newHeight, err } - c, err := treeClient.GetOpLog(ctx, req) + c, err := treeClient.GetOpLog(egCtx, req) if err != nil { return newHeight, fmt.Errorf("can't initialize client: %w", err) } @@ -230,16 +235,25 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri Child: lm.ChildId, } if err := m.Meta.FromBytes(lm.Meta); err != nil { + _ = errGroup.Wait() return newHeight, err } - if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil { - return newHeight, err - } - if m.Time > newHeight { - newHeight = m.Time + 1 - } else { - newHeight++ - } + errGroup.Go(func() error { + if err := s.forest.TreeApply(egCtx, cid, treeID, m, true); err != nil { + return err + } + heightMtx.Lock() + if m.Time > newHeight { + newHeight = m.Time + 1 + } else { + newHeight++ + } + heightMtx.Unlock() + return nil + }) + } + if errGroupErr := errGroup.Wait(); errGroupErr != nil { + return newHeight, err } if height == newHeight || err != nil && !errors.Is(err, io.EOF) { return newHeight, err