[#1374] tree: Use NewClient to create grpc connection in cache

Created grpc connection should be established, so perform Healthcheck request
to check connection is ok.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-09-13 11:59:17 +03:00
parent 54fe8383a4
commit 89d0435b1d
2 changed files with 26 additions and 9 deletions

View file

@ -2,6 +2,7 @@ package tree
import ( import (
"context" "context"
"crypto/ecdsa"
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
@ -19,6 +20,7 @@ import (
type clientCache struct { type clientCache struct {
sync.Mutex sync.Mutex
simplelru.LRU[string, cacheItem] simplelru.LRU[string, cacheItem]
key *ecdsa.PrivateKey
} }
type cacheItem struct { type cacheItem struct {
@ -34,13 +36,14 @@ const (
var errRecentlyFailed = errors.New("client has recently failed") var errRecentlyFailed = errors.New("client has recently failed")
func (c *clientCache) init() { func (c *clientCache) init(pk *ecdsa.PrivateKey) {
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) { l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
if conn := value.cc; conn != nil { if conn := value.cc; conn != nil {
_ = conn.Close() _ = conn.Close()
} }
}) })
c.LRU = *l c.LRU = *l
c.key = pk
} }
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) { func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
@ -63,7 +66,7 @@ func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceCl
} }
} }
cc, err := dialTreeService(ctx, netmapAddr) cc, err := c.dialTreeService(ctx, netmapAddr)
lastTry := time.Now() lastTry := time.Now()
c.Lock() c.Lock()
@ -81,14 +84,13 @@ func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceCl
return NewTreeServiceClient(cc), nil return NewTreeServiceClient(cc), nil
} }
func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) { func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) {
var netAddr network.Address var netAddr network.Address
if err := netAddr.FromString(netmapAddr); err != nil { if err := netAddr.FromString(netmapAddr); err != nil {
return nil, err return nil, err
} }
opts := []grpc.DialOption{ opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithChainUnaryInterceptor( grpc.WithChainUnaryInterceptor(
metrics.NewUnaryClientInterceptor(), metrics.NewUnaryClientInterceptor(),
tracing.NewUnaryClientInteceptor(), tracing.NewUnaryClientInteceptor(),
@ -103,9 +105,24 @@ func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn,
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} }
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout) req := &HealthcheckRequest{
cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), opts...) Body: &HealthcheckRequest_Body{},
cancel() }
if err := SignMessage(req, c.key); err != nil {
return cc, err return nil, err
}
cc, err := grpc.NewClient(netAddr.URIAddr(), opts...)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
defer cancel()
// perform some request to check connection
if _, err := NewTreeServiceClient(cc).Healthcheck(ctx, req); err != nil {
_ = cc.Close()
return nil, err
}
return cc, nil
} }

View file

@ -65,7 +65,7 @@ func New(opts ...Option) *Service {
s.log = &logger.Logger{Logger: zap.NewNop()} s.log = &logger.Logger{Logger: zap.NewNop()}
} }
s.cache.init() s.cache.init(s.key)
s.closeCh = make(chan struct{}) s.closeCh = make(chan struct{})
s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity)
s.replicateLocalCh = make(chan applyOp) s.replicateLocalCh = make(chan applyOp)