They connect to the same endpoints, the only difference is that connection for synchronization is limited in lifetime and is closed after the sync is finished. This is probably not intentional, as synchronization was implemented before cache was introduced. However, reusing dialTreeService() in sync.go has possible perfomance implications, so is avoided for now. Change-Id: I2e37befd783b4d873ff833969f932deded1195be Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
114 lines
2.6 KiB
Go
114 lines
2.6 KiB
Go
package tree
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
|
"github.com/hashicorp/golang-lru/v2/simplelru"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/connectivity"
|
|
)
|
|
|
|
type clientCache struct {
|
|
sync.Mutex
|
|
simplelru.LRU[string, cacheItem]
|
|
key *ecdsa.PrivateKey
|
|
ds *internalNet.DialerSource
|
|
}
|
|
|
|
type cacheItem struct {
|
|
cc *grpc.ClientConn
|
|
lastTry time.Time
|
|
}
|
|
|
|
const (
|
|
defaultClientCacheSize = 32
|
|
defaultClientConnectTimeout = time.Second * 2
|
|
defaultReconnectInterval = time.Second * 15
|
|
)
|
|
|
|
var errRecentlyFailed = errors.New("client has recently failed")
|
|
|
|
func (c *clientCache) init(pk *ecdsa.PrivateKey, ds *internalNet.DialerSource) {
|
|
l, _ := simplelru.NewLRU(defaultClientCacheSize, func(_ string, value cacheItem) {
|
|
if conn := value.cc; conn != nil {
|
|
_ = conn.Close()
|
|
}
|
|
})
|
|
c.LRU = *l
|
|
c.key = pk
|
|
c.ds = ds
|
|
}
|
|
|
|
func (c *clientCache) get(ctx context.Context, netmapAddr string) (TreeServiceClient, error) {
|
|
c.Lock()
|
|
ccInt, ok := c.LRU.Get(netmapAddr)
|
|
c.Unlock()
|
|
|
|
if ok {
|
|
item := ccInt
|
|
if item.cc == nil {
|
|
if d := time.Since(item.lastTry); d < defaultReconnectInterval {
|
|
return nil, fmt.Errorf("%w: %s till the next reconnection to %s",
|
|
errRecentlyFailed, d, netmapAddr)
|
|
}
|
|
} else {
|
|
if s := item.cc.GetState(); s == connectivity.Idle || s == connectivity.Ready {
|
|
return NewTreeServiceClient(item.cc), nil
|
|
}
|
|
_ = item.cc.Close()
|
|
}
|
|
}
|
|
|
|
cc, err := c.dialTreeService(ctx, netmapAddr)
|
|
lastTry := time.Now()
|
|
|
|
c.Lock()
|
|
if err != nil {
|
|
c.LRU.Add(netmapAddr, cacheItem{cc: nil, lastTry: lastTry})
|
|
} else {
|
|
c.LRU.Add(netmapAddr, cacheItem{cc: cc, lastTry: lastTry})
|
|
}
|
|
c.Unlock()
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return NewTreeServiceClient(cc), nil
|
|
}
|
|
|
|
func (c *clientCache) dialTreeService(ctx context.Context, netmapAddr string) (*grpc.ClientConn, error) {
|
|
var netAddr network.Address
|
|
if err := netAddr.FromString(netmapAddr); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cc, err := createConnection(netAddr, grpc.WithContextDialer(c.ds.GrpcContextDialer()))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, defaultClientConnectTimeout)
|
|
defer cancel()
|
|
|
|
req := &HealthcheckRequest{
|
|
Body: &HealthcheckRequest_Body{},
|
|
}
|
|
if err := SignMessage(req, c.key); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// perform some request to check connection
|
|
if _, err := NewTreeServiceClient(cc).Healthcheck(ctx, req); err != nil {
|
|
_ = cc.Close()
|
|
return nil, err
|
|
}
|
|
return cc, nil
|
|
}
|