[#9999] tree: Sync replication
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 2m21s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m43s
DCO action / DCO (pull_request) Successful in 2m47s
Vulncheck / Vulncheck (pull_request) Successful in 2m35s
Tests and linters / gopls check (pull_request) Successful in 3m32s
Tests and linters / Tests (pull_request) Successful in 3m51s
Tests and linters / Staticcheck (pull_request) Successful in 3m50s
Build / Build Components (pull_request) Successful in 4m4s
Tests and linters / Lint (pull_request) Successful in 5m4s
Tests and linters / Tests with -race (pull_request) Successful in 6m15s
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 2m21s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m43s
DCO action / DCO (pull_request) Successful in 2m47s
Vulncheck / Vulncheck (pull_request) Successful in 2m35s
Tests and linters / gopls check (pull_request) Successful in 3m32s
Tests and linters / Tests (pull_request) Successful in 3m51s
Tests and linters / Staticcheck (pull_request) Successful in 3m50s
Build / Build Components (pull_request) Successful in 4m4s
Tests and linters / Lint (pull_request) Successful in 5m4s
Tests and linters / Tests with -race (pull_request) Successful in 6m15s
Async replication leads to a lot of log undo/redo during background replication. This leads to performance drop to almost zero. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
d19ab43500
commit
123bcbdfb8
2 changed files with 10 additions and 8 deletions
|
@ -183,14 +183,15 @@ func (s *Service) replicate(op movePair) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.Move) {
|
func (s *Service) pushToQueue(ctx context.Context, cid cidSDK.ID, treeID string, op *pilorama.Move) {
|
||||||
select {
|
select {
|
||||||
case s.replicateCh <- movePair{
|
case s.replicateCh <- movePair{
|
||||||
cid: cid,
|
cid: cid,
|
||||||
treeID: treeID,
|
treeID: treeID,
|
||||||
op: op,
|
op: op,
|
||||||
}:
|
}:
|
||||||
default:
|
case <-s.closeCh:
|
||||||
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,7 @@ func (s *Service) Add(ctx context.Context, req *AddRequest) (*AddResponse, error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pushToQueue(cid, b.GetTreeId(), log)
|
s.pushToQueue(ctx, cid, b.GetTreeId(), log)
|
||||||
return &AddResponse{
|
return &AddResponse{
|
||||||
Body: &AddResponse_Body{
|
Body: &AddResponse_Body{
|
||||||
NodeId: log.Child,
|
NodeId: log.Child,
|
||||||
|
@ -182,7 +182,7 @@ func (s *Service) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByP
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range logs {
|
for i := range logs {
|
||||||
s.pushToQueue(cid, b.GetTreeId(), &logs[i])
|
s.pushToQueue(ctx, cid, b.GetTreeId(), &logs[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes := make([]uint64, len(logs))
|
nodes := make([]uint64, len(logs))
|
||||||
|
@ -236,7 +236,7 @@ func (s *Service) Remove(ctx context.Context, req *RemoveRequest) (*RemoveRespon
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pushToQueue(cid, b.GetTreeId(), log)
|
s.pushToQueue(ctx, cid, b.GetTreeId(), log)
|
||||||
return new(RemoveResponse), nil
|
return new(RemoveResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,7 +281,7 @@ func (s *Service) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.pushToQueue(cid, b.GetTreeId(), log)
|
s.pushToQueue(ctx, cid, b.GetTreeId(), log)
|
||||||
return new(MoveResponse), nil
|
return new(MoveResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -586,7 +586,7 @@ func sortByFilename(nodes []pilorama.NodeInfo, d GetSubTreeRequest_Body_Order_Di
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply locally applies operation from the remote node to the tree.
|
// Apply locally applies operation from the remote node to the tree.
|
||||||
func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, error) {
|
func (s *Service) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
|
||||||
err := verifyMessage(req)
|
err := verifyMessage(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -624,7 +624,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
Meta: meta,
|
Meta: meta,
|
||||||
},
|
},
|
||||||
}:
|
}:
|
||||||
default:
|
case <-s.closeCh:
|
||||||
|
case <-ctx.Done():
|
||||||
}
|
}
|
||||||
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
|
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue