[#266] services/tree: Batch operations on the service level
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
f7679a8168
commit
364b4ac572
1 changed files with 34 additions and 8 deletions
|
@ -203,6 +203,11 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
rawCID := make([]byte, sha256.Size)
|
rawCID := make([]byte, sha256.Size)
|
||||||
cid.Encode(rawCID)
|
cid.Encode(rawCID)
|
||||||
|
|
||||||
|
errG, ctx := errgroup.WithContext(ctx)
|
||||||
|
errG.SetLimit(1024)
|
||||||
|
|
||||||
|
var heightMtx sync.Mutex
|
||||||
|
|
||||||
for {
|
for {
|
||||||
newHeight := height
|
newHeight := height
|
||||||
req := &GetOpLogRequest{
|
req := &GetOpLogRequest{
|
||||||
|
@ -213,11 +218,13 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := SignMessage(req, s.key); err != nil {
|
if err := SignMessage(req, s.key); err != nil {
|
||||||
|
_ = errG.Wait()
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := treeClient.GetOpLog(ctx, req)
|
c, err := treeClient.GetOpLog(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
_ = errG.Wait()
|
||||||
return newHeight, fmt.Errorf("can't initialize client: %w", err)
|
return newHeight, fmt.Errorf("can't initialize client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,21 +236,40 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
Child: lm.ChildId,
|
Child: lm.ChildId,
|
||||||
}
|
}
|
||||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||||
|
_ = errG.Wait()
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
if err := s.forest.TreeApply(cid, treeID, m, true); err != nil {
|
errG.Go(func() error {
|
||||||
return newHeight, err
|
err := s.forest.TreeApply(cid, treeID, m, true)
|
||||||
}
|
heightMtx.Lock()
|
||||||
if m.Time > newHeight {
|
defer heightMtx.Unlock()
|
||||||
newHeight = m.Time + 1
|
if err != nil {
|
||||||
} else {
|
if newHeight > height {
|
||||||
newHeight++
|
height = newHeight
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if m.Time > newHeight {
|
||||||
|
newHeight = m.Time + 1
|
||||||
|
} else {
|
||||||
|
newHeight++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
applyErr := errG.Wait()
|
||||||
|
if err == nil {
|
||||||
|
err = applyErr
|
||||||
|
}
|
||||||
|
|
||||||
|
heightMtx.Lock()
|
||||||
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
||||||
|
heightMtx.Unlock()
|
||||||
return newHeight, err
|
return newHeight, err
|
||||||
}
|
}
|
||||||
height = newHeight
|
height = newHeight
|
||||||
|
heightMtx.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue