From 89d0435b1d59257d5bf15c926465193e53c11922 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 13 Sep 2024 11:59:17 +0300 Subject: [PATCH] [#1374] tree: Use NewClient to create grpc connection in cache Created grpc connection should be established, so perform Healthcheck request to check connection is ok. Signed-off-by: Dmitrii Stepanov --- pkg/services/tree/cache.go | 33 +++++++++++++++++++++++++-------- pkg/services/tree/service.go | 2 +- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/pkg/services/tree/cache.go b/pkg/services/tree/cache.go index 1be1c2f8..38501b85 100644 --- a/pkg/services/tree/cache.go +++ b/pkg/services/tree/cache.go @@ -2,6 +2,7 @@ package tree import ( "context" + "crypto/ecdsa" "errors" "fmt" "sync" @@ -19,6 +20,7 @@ import ( type clientCache struct { sync.Mutex simplelru.LRU[string, cacheItem] + key *ecdsa.PrivateKey } type cacheItem struct { @@ -34,13 +36,14 @@ const ( var errRecentlyFailed = errors.New("client has recently failed") -func (c *clientCache) init() { +func (c *clientCache) init(pk *ecdsa.PrivateKey) { l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) { if conn := value.cc; conn != nil { _ = conn.Close() } }) c.LRU = *l + c.key = pk } func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) { @@ -63,7 +66,7 @@ func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceCl } } - cc, err := dialTreeService(ctx, netmapAddr) + cc, err := c.dialTreeService(ctx, netmapAddr) lastTry := time.Now() c.Lock() @@ -81,14 +84,13 @@ func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceCl return NewTreeServiceClient(cc), nil } -func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) { +func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) { var netAddr network.Address if err := netAddr.FromString(netmapAddr); err != nil { return nil, err } opts := []grpc.DialOption{ - grpc.WithBlock(), grpc.WithChainUnaryInterceptor( metrics.NewUnaryClientInterceptor(), tracing.NewUnaryClientInteceptor(), @@ -103,9 +105,24 @@ func dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } - ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout) - cc, err := grpc.DialContext(ctx, netAddr.URIAddr(), opts...) - cancel() + req := &HealthcheckRequest{ + Body: &HealthcheckRequest_Body{}, + } + if err := SignMessage(req, c.key); err != nil { + return nil, err + } - return cc, err + cc, err := grpc.NewClient(netAddr.URIAddr(), opts...) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout) + defer cancel() + // perform some request to check connection + if _, err := NewTreeServiceClient(cc).Healthcheck(ctx, req); err != nil { + _ = cc.Close() + return nil, err + } + return cc, nil } diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 875e47ec..60bb1a6a 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -65,7 +65,7 @@ func New(opts ...Option) *Service { s.log = &logger.Logger{Logger: zap.NewNop()} } - s.cache.init() + s.cache.init(s.key) s.closeCh = make(chan struct{}) s.replicateCh = make(chan movePair, s.replicatorChannelCapacity) s.replicateLocalCh = make(chan applyOp)