[#1621] services/tree: Return Apply result asyncronously

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2022-07-26 13:37:12 +03:00 committed by fyrchik
parent ac81c70c09
commit 6efa93be0a
2 changed files with 34 additions and 7 deletions

View file

@ -25,12 +25,33 @@ type replicationTask struct {
req *ApplyRequest
}
type applyOp struct {
treeID string
pilorama.CIDDescriptor
pilorama.Move
}
const (
defaultReplicatorCapacity = 64
defaultReplicatorWorkerCount = 64
defaultReplicatorSendTimeout = time.Second * 5
)
func (s *Service) localReplicationWorker() {
for {
select {
case <-s.closeCh:
return
case op := <-s.replicateLocalCh:
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
if err != nil {
s.log.Error("failed to apply replicated operation",
zap.String("err", err.Error()))
}
}
}
}
func (s *Service) replicationWorker() {
for {
select {
@ -74,6 +95,7 @@ func (s *Service) replicationWorker() {
func (s *Service) replicateLoop(ctx context.Context) {
for i := 0; i < s.replicatorWorkerCount; i++ {
go s.replicationWorker()
go s.localReplicationWorker()
}
defer func() {
for len(s.replicationTasks) != 0 {

View file

@ -23,6 +23,7 @@ type Service struct {
cache clientCache
replicateCh chan movePair
replicateLocalCh chan applyOp
replicationTasks chan replicationTask
closeCh chan struct{}
containerCache containerCache
@ -59,6 +60,7 @@ func New(opts ...Option) *Service {
s.cache.init()
s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicateLocalCh = make(chan applyOp)
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
s.containerCache.init(s.containerCacheSize)
s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
@ -483,13 +485,16 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
return nil, fmt.Errorf("can't parse meta-information: %w", err)
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
s.replicateLocalCh <- applyOp{
treeID: req.GetBody().GetTreeId(),
CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
Move: pilorama.Move{
Parent: op.GetParentId(),
Child: op.GetChildId(),
Meta: meta,
}, false)
},
}
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
}
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {