diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index ee40884eb..bc6e26fa7 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -1,6 +1,7 @@ package tree import ( + "bytes" "context" "crypto/sha256" "encoding/hex" @@ -22,6 +23,7 @@ type movePair struct { cid cidSDK.ID treeID string op *pilorama.Move + excPub []byte } type replicationTask struct { @@ -178,19 +180,48 @@ func (s *Service) replicate(ctx context.Context, op movePair) error { } for i := range nodes { - if i != localIndex { + if i != localIndex && !bytes.Equal(nodes[i].PublicKey(), op.excPub) { s.replicationTasks <- replicationTask{nodes[i], req} } } return nil } -func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) { +func (s *Service) replicateToRemoteNode(ctx context.Context, op movePair, + nodes []netmapSDK.NodeInfo, localIndex int, +) ([]byte, error) { + req := newApplyRequest(&op) + err := SignMessage(req, s.key) + if err != nil { + return nil, fmt.Errorf("can't sign data: %w", err) + } + + var errMulti error + for i := range nodes { + if i != localIndex { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + err := s.ReplicateTreeOp(ctx, nodes[i], req) + if err != nil { + errMulti = errors.Join(errMulti, err) + continue + } + return nodes[i].PublicKey(), nil + } + } + return nil, errMulti +} + +func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move, excPub []byte) { select { case s.replicateCh <- movePair{ cid: cid, treeID: treeID, op: op, + excPub: excPub, }: default: } diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 3994d6973..343a2c902 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -142,7 +142,16 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error return nil, err } - s.pushToQueue(cid, b.GetTreeId(), log) + excPub, err := s.replicateToRemoteNode(ctx, movePair{ + cid: cid, + treeID: b.GetTreeId(), + op: log, + }, ns, pos) + if err != nil { + return nil, err + } + s.pushToQueue(cid, b.GetTreeId(), log, excPub) + return &AddResponse{ Body: &AddResponse_Body{ NodeId: log.Child, @@ -190,7 +199,15 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP } for i := range logs { - s.pushToQueue(cid, b.GetTreeId(), &logs[i]) + excPub, err := s.replicateToRemoteNode(ctx, movePair{ + cid: cid, + treeID: b.GetTreeId(), + op: &logs[i], + }, ns, pos) + if err != nil { + return nil, err + } + s.pushToQueue(cid, b.GetTreeId(), &logs[i], excPub) } nodes := make([]uint64, len(logs)) @@ -245,7 +262,15 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon return nil, err } - s.pushToQueue(cid, b.GetTreeId(), log) + excPub, err := s.replicateToRemoteNode(ctx, movePair{ + cid: cid, + treeID: b.GetTreeId(), + op: log, + }, ns, pos) + if err != nil { + return nil, err + } + s.pushToQueue(cid, b.GetTreeId(), log, excPub) return new(RemoveResponse), nil } @@ -291,7 +316,15 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er return nil, err } - s.pushToQueue(cid, b.GetTreeId(), log) + excPub, err := s.replicateToRemoteNode(ctx, movePair{ + cid: cid, + treeID: b.GetTreeId(), + op: log, + }, ns, pos) + if err != nil { + return nil, err + } + s.pushToQueue(cid, b.GetTreeId(), log, excPub) return new(MoveResponse), nil }