forked from TrueCloudLab/frostfs-node
[#1446] services/tree: Cache connections to the container nodes
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
06f2681178
commit
c9ddc8fbeb
4 changed files with 69 additions and 18 deletions
63
pkg/services/tree/cache.go
Normal file
63
pkg/services/tree/cache.go
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/hashicorp/golang-lru/simplelru"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
|
)
|
||||||
|
|
||||||
|
type clientCache struct {
|
||||||
|
sync.Mutex
|
||||||
|
simplelru.LRU
|
||||||
|
}
|
||||||
|
|
||||||
|
const defaultClientCacheSize = 10
|
||||||
|
|
||||||
|
func (c *clientCache) init() {
|
||||||
|
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(key, value interface{}) {
|
||||||
|
_ = value.(*grpc.ClientConn).Close()
|
||||||
|
})
|
||||||
|
c.LRU = *l
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
|
||||||
|
c.Lock()
|
||||||
|
ccInt, ok := c.LRU.Get(netmapAddr)
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
cc := ccInt.(*grpc.ClientConn)
|
||||||
|
if s := cc.GetState(); s == connectivity.Idle || s == connectivity.Ready {
|
||||||
|
return NewTreeServiceClient(cc), nil
|
||||||
|
}
|
||||||
|
_ = cc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
cc, err := dialTreeService(ctx, netmapAddr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Lock()
|
||||||
|
c.LRU.Add(netmapAddr, cc)
|
||||||
|
c.Unlock()
|
||||||
|
|
||||||
|
return NewTreeServiceClient(cc), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) {
|
||||||
|
var netAddr network.Address
|
||||||
|
if err := netAddr.FromString(netmapAddr); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return cc, nil
|
||||||
|
}
|
|
@ -5,10 +5,8 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
netmapSDK "github.com/nspcc-dev/neofs-sdk-go/netmap"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var errNoSuitableNode = errors.New("no node was found to execute the request")
|
var errNoSuitableNode = errors.New("no node was found to execute the request")
|
||||||
|
@ -26,7 +24,7 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo
|
||||||
for _, n := range cntNodes {
|
for _, n := range cntNodes {
|
||||||
var stop bool
|
var stop bool
|
||||||
n.IterateNetworkEndpoints(func(endpoint string) bool {
|
n.IterateNetworkEndpoints(func(endpoint string) bool {
|
||||||
c, err := dialTreeService(ctx, endpoint)
|
c, err := s.cache.get(ctx, endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -45,16 +43,3 @@ func (s *Service) forEachNode(ctx context.Context, cntNodes []netmapSDK.NodeInfo
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func dialTreeService(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
|
|
||||||
var netAddr network.Address
|
|
||||||
if err := netAddr.FromString(netmapAddr); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), grpc.WithInsecure())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return NewTreeServiceClient(cc), nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ func (s *Service) replicate(ctx context.Context, op movePair) error {
|
||||||
n.IterateNetworkEndpoints(func(addr string) bool {
|
n.IterateNetworkEndpoints(func(addr string) bool {
|
||||||
lastAddr = addr
|
lastAddr = addr
|
||||||
|
|
||||||
c, err := dialTreeService(ctx, addr)
|
c, err := s.cache.get(ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lastErr = err
|
lastErr = err
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
type Service struct {
|
type Service struct {
|
||||||
cfg
|
cfg
|
||||||
|
|
||||||
|
cache clientCache
|
||||||
replicateCh chan movePair
|
replicateCh chan movePair
|
||||||
closeCh chan struct{}
|
closeCh chan struct{}
|
||||||
}
|
}
|
||||||
|
@ -38,6 +39,7 @@ func New(opts ...Option) *Service {
|
||||||
s.log = zap.NewNop()
|
s.log = zap.NewNop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.cache.init()
|
||||||
s.closeCh = make(chan struct{})
|
s.closeCh = make(chan struct{})
|
||||||
s.replicateCh = make(chan movePair, defaultReplicatorCapacity)
|
s.replicateCh = make(chan movePair, defaultReplicatorCapacity)
|
||||||
|
|
||||||
|
@ -424,7 +426,8 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
|
||||||
}
|
}
|
||||||
|
|
||||||
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
|
||||||
return nil, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
|
resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}
|
||||||
|
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
|
||||||
Parent: op.GetParentId(),
|
Parent: op.GetParentId(),
|
||||||
Child: op.GetChildId(),
|
Child: op.GetChildId(),
|
||||||
Meta: meta,
|
Meta: meta,
|
||||||
|
|
Loading…
Reference in a new issue