diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index 47299d1c9..554d74091 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri rawCID := make([]byte, sha256.Size) cid.Encode(rawCID) + errG, ctx := errgroup.WithContext(ctx) + errG.SetLimit(1024) + + var heightMtx sync.Mutex + for { newHeight := height req := &GetOpLogRequest{ @@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri }, } if err := SignMessage(req, s.key); err != nil { + _ = errG.Wait() return newHeight, err } c, err := treeClient.GetOpLog(ctx, req) if err != nil { + _ = errG.Wait() return newHeight, fmt.Errorf("can't initialize client: %w", err) } @@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri Child: lm.ChildId, } if err := m.Meta.FromBytes(lm.Meta); err != nil { + _ = errG.Wait() return newHeight, err } - if err := s.forest.TreeApply(cid, treeID, m, true); err != nil { - return newHeight, err - } - if m.Time > newHeight { - newHeight = m.Time + 1 - } else { - newHeight++ - } + errG.Go(func() error { + err := s.forest.TreeApply(cid, treeID, m, true) + heightMtx.Lock() + defer heightMtx.Unlock() + if err != nil { + if newHeight > height { + height = newHeight + } + return err + } + if m.Time > newHeight { + newHeight = m.Time + 1 + } else { + newHeight++ + } + return nil + }) } + + applyErr := errG.Wait() + if err == nil { + err = applyErr + } + + heightMtx.Lock() if height == newHeight || err != nil && !errors.Is(err, io.EOF) { + heightMtx.Unlock() return newHeight, err } height = newHeight + heightMtx.Unlock() } }