Move diff from nspcc master and support branches #28

Merged
fyrchik merged 28 commits from move-changes into master 2023-01-25 12:31:47 +00:00
2 changed files with 34 additions and 7 deletions
Showing only changes of commit 18a18c9727 - Show all commits

View file

@ -25,12 +25,33 @@ type replicationTask struct {
req *ApplyRequest req *ApplyRequest
} }
type applyOp struct {
treeID string
pilorama.CIDDescriptor
pilorama.Move
}
const ( const (
defaultReplicatorCapacity = 64 defaultReplicatorCapacity = 64
defaultReplicatorWorkerCount = 64 defaultReplicatorWorkerCount = 64
defaultReplicatorSendTimeout = time.Second * 5 defaultReplicatorSendTimeout = time.Second * 5
) )
func (s *Service) localReplicationWorker() {
for {
select {
case <-s.closeCh:
return
case op := <-s.replicateLocalCh:
err := s.forest.TreeApply(op.CIDDescriptor, op.treeID, &op.Move, false)
if err != nil {
s.log.Error("failed to apply replicated operation",
zap.String("err", err.Error()))
}
}
}
}
func (s *Service) replicationWorker() { func (s *Service) replicationWorker() {
for { for {
select { select {
@ -74,6 +95,7 @@ func (s *Service) replicationWorker() {
func (s *Service) replicateLoop(ctx context.Context) { func (s *Service) replicateLoop(ctx context.Context) {
for i := 0; i < s.replicatorWorkerCount; i++ { for i := 0; i < s.replicatorWorkerCount; i++ {
go s.replicationWorker() go s.replicationWorker()
go s.localReplicationWorker()
} }
defer func() { defer func() {
for len(s.replicationTasks) != 0 { for len(s.replicationTasks) != 0 {

View file

@ -23,6 +23,7 @@ type Service struct {
cache clientCache cache clientCache
replicateCh chan movePair replicateCh chan movePair
replicateLocalCh chan applyOp
replicationTasks chan replicationTask replicationTasks chan replicationTask
closeCh chan struct{} closeCh chan struct{}
containerCache containerCache containerCache containerCache
@ -59,6 +60,7 @@ func New(opts ...Option) *Service {
s.cache.init() s.cache.init()
s.closeCh = make(chan struct{}) s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicateLocalCh = make(chan applyOp)
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount) s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
s.containerCache.init(s.containerCacheSize) s.containerCache.init(s.containerCacheSize)
s.cnrMap = make(map[cidSDK.ID]map[string]uint64) s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
@ -483,13 +485,16 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
return nil, fmt.Errorf("can't parse meta-information: %w", err) return nil, fmt.Errorf("can't parse meta-information: %w", err)
} }
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} s.replicateLocalCh <- applyOp{
resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}} treeID: req.GetBody().GetTreeId(),
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{ CIDDescriptor: pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size},
Parent: op.GetParentId(), Move: pilorama.Move{
Child: op.GetChildId(), Parent: op.GetParentId(),
Meta: meta, Child: op.GetChildId(),
}, false) Meta: meta,
},
}
return &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}, nil
} }
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {