From b04f712773458c5856a4b28ed725662b2510296a Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Sat, 28 May 2022 12:27:55 +0300 Subject: [PATCH] [#1446] services/tree: Cache connections to the container nodes Signed-off-by: Evgenii Stratonikov --- pkg/services/tree/cache.go | 63 +++++++++++++++++++++++++++++++++ pkg/services/tree/redirect.go | 17 +-------- pkg/services/tree/replicator.go | 2 +- pkg/services/tree/service.go | 5 ++- 4 files changed, 69 insertions(+), 18 deletions(-) create mode 100644 pkg/services/tree/cache.go diff --git a/pkg/services/tree/cache.go b/pkg/services/tree/cache.go new file mode 100644 index 0000000000..db954d90d2 --- /dev/null +++ b/pkg/services/tree/cache.go @@ -0,0 +1,63 @@ +package tree + +import ( + "context" + "sync" + + "github.com/hashicorp/golang-lru/simplelru" + "github.com/nspcc-dev/neofs-node/pkg/network" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" +) + +type clientCache struct { + sync.Mutex + simplelru.LRU +} + +const defaultClientCacheSize = 10 + +func (c *clientCache) init() { + l, _ := simplelru.NewLRU(defaultClientCacheSize, func(key, value interface{}) { + _ = value.(*grpc.ClientConn).Close() + }) + c.LRU = *l +} + +func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) { + c.Lock() + ccInt, ok := c.LRU.Get(netmapAddr) + c.Unlock() + + if ok { + cc := ccInt.(*grpc.ClientConn) + if s := cc.GetState(); s == connectivity.Idle || s == connectivity.Ready { + return NewTreeServiceClient(cc), nil + } + _ = cc.Close() + } + + cc, err := dialTreeService(ctx, netmapAddr) + if err != nil { + return nil, err + } + + c.Lock() + c.LRU.Add(netmapAddr, cc) + c.Unlock() + + return NewTreeServiceClient(cc), nil +} + +func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) { + var netAddr network.Address + if err := netAddr.FromString(netmapAddr); err != nil { + return nil, err + } + + cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure()) + if err != nil { + return nil, err + } + return cc, nil +} diff --git a/pkg/services/tree/redirect.go b/pkg/services/tree/redirect.go index 304ed0d7d4..52348fb079 100644 --- a/pkg/services/tree/redirect.go +++ b/pkg/services/tree/redirect.go @@ -5,10 +5,8 @@ import ( "context" "errors" - "github.com/nspcc-dev/neofs-node/pkg/network" netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" - "google.golang.org/grpc" ) var errNoSuitableNode = errors.New("no node was found to execute the request") @@ -26,7 +24,7 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo for _, n := range cntNodes { var stop bool n.IterateNetworkEndpoints(func(endpoint string) bool { - c, err := dialTreeService(ctx, endpoint) + c, err := s.cache.get(ctx, endpoint) if err != nil { return false } @@ -45,16 +43,3 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo } return nil } - -func dialTreeService(ctx context.Context, netmapAddr string) (TreeServiceClient, error) { - var netAddr network.Address - if err := netAddr.FromString(netmapAddr); err != nil { - return nil, err - } - - cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure()) - if err != nil { - return nil, err - } - return NewTreeServiceClient(cc), nil -} diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go index 15ef0b032e..313f12db7a 100644 --- a/pkg/services/tree/replicator.go +++ b/pkg/services/tree/replicator.go @@ -70,7 +70,7 @@ func (s *Service) replicate(ctx context.Context, op movePair) error { n.IterateNetworkEndpoints(func(addr string) bool { lastAddr = addr - c, err := dialTreeService(ctx, addr) + c, err := s.cache.get(ctx, addr) if err != nil { lastErr = err return false diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index b0d86ce10a..af09d60634 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -18,6 +18,7 @@ import ( type Service struct { cfg + cache clientCache replicateCh chan movePair closeCh chan struct{} } @@ -38,6 +39,7 @@ func New(opts ...Option) *Service { s.log = zap.NewNop() } + s.cache.init() s.closeCh = make(chan struct{}) s.replicateCh = make(chan movePair, defaultReplicatorCapacity) @@ -424,7 +426,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e } d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} - return nil, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{ + resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}} + return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{ Parent: op.GetParentId(), Child: op.GetChildId(), Meta: meta,