[#741] treesvc: Remove unused height variables

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-11-28 10:12:39 +03:00
parent 5521737f0b
commit b8a3f17759
2 changed files with 37 additions and 44 deletions

View file

@ -64,6 +64,7 @@ const (
TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree" TreeCouldNotUpdateLastSynchronizedHeightForATree = "could not update last synchronized height for a tree"
TreeSynchronizeTree = "synchronize tree" TreeSynchronizeTree = "synchronize tree"
TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes" TreeFailedToRunTreeSynchronizationOverAllNodes = "failed to run tree synchronization over all nodes"
TreeFailedToRunTreeSynchronizationForSpecificNode = "failed to run tree synchronization for specific node"
TreeSyncingTrees = "syncing trees..." TreeSyncingTrees = "syncing trees..."
TreeCouldNotFetchContainers = "could not fetch containers" TreeCouldNotFetchContainers = "could not fetch containers"
TreeTreesHaveBeenSynchronized = "trees have been synchronized" TreeTreesHaveBeenSynchronized = "trees have been synchronized"

View file

@ -217,44 +217,41 @@ func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID s
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move, height uint64, treeClient TreeServiceClient, opsCh chan<- *pilorama.Move,
) (uint64, error) { ) error {
rawCID := make([]byte, sha256.Size) rawCID := make([]byte, sha256.Size)
cid.Encode(rawCID) cid.Encode(rawCID)
for { req := &GetOpLogRequest{
newHeight := height Body: &GetOpLogRequest_Body{
req := &GetOpLogRequest{ ContainerId: rawCID,
Body: &GetOpLogRequest_Body{ TreeId: treeID,
ContainerId: rawCID, Height: height,
TreeId: treeID, },
Height: newHeight,
},
}
if err := SignMessage(req, s.key); err != nil {
return 0, err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return 0, fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.ParentId,
Child: lm.ChildId,
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
return 0, err
}
opsCh <- m
}
if height == newHeight || err != nil && !errors.Is(err, io.EOF) {
return newHeight, err
}
height = newHeight
} }
if err := SignMessage(req, s.key); err != nil {
return err
}
c, err := treeClient.GetOpLog(ctx, req)
if err != nil {
return fmt.Errorf("can't initialize client: %w", err)
}
res, err := c.Recv()
for ; err == nil; res, err = c.Recv() {
lm := res.GetBody().GetOperation()
m := &pilorama.Move{
Parent: lm.ParentId,
Child: lm.ChildId,
}
if err := m.Meta.FromBytes(lm.Meta); err != nil {
return err
}
opsCh <- m
}
if err != nil && !errors.Is(err, io.EOF) {
return err
}
return nil
} }
// synchronizeTree synchronizes operations getting them from different nodes. // synchronizeTree synchronizes operations getting them from different nodes.
@ -291,7 +288,6 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
i := i i := i
n := n n := n
errGroup.Go(func() error { errGroup.Go(func() error {
height := from
n.IterateNetworkEndpoints(func(addr string) bool { n.IterateNetworkEndpoints(func(addr string) bool {
var a network.Address var a network.Address
if err := a.FromString(addr); err != nil { if err := a.FromString(addr); err != nil {
@ -315,16 +311,12 @@ func (s *Service) synchronizeTree(ctx context.Context, cid cid.ID, from uint64,
defer cc.Close() defer cc.Close()
treeClient := NewTreeServiceClient(cc) treeClient := NewTreeServiceClient(cc)
for { err = s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i])
h, err := s.startStream(egCtx, cid, treeID, from, treeClient, nodeOperationStreams[i]) if err != nil {
if height < h { // Error with the response, try the next node.
height = h s.log.Warn(logs.TreeFailedToRunTreeSynchronizationForSpecificNode, zap.Error(err), zap.String("address", addr))
}
if err != nil || h <= height {
// Error with the response, try the next node.
return true
}
} }
return true
}) })
close(nodeOperationStreams[i]) close(nodeOperationStreams[i])
return nil return nil