diff --git a/client/client.go b/client/client.go index 014b3d7..b849e22 100644 --- a/client/client.go +++ b/client/client.go @@ -1,15 +1,19 @@ package client import ( + "bytes" "context" "crypto/ecdsa" "crypto/tls" "errors" + "fmt" + "net/url" "time" v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "google.golang.org/grpc" ) @@ -110,6 +114,63 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error { return nil } +func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo, prm PrmNetMapDial) error { + addresses := node.ExternalAddresses() + + dialAddr := func(addr string) error { + err := c.Dial(ctx, PrmDial{Endpoint: addr, PrmDialOptions: prm.PrmDialOptions}) + if err != nil { + if !prm.FallbackToAvailableAddress { + return err + } + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + return err + } + } + return nil + } + + if prm.UseRandomAddress { + // Use map for randomized iteration over addresses + addressesMap := make(map[string]struct{}) + for _, addr := range addresses { + addressesMap[addr] = struct{}{} + } + + for addr := range addressesMap { + err := dialAddr(addr) + if err != nil { + return err + } + } + } + + for _, addr := range addresses { + err := dialAddr(addr) + if err != nil { + return err + } + } + return nil +} + +func (c *Client) NetMapDial(ctx context.Context, endpoint string, prm PrmNetMapDial) error { + u, err := url.Parse(endpoint) + if err != nil { + return err + } + + if u.Scheme == "frostfs" { + nodes := c.prm.NetMap.Nodes() + for _, node := range nodes { + if bytes.Equal([]byte(u.Host), node.PublicKey()) { + return c.netMapDialNode(ctx, &node, prm) + } + } + } + return fmt.Errorf("dial failure: endpoint %s isn't valid", endpoint) +} + // sets underlying provider of frostFSAPIServer. The method is used for testing as an approach // to skip Dial stage and override FrostFS API server. MUST NOT be used outside test code. // In real applications wrapper over git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client @@ -141,6 +202,8 @@ type PrmInit struct { ResponseInfoCallback func(ResponseMetaInfo) error + NetMap *netmap.NetMap + NetMagic uint64 } @@ -181,12 +244,7 @@ const ( defaultStreamTimeout = 10 * time.Second ) -// PrmDial groups connection parameters for the Client. -// -// See also Dial. -type PrmDial struct { - Endpoint string - +type PrmDialOptions struct { TLSConfig *tls.Config // If DialTimeout is non-positive, then it's set to defaultDialTimeout. @@ -198,6 +256,24 @@ type PrmDial struct { GRPCDialOptions []grpc.DialOption } +// PrmDial groups connection parameters for the Dial call in the Client. +// +// See also Dial. +type PrmDial struct { + Endpoint string + + PrmDialOptions +} + +// PrmNetMapDial groups connection parameters for the NetMapDial call in the Client. +type PrmNetMapDial struct { + UseExternalAddresses bool + UseRandomAddress bool + FallbackToAvailableAddress bool + + PrmDialOptions +} + // SetServerURI sets server URI in the FrostFS network. // Required parameter. // @@ -222,30 +298,30 @@ func (x *PrmDial) SetServerURI(endpoint string) { // // See also SetServerURI. // -// Depreacted: Use PrmDial.TLSConfig instead. -func (x *PrmDial) SetTLSConfig(tlsConfig *tls.Config) { +// Deprecated: Use PrmDialOptions.TLSConfig instead. +func (x *PrmDialOptions) SetTLSConfig(tlsConfig *tls.Config) { x.TLSConfig = tlsConfig } // SetTimeout sets the timeout for connection to be established. // MUST BE positive. If not called, 5s timeout will be used by default. // -// Deprecated: Use PrmDial.DialTimeout instead. -func (x *PrmDial) SetTimeout(timeout time.Duration) { +// Deprecated: Use PrmDialOptions.DialTimeout instead. +func (x *PrmDialOptions) SetTimeout(timeout time.Duration) { x.DialTimeout = timeout } // SetStreamTimeout sets the timeout for individual operations in streaming RPC. // MUST BE positive. If not called, 10s timeout will be used by default. // -// Deprecated: Use PrmDial.StreamTimeout instead. -func (x *PrmDial) SetStreamTimeout(timeout time.Duration) { +// Deprecated: Use PrmDialOptions.StreamTimeout instead. +func (x *PrmDialOptions) SetStreamTimeout(timeout time.Duration) { x.StreamTimeout = timeout } // SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection. // -// Deprecated: Use PrmDial.GRPCDialOptions instead. -func (x *PrmDial) SetGRPCDialOptions(opts ...grpc.DialOption) { +// Deprecated: Use PrmDialOptions.GRPCDialOptions instead. +func (x *PrmDialOptions) SetGRPCDialOptions(opts ...grpc.DialOption) { x.GRPCDialOptions = opts } diff --git a/client/client_test.go b/client/client_test.go index d2e5274..e6b7f6e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -5,9 +5,11 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" + "fmt" "testing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/stretchr/testify/require" ) @@ -65,3 +67,38 @@ func TestClient_DialContext(t *testing.T) { assert(ctx, context.DeadlineExceeded) } + +func TestClient_NetMapDialContext(t *testing.T) { + var prmInit PrmInit + var c Client + publicKey := "foo" + endpoint := "localhost:8080" + + node := netmap.NodeInfo{} + node.SetPublicKey([]byte(publicKey)) + node.SetExternalAddresses(endpoint) + netMap := &netmap.NetMap{} + netMap.SetNodes([]netmap.NodeInfo{node}) + + prmInit.NetMap = netMap + c.Init(prmInit) + + prmNetMapDial := PrmNetMapDial{} + + assert := func(ctx context.Context, errExpected error) { + // expect particular context error according to Dial docs + require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey), prmNetMapDial), errExpected) + } + + // create pre-abandoned context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + assert(ctx, context.Canceled) + + // create "pre-deadlined" context + ctx, cancel = context.WithTimeout(context.Background(), 0) + defer cancel() + + assert(ctx, context.DeadlineExceeded) +} diff --git a/pool/pool.go b/pool/pool.go index 48f2ee1..1d34095 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -342,10 +342,12 @@ func (c *clientWrapper) dial(ctx context.Context) error { } prmDial := sdkClient.PrmDial{ - Endpoint: c.prm.address, - DialTimeout: c.prm.dialTimeout, - StreamTimeout: c.prm.streamTimeout, - GRPCDialOptions: c.prm.dialOptions, + Endpoint: c.prm.address, + PrmDialOptions: sdkClient.PrmDialOptions{ + DialTimeout: c.prm.dialTimeout, + StreamTimeout: c.prm.streamTimeout, + GRPCDialOptions: c.prm.dialOptions, + }, } if err = cl.Dial(ctx, prmDial); err != nil { @@ -381,10 +383,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change cl.Init(prmInit) prmDial := sdkClient.PrmDial{ - Endpoint: c.prm.address, - DialTimeout: c.prm.dialTimeout, - StreamTimeout: c.prm.streamTimeout, - GRPCDialOptions: c.prm.dialOptions, + Endpoint: c.prm.address, + PrmDialOptions: sdkClient.PrmDialOptions{ + DialTimeout: c.prm.dialTimeout, + StreamTimeout: c.prm.streamTimeout, + GRPCDialOptions: c.prm.dialOptions, + }, } if err := cl.Dial(ctx, prmDial); err != nil {