From 6efa93be0a014220d516ef874ee42ddd748460ac Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 26 Jul 2022 13:37:12 +0300 Subject: [PATCH] [#1621] services/tree: Return `Apply` result asyncronously Signed-off-by: Evgenii Stratonikov Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/replicator.go | 22 ++++++++++++++++++++++ pkg/services/tree/service.go | 19 ++++++++++++------- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index e7cf1508..4aa1d710 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -25,12 +25,33 @@ type replicationTask struct { req *ApplyRequest } +type applyOp struct { + treeID string + pilorama.CIDDescriptor + pilorama.Move +} + const ( defaultReplicatorCapacity = 64 defaultReplicatorWorkerCount = 64 defaultReplicatorSendTimeout = time.Second * 5 ) +func (s *Service) localReplicationWorker() { + for { + select { + case <-s.closeCh: + return + case op := <-s.replicateLocalCh: + err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false) + if err != nil { + s.log.Error("failed to apply replicated operation", + zap.String("err", err.Error())) + } + } + } +} + func (s *Service) replicationWorker() { for { select { @@ -74,6 +95,7 @@ func (s *Service) replicationWorker() { func (s *Service) replicateLoop(ctx context.Context) { for i := 0; i < s.replicatorWorkerCount; i++ { go s.replicationWorker() + go s.localReplicationWorker() } defer func() { for len(s.replicationTasks) != 0 { diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 9a78b6e3..440165be 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -23,6 +23,7 @@ type Service struct { cache clientCache replicateCh chan movePair + replicateLocalCh chan applyOp replicationTasks chan replicationTask closeCh chan struct{} containerCache containerCache @@ -59,6 +60,7 @@ func New(opts ...Option) *Service { s.cache.init() s.closeCh = make(chan struct{}) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) + s.replicateLocalCh = make(chan applyOp) s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount) s.containerCache.init(s.containerCacheSize) s.cnrMap = make(map[cidSDK.ID]map[string]uint64) @@ -483,13 +485,16 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e return nil, fmt.Errorf("can't parse meta-information: %w", err) } - d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} - resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}} - return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{ - Parent: op.GetParentId(), - Child: op.GetChildId(), - Meta: meta, - }, false) + s.replicateLocalCh <- applyOp{ + treeID: req.GetBody().GetTreeId(), + CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}, + Move: pilorama.Move{ + Parent: op.GetParentId(), + Child: op.GetChildId(), + Meta: meta, + }, + } + return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil } func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {