diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 95c8f8013..bd0d61eeb 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -183,14 +183,15 @@ func (s *Service) replicate(op movePair) error { return nil } -func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) { +func (s *Service) pushToQueue(ctx context.Context, cid cidSDK.ID, treeID string, op *pilorama.Move) { select { case s.replicateCh <- movePair{ cid: cid, treeID: treeID, op: op, }: - default: + case <-s.closeCh: + case <-ctx.Done(): } } diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 10c3b6ccc..634c06d17 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -135,7 +135,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error return nil, err } - s.pushToQueue(cid, b.GetTreeId(), log) + s.pushToQueue(ctx, cid, b.GetTreeId(), log) return &AddResponse{ Body: &AddResponse_Body{ NodeId: log.Child, @@ -182,7 +182,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP } for i := range logs { - s.pushToQueue(cid, b.GetTreeId(), &logs[i]) + s.pushToQueue(ctx, cid, b.GetTreeId(), &logs[i]) } nodes := make([]uint64, len(logs)) @@ -236,7 +236,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon return nil, err } - s.pushToQueue(cid, b.GetTreeId(), log) + s.pushToQueue(ctx, cid, b.GetTreeId(), log) return new(RemoveResponse), nil } @@ -281,7 +281,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er return nil, err } - s.pushToQueue(cid, b.GetTreeId(), log) + s.pushToQueue(ctx, cid, b.GetTreeId(), log) return new(MoveResponse), nil } @@ -586,7 +586,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di } // Apply locally applies operation from the remote node to the tree. -func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, error) { +func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { err := verifyMessage(req) if err != nil { return nil, err @@ -624,7 +624,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e Meta: meta, }, }: - default: + case <-s.closeCh: + case <-ctx.Done(): } return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil }