[#131] client: implement dialing config

Signed-off-by: olefirenque <egor.olefirenko892@gmail.com>
This commit is contained in:
Egor Olefirenko 2023-12-07 11:40:03 +03:00
parent cf4677ecb9
commit 7b03c3a6a1
3 changed files with 81 additions and 30 deletions

View file

@ -114,13 +114,47 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error {
return nil
}
func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo) error {
// TODO: implement dialing options
func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo, prm PrmNetMapDial) error {
addresses := node.ExternalAddresses()
return c.Dial(ctx, PrmDial{Endpoint: addresses[0]})
dialAddr := func(addr string) error {
err := c.Dial(ctx, PrmDial{Endpoint: addr})
if err != nil {
if !prm.FallbackToAvailableAddress {
return err
}
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
return err
}
}
return nil
}
if prm.UseRandomAddress {
// Use map for randomized iteration over addresses
addressesMap := make(map[string]struct{})
for _, addr := range addresses {
addressesMap[addr] = struct{}{}
}
for addr := range addressesMap {
err := dialAddr(addr)
if err != nil {
return err
}
}
}
for _, addr := range addresses {
err := dialAddr(addr)
if err != nil {
return err
}
}
return nil
}
func (c *Client) NetMapDial(ctx context.Context, endpoint string) error {
func (c *Client) NetMapDial(ctx context.Context, endpoint string, prm PrmNetMapDial) error {
u, err := url.Parse(endpoint)
if err != nil {
return err
@ -130,7 +164,7 @@ func (c *Client) NetMapDial(ctx context.Context, endpoint string) error {
nodes := c.prm.NetMap.Nodes()
for _, node := range nodes {
if bytes.Equal([]byte(u.Host), node.PublicKey()) {
return c.netMapDialNode(ctx, &node)
return c.netMapDialNode(ctx, &node, prm)
}
}
}
@ -210,12 +244,7 @@ const (
defaultStreamTimeout = 10 * time.Second
)
// PrmDial groups connection parameters for the Client.
//
// See also Dial.
type PrmDial struct {
Endpoint string
type PrmDialOptions struct {
TLSConfig *tls.Config
// If DialTimeout is non-positive, then it's set to defaultDialTimeout.
@ -227,6 +256,24 @@ type PrmDial struct {
GRPCDialOptions []grpc.DialOption
}
// PrmDial groups connection parameters for the Dial call in the Client.
//
// See also Dial.
type PrmDial struct {
Endpoint string
PrmDialOptions
}
// PrmNetMapDial groups connection parameters for the NetMapDial call in the Client.
type PrmNetMapDial struct {
UseExternalAddresses bool
UseRandomAddress bool
FallbackToAvailableAddress bool
PrmDialOptions
}
// SetServerURI sets server URI in the FrostFS network.
// Required parameter.
//
@ -251,30 +298,30 @@ func (x *PrmDial) SetServerURI(endpoint string) {
//
// See also SetServerURI.
//
// Depreacted: Use PrmDial.TLSConfig instead.
func (x *PrmDial) SetTLSConfig(tlsConfig *tls.Config) {
// Deprecated: Use PrmDialOptions.TLSConfig instead.
func (x *PrmDialOptions) SetTLSConfig(tlsConfig *tls.Config) {
x.TLSConfig = tlsConfig
}
// SetTimeout sets the timeout for connection to be established.
// MUST BE positive. If not called, 5s timeout will be used by default.
//
// Deprecated: Use PrmDial.DialTimeout instead.
func (x *PrmDial) SetTimeout(timeout time.Duration) {
// Deprecated: Use PrmDialOptions.DialTimeout instead.
func (x *PrmDialOptions) SetTimeout(timeout time.Duration) {
x.DialTimeout = timeout
}
// SetStreamTimeout sets the timeout for individual operations in streaming RPC.
// MUST BE positive. If not called, 10s timeout will be used by default.
//
// Deprecated: Use PrmDial.StreamTimeout instead.
func (x *PrmDial) SetStreamTimeout(timeout time.Duration) {
// Deprecated: Use PrmDialOptions.StreamTimeout instead.
func (x *PrmDialOptions) SetStreamTimeout(timeout time.Duration) {
x.StreamTimeout = timeout
}
// SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
//
// Deprecated: Use PrmDial.GRPCDialOptions instead.
func (x *PrmDial) SetGRPCDialOptions(opts ...grpc.DialOption) {
// Deprecated: Use PrmDialOptions.GRPCDialOptions instead.
func (x *PrmDialOptions) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.GRPCDialOptions = opts
}

View file

@ -81,13 +81,13 @@ func TestClient_NetMapDialContext(t *testing.T) {
netMap.SetNodes([]netmap.NodeInfo{node})
prmInit.NetMap = netMap
c.Init(prmInit)
prmNetMapDial := PrmNetMapDial{}
assert := func(ctx context.Context, errExpected error) {
// expect particular context error according to Dial docs
//require.ErrorIs(t, c.Dial(ctx, prm), errExpected)
require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey)), errExpected)
require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey), prmNetMapDial), errExpected)
}
// create pre-abandoned context

View file

@ -343,9 +343,11 @@ func (c *clientWrapper) dial(ctx context.Context) error {
prmDial := sdkClient.PrmDial{
Endpoint: c.prm.address,
PrmDialOptions: sdkClient.PrmDialOptions{
DialTimeout: c.prm.dialTimeout,
StreamTimeout: c.prm.streamTimeout,
GRPCDialOptions: c.prm.dialOptions,
},
}
if err = cl.Dial(ctx, prmDial); err != nil {
@ -382,9 +384,11 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
prmDial := sdkClient.PrmDial{
Endpoint: c.prm.address,
PrmDialOptions: sdkClient.PrmDialOptions{
DialTimeout: c.prm.dialTimeout,
StreamTimeout: c.prm.streamTimeout,
GRPCDialOptions: c.prm.dialOptions,
},
}
if err := cl.Dial(ctx, prmDial); err != nil {