package tree import ( "context" "crypto/tls" "errors" "fmt" "sync" "time" rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc" rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree" "google.golang.org/grpc" ) type treeClient struct { mu sync.RWMutex address string opts []grpc.DialOption client *rpcclient.Client nodeDialTimeout time.Duration streamTimeout time.Duration healthy bool } // ErrUnhealthyEndpoint is returned when client in the pool considered unavailable. var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint") // newTreeClient creates new tree client with auto dial. func newTreeClient(addr string, opts []grpc.DialOption, dialTimeout time.Duration, streamTimeout time.Duration) *treeClient { return &treeClient{ address: addr, opts: opts, nodeDialTimeout: dialTimeout, streamTimeout: streamTimeout, } } func (c *treeClient) dial(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() if c.client != nil { return fmt.Errorf("couldn't dial '%s': connection already established", c.address) } var err error c.client, err = c.createClient() if err != nil { return err } if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil { return fmt.Errorf("healthcheck tree service: %w", err) } c.healthy = true return nil } func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bool, err error) { c.mu.Lock() defer c.mu.Unlock() if c.client == nil { if c.client, err = c.createClient(); err != nil { return false, err } } wasHealthy := c.healthy if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil { c.healthy = false return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err) } c.healthy = true return !wasHealthy, nil } func (c *treeClient) createClient() (*rpcclient.Client, error) { cli := rpcclient.New(append( rpcclient.WithNetworkURIAddress(c.address, &tls.Config{}), rpcclient.WithDialTimeout(c.nodeDialTimeout), rpcclient.WithRWTimeout(c.streamTimeout), rpcclient.WithGRPCDialOptions(c.opts), )...) return cli, nil } func (c *treeClient) serviceClient() (*rpcclient.Client, error) { c.mu.RLock() defer c.mu.RUnlock() if c.client == nil || !c.healthy { return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address) } return c.client, nil } func (c *treeClient) endpoint() string { return c.address } func (c *treeClient) isHealthy() bool { c.mu.RLock() defer c.mu.RUnlock() return c.healthy } func (c *treeClient) setHealthy(val bool) { c.mu.Lock() defer c.mu.Unlock() c.healthy = val } func (c *treeClient) close() error { c.mu.Lock() defer c.mu.Unlock() if c.client == nil || c.client.Conn() == nil { return nil } return c.client.Conn().Close() }