WIP: tree: Sync replication #1468

Closed
dstepanov-yadro wants to merge 1 commit from dstepanov-yadro/frostfs-node:fix/tree_sync_rep into master
2 changed files with 10 additions and 8 deletions

View file

@ -183,14 +183,15 @@ func (s *Service) replicate(op movePair) error {
return nil return nil
} }
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) { func (s *Service) pushToQueue(ctx context.Context, cid cidSDK.ID, treeID string, op *pilorama.Move) {
select { select {
case s.replicateCh <- movePair{ case s.replicateCh <- movePair{
cid: cid, cid: cid,
treeID: treeID, treeID: treeID,
op: op, op: op,
}: }:
default: case <-s.closeCh:
case <-ctx.Done():
} }
} }

View file

@ -135,7 +135,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
return nil, err return nil, err
} }
s.pushToQueue(cid, b.GetTreeId(), log) s.pushToQueue(ctx, cid, b.GetTreeId(), log)
return &AddResponse{ return &AddResponse{
Body: &AddResponse_Body{ Body: &AddResponse_Body{
NodeId: log.Child, NodeId: log.Child,
@ -182,7 +182,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
} }
for i := range logs { for i := range logs {
s.pushToQueue(cid, b.GetTreeId(), &logs[i]) s.pushToQueue(ctx, cid, b.GetTreeId(), &logs[i])
} }
nodes := make([]uint64, len(logs)) nodes := make([]uint64, len(logs))
@ -236,7 +236,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
return nil, err return nil, err
} }
s.pushToQueue(cid, b.GetTreeId(), log) s.pushToQueue(ctx, cid, b.GetTreeId(), log)
return new(RemoveResponse), nil return new(RemoveResponse), nil
} }
@ -281,7 +281,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
return nil, err return nil, err
} }
s.pushToQueue(cid, b.GetTreeId(), log) s.pushToQueue(ctx, cid, b.GetTreeId(), log)
return new(MoveResponse), nil return new(MoveResponse), nil
} }
@ -586,7 +586,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
} }
// Apply locally applies operation from the remote node to the tree. // Apply locally applies operation from the remote node to the tree.
func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, error) { func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
err := verifyMessage(req) err := verifyMessage(req)
if err != nil { if err != nil {
return nil, err return nil, err
@ -624,7 +624,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
Meta: meta, Meta: meta,
}, },
}: }:
default: case <-s.closeCh:
case <-ctx.Done():
} }
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
} }