diff --git a/pkg/services/tree/cache.go b/pkg/services/tree/cache.go index db954d90d2..f6a0c4bcb7 100644 --- a/pkg/services/tree/cache.go +++ b/pkg/services/tree/cache.go @@ -3,6 +3,7 @@ package tree import ( "context" "sync" + "time" "github.com/hashicorp/golang-lru/simplelru" "github.com/nspcc-dev/neofs-node/pkg/network" @@ -15,7 +16,10 @@ type clientCache struct { simplelru.LRU } -const defaultClientCacheSize = 10 +const ( + defaultClientCacheSize = 10 + defaultClientConnectTimeout = time.Second * 2 +) func (c *clientCache) init() { l, _ := simplelru.NewLRU(defaultClientCacheSize, func(key, value interface{}) { @@ -55,9 +59,11 @@ func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, return nil, err } - cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure()) - if err != nil { - return nil, err - } - return cc, nil + ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout) + cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), + grpc.WithInsecure(), + grpc.WithBlock()) + cancel() + + return cc, err } diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 313f12db7a..09f577c1a1 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -21,22 +21,70 @@ type movePair struct { op *pilorama.LogMove } +type replicationTask struct { + n netmapSDK.NodeInfo + req *ApplyRequest +} + const ( - defaultReplicatorCapacity = 64 - defaultReplicatorTimeout = time.Second * 2 + defaultReplicatorCapacity = 64 + defaultReplicatorWorkerCount = 64 + defaultReplicatorSendTimeout = time.Second * 5 ) -func (s *Service) replicateLoop(ctx context.Context) { +func (s *Service) replicationWorker() { for { select { case <-s.closeCh: + return + case task := <-s.replicationTasks: + var lastErr error + var lastAddr string + + task.n.IterateNetworkEndpoints(func(addr string) bool { + lastAddr = addr + + c, err := s.cache.get(context.Background(), addr) + if err != nil { + lastErr = fmt.Errorf("can't create client: %w", err) + return false + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultReplicatorSendTimeout) + _, lastErr = c.Apply(ctx, task.req) + cancel() + + return lastErr == nil + }) + + if lastErr != nil { + s.log.Warn("failed to sent update to the node", + zap.String("last_error", lastErr.Error()), + zap.String("address", lastAddr), + zap.String("key", hex.EncodeToString(task.n.PublicKey()))) + } + } + } +} + +func (s *Service) replicateLoop(ctx context.Context) { + for i := 0; i < defaultReplicatorWorkerCount; i++ { + go s.replicationWorker() + } + defer func() { + for len(s.replicationTasks) != 0 { + <-s.replicationTasks + } + }() + + for { + select { + case <-s.closeCh: + return case <-ctx.Done(): return case op := <-s.replicateCh: - ctx, cancel := context.WithTimeout(ctx, defaultReplicatorTimeout) - err := s.replicate(ctx, op) - cancel() - + err := s.replicate(op) if err != nil { s.log.Error("error during replication", zap.String("err", err.Error()), @@ -47,7 +95,7 @@ func (s *Service) replicateLoop(ctx context.Context) { } } -func (s *Service) replicate(ctx context.Context, op movePair) error { +func (s *Service) replicate(op movePair) error { req := newApplyRequest(&op) err := signMessage(req, s.key) if err != nil { @@ -64,37 +112,19 @@ func (s *Service) replicate(ctx context.Context, op movePair) error { continue } - var lastErr error - var lastAddr string - - n.IterateNetworkEndpoints(func(addr string) bool { - lastAddr = addr - - c, err := s.cache.get(ctx, addr) - if err != nil { - lastErr = err - return false - } - - _, lastErr = c.Apply(ctx, req) - return lastErr == nil - }) - - if lastErr != nil { - s.log.Warn("failed to sent update to the node", - zap.String("last_error", lastErr.Error()), - zap.String("address", lastAddr), - zap.String("key", hex.EncodeToString(n.PublicKey()))) - } + s.replicationTasks <- replicationTask{n, req} } return nil } func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.LogMove) { - s.replicateCh <- movePair{ + select { + case s.replicateCh <- movePair{ cid: cid, treeID: treeID, op: op, + }: + case <-s.closeCh: } } diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index af09d60634..af71569bd8 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -18,9 +18,10 @@ import ( type Service struct { cfg - cache clientCache - replicateCh chan movePair - closeCh chan struct{} + cache clientCache + replicateCh chan movePair + replicationTasks chan replicationTask + closeCh chan struct{} } // MaxGetSubTreeDepth represents maximum allowed traversal depth in GetSubTree RPC. @@ -42,6 +43,7 @@ func New(opts ...Option) *Service { s.cache.init() s.closeCh = make(chan struct{}) s.replicateCh = make(chan movePair, defaultReplicatorCapacity) + s.replicationTasks = make(chan replicationTask, defaultReplicatorWorkerCount) return &s }