[#1427] services/tree: Parallelize replicator

Before this commit the replication channel quickly filled up under
heavy load, which led to continuously increasing latency for all
write operations. Replication is now performed by a pool of workers
consuming a bounded task queue, so the channel no longer backs up.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
Evgenii Stratonikov 2022-05-28 16:48:25 +03:00 committed by fyrchik
parent 8027b7bb6b
commit 33d8fb187a
3 changed files with 78 additions and 40 deletions
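For context: the commit replaces a single sequential replication loop with a fixed pool of workers draining a bounded task queue. Below is a minimal, self-contained sketch of that pattern with hypothetical names, not the service's actual types; the real service signals shutdown through a closeCh select rather than closing the queue.

package main

import (
	"fmt"
	"sync"
	"time"
)

type task struct{ id int }

func main() {
	const workerCount = 4

	// Bounded queue: when it is full, producers block (backpressure)
	// instead of latency growing without limit.
	tasks := make(chan task, workerCount)

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker drains the queue independently, so one slow
			// peer no longer holds up every other replication target.
			for t := range tasks {
				time.Sleep(10 * time.Millisecond) // stands in for the Apply RPC
				fmt.Println("replicated op", t.id)
			}
		}()
	}

	for i := 0; i < 16; i++ {
		tasks <- task{id: i}
	}
	close(tasks)
	wg.Wait()
}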

pkg/services/tree/cache.go

@@ -3,6 +3,7 @@ package tree
 import (
 	"context"
 	"sync"
+	"time"
 
 	"github.com/hashicorp/golang-lru/simplelru"
 	"github.com/nspcc-dev/neofs-node/pkg/network"
@@ -15,7 +16,10 @@ type clientCache struct {
 	simplelru.LRU
 }
 
-const defaultClientCacheSize = 10
+const (
+	defaultClientCacheSize      = 10
+	defaultClientConnectTimeout = time.Second * 2
+)
 
 func (c *clientCache) init() {
 	l, _ := simplelru.NewLRU(defaultClientCacheSize, func(key, value interface{}) {
@@ -55,9 +59,11 @@ func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn,
 		return nil, err
 	}
 
-	cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure())
-	if err != nil {
-		return nil, err
-	}
-	return cc, nil
+	ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
+	cc, err := grpc.DialContext(ctx, netAddr.URIAddr(),
+		grpc.WithInsecure(),
+		grpc.WithBlock())
+	cancel()
+
+	return cc, err
 }
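
The dial change above trades lazy connection setup for a bounded, synchronous one: grpc.WithBlock() makes DialContext wait until the connection is actually established, and the surrounding two-second context stops it from hanging on an unreachable node. A standalone sketch of the same idiom, with a placeholder address rather than the service's code:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

func main() {
	// Without WithBlock, DialContext returns immediately and a dead peer
	// only surfaces on the first RPC; with it, the failure is reported
	// here, within the timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	cc, err := grpc.DialContext(ctx, "example.invalid:8080",
		grpc.WithInsecure(),
		grpc.WithBlock())
	if err != nil {
		log.Fatalf("dial failed: %v", err) // typically context.DeadlineExceeded
	}
	defer cc.Close()
}

With the blocking dial, a bad endpoint costs at most defaultClientConnectTimeout before the replicator moves on to the node's next address.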

pkg/services/tree/replicator.go

@@ -21,22 +21,70 @@ type movePair struct {
 	op     *pilorama.LogMove
 }
 
+type replicationTask struct {
+	n   netmapSDK.NodeInfo
+	req *ApplyRequest
+}
+
 const (
-	defaultReplicatorCapacity = 64
-	defaultReplicatorTimeout  = time.Second * 2
+	defaultReplicatorCapacity    = 64
+	defaultReplicatorWorkerCount = 64
+	defaultReplicatorSendTimeout = time.Second * 5
 )
 
-func (s *Service) replicateLoop(ctx context.Context) {
+func (s *Service) replicationWorker() {
 	for {
 		select {
 		case <-s.closeCh:
 			return
+		case task := <-s.replicationTasks:
+			var lastErr error
+			var lastAddr string
+
+			task.n.IterateNetworkEndpoints(func(addr string) bool {
+				lastAddr = addr
+
+				c, err := s.cache.get(context.Background(), addr)
+				if err != nil {
+					lastErr = fmt.Errorf("can't create client: %w", err)
+					return false
+				}
+
+				ctx, cancel := context.WithTimeout(context.Background(), defaultReplicatorSendTimeout)
+				_, lastErr = c.Apply(ctx, task.req)
+				cancel()
+
+				return lastErr == nil
+			})
+
+			if lastErr != nil {
+				s.log.Warn("failed to send update to the node",
+					zap.String("last_error", lastErr.Error()),
+					zap.String("address", lastAddr),
+					zap.String("key", hex.EncodeToString(task.n.PublicKey())))
+			}
+		}
+	}
+}
+
+func (s *Service) replicateLoop(ctx context.Context) {
+	for i := 0; i < defaultReplicatorWorkerCount; i++ {
+		go s.replicationWorker()
+	}
+	defer func() {
+		for len(s.replicationTasks) != 0 {
+			<-s.replicationTasks
+		}
+	}()
+
+	for {
+		select {
+		case <-s.closeCh:
+			return
 		case <-ctx.Done():
 			return
 		case op := <-s.replicateCh:
-			ctx, cancel := context.WithTimeout(ctx, defaultReplicatorTimeout)
-			err := s.replicate(ctx, op)
-			cancel()
+			err := s.replicate(op)
 			if err != nil {
 				s.log.Error("error during replication",
 					zap.String("err", err.Error()),
@@ -47,7 +95,7 @@ func (s *Service) replicateLoop(ctx context.Context) {
 	}
 }
 
-func (s *Service) replicate(ctx context.Context, op movePair) error {
+func (s *Service) replicate(op movePair) error {
 	req := newApplyRequest(&op)
 	err := signMessage(req, s.key)
 	if err != nil {
@@ -64,37 +112,19 @@ func (s *Service) replicate(ctx context.Context, op movePair) error {
 			continue
 		}
 
-		var lastErr error
-		var lastAddr string
-
-		n.IterateNetworkEndpoints(func(addr string) bool {
-			lastAddr = addr
-
-			c, err := s.cache.get(ctx, addr)
-			if err != nil {
-				lastErr = err
-				return false
-			}
-
-			_, lastErr = c.Apply(ctx, req)
-			return lastErr == nil
-		})
-
-		if lastErr != nil {
-			s.log.Warn("failed to sent update to the node",
-				zap.String("last_error", lastErr.Error()),
-				zap.String("address", lastAddr),
-				zap.String("key", hex.EncodeToString(n.PublicKey())))
-		}
+		s.replicationTasks <- replicationTask{n, req}
 	}
 
 	return nil
}
 
 func (s *Service) pushToQueue(cid cidSDK.ID, treeID string, op *pilorama.LogMove) {
-	s.replicateCh <- movePair{
+	select {
+	case s.replicateCh <- movePair{
 		cid:    cid,
 		treeID: treeID,
 		op:     op,
+	}:
+	case <-s.closeCh:
 	}
 }
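
The reworked pushToQueue above pairs the channel send with a shutdown signal, so a producer can never block forever on a full queue once the service is closing. The same shape in isolation, using a hypothetical enqueue helper that is not part of the service:

package main

import (
	"fmt"
)

type op struct{ id int }

// enqueue mirrors the pushToQueue shape: a send that also watches a
// close signal, so the caller cannot deadlock on a full queue after
// shutdown has begun.
func enqueue(ch chan<- op, closeCh <-chan struct{}, o op) bool {
	select {
	case ch <- o:
		return true
	case <-closeCh:
		return false // shutting down; drop the operation
	}
}

func main() {
	ch := make(chan op, 1)
	closeCh := make(chan struct{})

	fmt.Println(enqueue(ch, closeCh, op{1})) // true: buffer has room
	close(closeCh)
	fmt.Println(enqueue(ch, closeCh, op{2})) // false: queue full, close signalled
}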

pkg/services/tree/service.go

@@ -18,9 +18,10 @@ import (
 type Service struct {
 	cfg
 
-	cache       clientCache
-	replicateCh chan movePair
-	closeCh     chan struct{}
+	cache            clientCache
+	replicateCh      chan movePair
+	replicationTasks chan replicationTask
+	closeCh          chan struct{}
 }
 
 // MaxGetSubTreeDepth represents maximum allowed traversal depth in GetSubTree RPC.
@@ -42,6 +43,7 @@ func New(opts ...Option) *Service {
 	s.cache.init()
 	s.closeCh = make(chan struct{})
 	s.replicateCh = make(chan movePair, defaultReplicatorCapacity)
+	s.replicationTasks = make(chan replicationTask, defaultReplicatorWorkerCount)
 
 	return &s
 }