[#1446] services/tree: Cache connections to the container nodes

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-05-28 12:27:55 +03:00 committed by fyrchik
parent 4437cd7113
commit b04f712773
4 changed files with 69 additions and 18 deletions

View file

@ -0,0 +1,63 @@
package tree
import (
"context"
"sync"
"github.com/hashicorp/golang-lru/simplelru"
"github.com/nspcc-dev/neofs-node/pkg/network"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
type clientCache struct {
sync.Mutex
simplelru.LRU
}
const defaultClientCacheSize = 10
func (c *clientCache) init() {
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(key, value interface{}) {
_ = value.(*grpc.ClientConn).Close()
})
c.LRU = *l
}
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
c.Lock()
ccInt, ok := c.LRU.Get(netmapAddr)
c.Unlock()
if ok {
cc := ccInt.(*grpc.ClientConn)
if s := cc.GetState(); s == connectivity.Idle || s == connectivity.Ready {
return NewTreeServiceClient(cc), nil
}
_ = cc.Close()
}
cc, err := dialTreeService(ctx, netmapAddr)
if err != nil {
return nil, err
}
c.Lock()
c.LRU.Add(netmapAddr, cc)
c.Unlock()
return NewTreeServiceClient(cc), nil
}
func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) {
var netAddr network.Address
if err := netAddr.FromString(netmapAddr); err != nil {
return nil, err
}
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure())
if err != nil {
return nil, err
}
return cc, nil
}

View file

@ -5,10 +5,8 @@ import (
"context"
"errors"
"github.com/nspcc-dev/neofs-node/pkg/network"
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
"google.golang.org/grpc"
)
var errNoSuitableNode = errors.New("no node was found to execute the request")
@ -26,7 +24,7 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo
for _, n := range cntNodes {
var stop bool
n.IterateNetworkEndpoints(func(endpoint string) bool {
c, err := dialTreeService(ctx, endpoint)
c, err := s.cache.get(ctx, endpoint)
if err != nil {
return false
}
@ -45,16 +43,3 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo
}
return nil
}
func dialTreeService(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
var netAddr network.Address
if err := netAddr.FromString(netmapAddr); err != nil {
return nil, err
}
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure())
if err != nil {
return nil, err
}
return NewTreeServiceClient(cc), nil
}

View file

@ -70,7 +70,7 @@ func (s *Service) replicate(ctx context.Context, op movePair) error {
n.IterateNetworkEndpoints(func(addr string) bool {
lastAddr = addr
c, err := dialTreeService(ctx, addr)
c, err := s.cache.get(ctx, addr)
if err != nil {
lastErr = err
return false

View file

@ -18,6 +18,7 @@ import (
type Service struct {
cfg
cache clientCache
replicateCh chan movePair
closeCh chan struct{}
}
@ -38,6 +39,7 @@ func New(opts ...Option) *Service {
s.log = zap.NewNop()
}
s.cache.init()
s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, defaultReplicatorCapacity)
@ -424,7 +426,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
}
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
return nil, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
Parent: op.GetParentId(),
Child: op.GetChildId(),
Meta: meta,