[#741] treesvc: Remove unused height variables
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
306f12e6c5
commit
b215817e14
2 changed files with 37 additions and 44 deletions
|
@ -64,6 +64,7 @@ const (
|
|||
TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree"
|
||||
TreeSynchronizeTree = "synchronize tree"
|
||||
TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes"
|
||||
TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node"
|
||||
TreeSyncingTrees = "syncing trees..."
|
||||
TreeCouldNotFetchContainers = "could not fetch containers"
|
||||
TreeTreesHaveBeenSynchronized = "trees have been synchronized"
|
||||
|
|
|
@ -217,44 +217,41 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
|
|||
|
||||
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move,
|
||||
) (uint64, error) {
|
||||
) error {
|
||||
rawCID := make([]byte, sha256.Size)
|
||||
cid.Encode(rawCID)
|
||||
|
||||
for {
|
||||
newHeight := height
|
||||
req := &GetOpLogRequest{
|
||||
Body: &GetOpLogRequest_Body{
|
||||
ContainerId: rawCID,
|
||||
TreeId: treeID,
|
||||
Height: newHeight,
|
||||
},
|
||||
}
|
||||
if err := SignMessage(req, s.key); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
c, err := treeClient.GetOpLog(ctx, req)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("can't initialize client: %w", err)
|
||||
}
|
||||
res, err := c.Recv()
|
||||
for ; err == nil; res, err = c.Recv() {
|
||||
lm := res.GetBody().GetOperation()
|
||||
m := &pilorama.Move{
|
||||
Parent: lm.ParentId,
|
||||
Child: lm.ChildId,
|
||||
}
|
||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
opsCh <- m
|
||||
}
|
||||
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
|
||||
return newHeight, err
|
||||
}
|
||||
height = newHeight
|
||||
req := &GetOpLogRequest{
|
||||
Body: &GetOpLogRequest_Body{
|
||||
ContainerId: rawCID,
|
||||
TreeId: treeID,
|
||||
Height: height,
|
||||
},
|
||||
}
|
||||
if err := SignMessage(req, s.key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := treeClient.GetOpLog(ctx, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't initialize client: %w", err)
|
||||
}
|
||||
res, err := c.Recv()
|
||||
for ; err == nil; res, err = c.Recv() {
|
||||
lm := res.GetBody().GetOperation()
|
||||
m := &pilorama.Move{
|
||||
Parent: lm.ParentId,
|
||||
Child: lm.ChildId,
|
||||
}
|
||||
if err := m.Meta.FromBytes(lm.Meta); err != nil {
|
||||
return err
|
||||
}
|
||||
opsCh <- m
|
||||
}
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// synchronizeTree synchronizes operations getting them from different nodes.
|
||||
|
@ -291,7 +288,6 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
|||
i := i
|
||||
n := n
|
||||
errGroup.Go(func() error {
|
||||
height := from
|
||||
n.IterateNetworkEndpoints(func(addr string) bool {
|
||||
var a network.Address
|
||||
if err := a.FromString(addr); err != nil {
|
||||
|
@ -315,16 +311,12 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
|
|||
defer cc.Close()
|
||||
|
||||
treeClient := NewTreeServiceClient(cc)
|
||||
for {
|
||||
h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
|
||||
if height < h {
|
||||
height = h
|
||||
}
|
||||
if err != nil || h <= height {
|
||||
// Error with the response, try the next node.
|
||||
return true
|
||||
}
|
||||
err = s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
|
||||
if err != nil {
|
||||
// Error with the response, try the next node.
|
||||
s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
|
||||
}
|
||||
return true
|
||||
})
|
||||
close(nodeOperationStreams[i])
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue