131 lines
2.9 KiB
Go
131 lines
2.9 KiB
Go
package tree
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
|
|
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type treeClient struct {
|
|
mu sync.RWMutex
|
|
address string
|
|
opts []grpc.DialOption
|
|
client *rpcclient.Client
|
|
nodeDialTimeout time.Duration
|
|
streamTimeout time.Duration
|
|
|
|
healthy bool
|
|
}
|
|
|
|
// ErrUnhealthyEndpoint is the sentinel error wrapped by serviceClient when a
// tree client has no established client or is currently marked unhealthy.
// Callers can detect it with errors.Is.
var ErrUnhealthyEndpoint = errors.New("unhealthy endpoint")
|
|
|
|
// newTreeClient creates new tree client with auto dial.
|
|
func newTreeClient(addr string, opts []grpc.DialOption, dialTimeout time.Duration, streamTimeout time.Duration) *treeClient {
|
|
return &treeClient{
|
|
address: addr,
|
|
opts: opts,
|
|
nodeDialTimeout: dialTimeout,
|
|
streamTimeout: streamTimeout,
|
|
}
|
|
}
|
|
|
|
func (c *treeClient) dial(ctx context.Context) error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.client != nil {
|
|
return fmt.Errorf("couldn't dial '%s': connection already established", c.address)
|
|
}
|
|
|
|
var err error
|
|
c.client, err = c.createClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
|
|
return fmt.Errorf("healthcheck tree service: %w", err)
|
|
}
|
|
|
|
c.healthy = true
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *treeClient) redialIfNecessary(ctx context.Context) (healthHasChanged bool, err error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.client == nil {
|
|
if c.client, err = c.createClient(); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
wasHealthy := c.healthy
|
|
if _, err = rpcapi.Healthcheck(c.client, &tree.HealthcheckRequest{}, rpcclient.WithContext(ctx)); err != nil {
|
|
c.healthy = false
|
|
return wasHealthy, fmt.Errorf("healthcheck tree service: %w", err)
|
|
}
|
|
|
|
c.healthy = true
|
|
|
|
return !wasHealthy, nil
|
|
}
|
|
|
|
func (c *treeClient) createClient() (*rpcclient.Client, error) {
|
|
cli := rpcclient.New(append(
|
|
rpcclient.WithNetworkURIAddress(c.address, &tls.Config{}),
|
|
rpcclient.WithDialTimeout(c.nodeDialTimeout),
|
|
rpcclient.WithRWTimeout(c.streamTimeout),
|
|
rpcclient.WithGRPCDialOptions(c.opts),
|
|
)...)
|
|
|
|
return cli, nil
|
|
}
|
|
|
|
func (c *treeClient) serviceClient() (*rpcclient.Client, error) {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
if c.client == nil || !c.healthy {
|
|
return nil, fmt.Errorf("%w: '%s'", ErrUnhealthyEndpoint, c.address)
|
|
}
|
|
|
|
return c.client, nil
|
|
}
|
|
|
|
func (c *treeClient) endpoint() string {
|
|
return c.address
|
|
}
|
|
|
|
func (c *treeClient) isHealthy() bool {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
return c.healthy
|
|
}
|
|
|
|
func (c *treeClient) setHealthy(val bool) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.healthy = val
|
|
}
|
|
|
|
func (c *treeClient) close() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.client == nil || c.client.Conn() == nil {
|
|
return nil
|
|
}
|
|
return c.client.Conn().Close()
|
|
}
|