[#396] treesvc: properly remember last height on shutdown
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
Previously `newHeight` was updated in parallel, so that applying operation at height H did not imply successful TreeApply() for H-1. And because we have no context in TreeUpdateLastSyncHeight(), invalid starting height could be written if the context was canceled. In this commit we return the new height only if all operations were successfully applied. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
139ded93e1
commit
5e2fcec60f
1 changed files with 16 additions and 30 deletions
|
@ -206,28 +206,26 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
errG, ctx := errgroup.WithContext(ctx)
|
errG, ctx := errgroup.WithContext(ctx)
|
||||||
errG.SetLimit(1024)
|
errG.SetLimit(1024)
|
||||||
|
|
||||||
var heightMtx sync.Mutex
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
newHeight := height
|
|
||||||
req := &GetOpLogRequest{
|
req := &GetOpLogRequest{
|
||||||
Body: &GetOpLogRequest_Body{
|
Body: &GetOpLogRequest_Body{
|
||||||
ContainerId: rawCID,
|
ContainerId: rawCID,
|
||||||
TreeId: treeID,
|
TreeId: treeID,
|
||||||
Height: newHeight,
|
Height: height,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := SignMessage(req, s.key); err != nil {
|
if err := SignMessage(req, s.key); err != nil {
|
||||||
_ = errG.Wait()
|
_ = errG.Wait()
|
||||||
return newHeight, err
|
return height, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := treeClient.GetOpLog(ctx, req)
|
c, err := treeClient.GetOpLog(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = errG.Wait()
|
_ = errG.Wait()
|
||||||
return newHeight, fmt.Errorf("can't initialize client: %w", err)
|
return height, fmt.Errorf("can't initialize client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lastApplied := height
|
||||||
res, err := c.Recv()
|
res, err := c.Recv()
|
||||||
for ; err == nil; res, err = c.Recv() {
|
for ; err == nil; res, err = c.Recv() {
|
||||||
lm := res.GetBody().GetOperation()
|
lm := res.GetBody().GetOperation()
|
||||||
|
@ -237,39 +235,27 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
|
||||||
}
|
}
|
||||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||||
_ = errG.Wait()
|
_ = errG.Wait()
|
||||||
return newHeight, err
|
return height, err
|
||||||
|
}
|
||||||
|
if lastApplied < m.Meta.Time {
|
||||||
|
lastApplied = m.Meta.Time
|
||||||
}
|
}
|
||||||
errG.Go(func() error {
|
errG.Go(func() error {
|
||||||
err := s.forest.TreeApply(cid, treeID, m, true)
|
return s.forest.TreeApply(cid, treeID, m, true)
|
||||||
heightMtx.Lock()
|
|
||||||
defer heightMtx.Unlock()
|
|
||||||
if err != nil {
|
|
||||||
if newHeight > height {
|
|
||||||
height = newHeight
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if m.Time > newHeight {
|
|
||||||
newHeight = m.Time + 1
|
|
||||||
} else {
|
|
||||||
newHeight++
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First check local errors: if everything is ok, we can update starting height,
|
||||||
|
// because everything was applied.
|
||||||
applyErr := errG.Wait()
|
applyErr := errG.Wait()
|
||||||
if err == nil {
|
if applyErr != nil {
|
||||||
err = applyErr
|
return height, applyErr
|
||||||
}
|
}
|
||||||
|
|
||||||
heightMtx.Lock()
|
height = lastApplied
|
||||||
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
if err != nil && !errors.Is(err, io.EOF) {
|
||||||
heightMtx.Unlock()
|
return height, err
|
||||||
return newHeight, err
|
|
||||||
}
|
}
|
||||||
height = newHeight
|
|
||||||
heightMtx.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue