[#1611] tree: Replicate operation
at least on one remote node
All checks were successful
Build / Build Components (push) Successful in 1m38s
Pre-commit hooks / Pre-commit (push) Successful in 1m58s
Tests and linters / Run gofumpt (push) Successful in 2m7s
Tests and linters / Staticcheck (push) Successful in 2m26s
Vulncheck / Vulncheck (push) Successful in 2m22s
Tests and linters / Tests (push) Successful in 2m40s
Tests and linters / Lint (push) Successful in 3m5s
Tests and linters / Tests with -race (push) Successful in 3m6s
Tests and linters / gopls check (push) Successful in 4m36s
OCI image / Build container images (push) Successful in 4m52s
All checks were successful
Build / Build Components (push) Successful in 1m38s
Pre-commit hooks / Pre-commit (push) Successful in 1m58s
Tests and linters / Run gofumpt (push) Successful in 2m7s
Tests and linters / Staticcheck (push) Successful in 2m26s
Vulncheck / Vulncheck (push) Successful in 2m22s
Tests and linters / Tests (push) Successful in 2m40s
Tests and linters / Lint (push) Successful in 3m5s
Tests and linters / Tests with -race (push) Successful in 3m6s
Tests and linters / gopls check (push) Successful in 4m36s
OCI image / Build container images (push) Successful in 4m52s
These changes prevent possible DU when `pilorama` became unavailable on the node. Change-Id: I6fec6c2ea20b0c59f6c35c1da5da4f70865fac4b Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
b01f05c951
commit
db5b5a5d19
2 changed files with 70 additions and 6 deletions
|
@ -1,6 +1,7 @@
|
||||||
package tree
|
package tree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
@ -22,6 +23,7 @@ type movePair struct {
|
||||||
cid cidSDK.ID
|
cid cidSDK.ID
|
||||||
treeID string
|
treeID string
|
||||||
op *pilorama.Move
|
op *pilorama.Move
|
||||||
|
excPub []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type replicationTask struct {
|
type replicationTask struct {
|
||||||
|
@ -178,19 +180,48 @@ func (s *Service) replicate(ctx context.Context, op movePair) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range nodes {
|
for i := range nodes {
|
||||||
if i != localIndex {
|
if i != localIndex && !bytes.Equal(nodes[i].PublicKey(), op.excPub) {
|
||||||
s.replicationTasks <- replicationTask{nodes[i], req}
|
s.replicationTasks <- replicationTask{nodes[i], req}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
|
func (s *Service) replicateToRemoteNode(ctx context.Context, op movePair,
|
||||||
|
nodes []netmapSDK.NodeInfo, localIndex int,
|
||||||
|
) ([]byte, error) {
|
||||||
|
req := newApplyRequest(&op)
|
||||||
|
err := SignMessage(req, s.key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("can't sign data: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var errMulti error
|
||||||
|
for i := range nodes {
|
||||||
|
if i != localIndex {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
err := s.ReplicateTreeOp(ctx, nodes[i], req)
|
||||||
|
if err != nil {
|
||||||
|
errMulti = errors.Join(errMulti, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nodes[i].PublicKey(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, errMulti
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move, excPub []byte) {
|
||||||
select {
|
select {
|
||||||
case s.replicateCh <- movePair{
|
case s.replicateCh <- movePair{
|
||||||
cid: cid,
|
cid: cid,
|
||||||
treeID: treeID,
|
treeID: treeID,
|
||||||
op: op,
|
op: op,
|
||||||
|
excPub: excPub,
|
||||||
}:
|
}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,7 +142,16 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pushToQueue(cid, b.GetTreeId(), log)
|
excPub, err := s.replicateToRemoteNode(ctx, movePair{
|
||||||
|
cid: cid,
|
||||||
|
treeID: b.GetTreeId(),
|
||||||
|
op: log,
|
||||||
|
}, ns, pos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.pushToQueue(cid, b.GetTreeId(), log, excPub)
|
||||||
|
|
||||||
return &AddResponse{
|
return &AddResponse{
|
||||||
Body: &AddResponse_Body{
|
Body: &AddResponse_Body{
|
||||||
NodeId: log.Child,
|
NodeId: log.Child,
|
||||||
|
@ -190,7 +199,15 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range logs {
|
for i := range logs {
|
||||||
s.pushToQueue(cid, b.GetTreeId(), &logs[i])
|
excPub, err := s.replicateToRemoteNode(ctx, movePair{
|
||||||
|
cid: cid,
|
||||||
|
treeID: b.GetTreeId(),
|
||||||
|
op: &logs[i],
|
||||||
|
}, ns, pos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.pushToQueue(cid, b.GetTreeId(), &logs[i], excPub)
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes := make([]uint64, len(logs))
|
nodes := make([]uint64, len(logs))
|
||||||
|
@ -245,7 +262,15 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pushToQueue(cid, b.GetTreeId(), log)
|
excPub, err := s.replicateToRemoteNode(ctx, movePair{
|
||||||
|
cid: cid,
|
||||||
|
treeID: b.GetTreeId(),
|
||||||
|
op: log,
|
||||||
|
}, ns, pos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.pushToQueue(cid, b.GetTreeId(), log, excPub)
|
||||||
return new(RemoveResponse), nil
|
return new(RemoveResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,7 +316,15 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pushToQueue(cid, b.GetTreeId(), log)
|
excPub, err := s.replicateToRemoteNode(ctx, movePair{
|
||||||
|
cid: cid,
|
||||||
|
treeID: b.GetTreeId(),
|
||||||
|
op: log,
|
||||||
|
}, ns, pos)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.pushToQueue(cid, b.GetTreeId(), log, excPub)
|
||||||
return new(MoveResponse), nil
|
return new(MoveResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue