From 6ace44f6b5b747289479ee0df31e0129858a1ded Mon Sep 17 00:00:00 2001 From: olefirenque Date: Thu, 30 Nov 2023 13:12:15 +0300 Subject: [PATCH 1/5] [#131] client: add basic netmap dialer Signed-off-by: olefirenque --- client/client.go | 22 ++++++++++++++++++++++ client/client_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/client/client.go b/client/client.go index 014b3d7..d4a3e3b 100644 --- a/client/client.go +++ b/client/client.go @@ -5,6 +5,10 @@ import ( "crypto/ecdsa" "crypto/tls" "errors" + "fmt" + netmap "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" + "net/url" + "slices" "time" v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting" @@ -110,6 +114,22 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error { return nil } +func (c *Client) NetMapDial(ctx context.Context, endpoint string) error { + u, err := url.Parse(endpoint) + if err != nil { + return err + } + if u.Scheme == "frostfs" { + nodes := c.prm.NetMap.GetNodes() + for _, node := range nodes { + if slices.Equal([]byte(u.Host), node.PublicKey) { + return c.Dial(ctx, PrmDial{Endpoint: node.Addresses[0]}) + } + } + } + return fmt.Errorf("dial failure: endpoint %s isn't valid", endpoint) +} + // sets underlying provider of frostFSAPIServer. The method is used for testing as an approach // to skip Dial stage and override FrostFS API server. MUST NOT be used outside test code. // In real applications wrapper over git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client @@ -141,6 +161,8 @@ type PrmInit struct { ResponseInfoCallback func(ResponseMetaInfo) error + NetMap *netmap.Netmap + NetMagic uint64 } diff --git a/client/client_test.go b/client/client_test.go index d2e5274..e09fc11 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -5,6 +5,8 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" + "fmt" + netmap "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" "testing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -65,3 +67,40 @@ func TestClient_DialContext(t *testing.T) { assert(ctx, context.DeadlineExceeded) } + +func TestClient_NetMapDialContext(t *testing.T) { + var prmInit PrmInit + var c Client + publicKey := "foo" + endpoint := "localhost:8080" + + prmInit.NetMap = &netmap.Netmap{ + Epoch: 0, + Nodes: []*netmap.NodeInfo{ + { + PublicKey: []byte(publicKey), + Addresses: []string{endpoint}, + }, + }, + } + + c.Init(prmInit) + + assert := func(ctx context.Context, errExpected error) { + // expect particular context error according to Dial docs + //require.ErrorIs(t, c.Dial(ctx, prm), errExpected) + require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey)), errExpected) + } + + // create pre-abandoned context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + assert(ctx, context.Canceled) + + // create "pre-deadlined" context + ctx, cancel = context.WithTimeout(context.Background(), 0) + defer cancel() + + assert(ctx, context.DeadlineExceeded) +} -- 2.45.2 From dd8fb59efe31f94aa830ea254434ad47da805933 Mon Sep 17 00:00:00 2001 From: olefirenque Date: Thu, 30 Nov 2023 13:26:21 +0300 Subject: [PATCH 2/5] [#131] client: get rid of slices.Equal Signed-off-by: olefirenque --- client/client.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index d4a3e3b..5148838 100644 --- a/client/client.go +++ b/client/client.go @@ -8,7 +8,6 @@ import ( "fmt" netmap "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" "net/url" - "slices" "time" v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting" @@ -119,10 +118,23 @@ func (c *Client) NetMapDial(ctx context.Context, endpoint string) error { if err != nil { return err } + + isEqualSlices := func(a, b []byte) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true + } + if u.Scheme == "frostfs" { nodes := c.prm.NetMap.GetNodes() for _, node := range nodes { - if slices.Equal([]byte(u.Host), node.PublicKey) { + if isEqualSlices([]byte(u.Host), node.PublicKey) { return c.Dial(ctx, PrmDial{Endpoint: node.Addresses[0]}) } } -- 2.45.2 From cf4677ecb914bf0ab8ae184866c48843ba36265a Mon Sep 17 00:00:00 2001 From: olefirenque Date: Thu, 30 Nov 2023 14:28:44 +0300 Subject: [PATCH 3/5] [#131] client: refactor netmap dialer Signed-off-by: olefirenque --- client/client.go | 29 ++++++++++++----------------- client/client_test.go | 18 ++++++++---------- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/client/client.go b/client/client.go index 5148838..ddc9b33 100644 --- a/client/client.go +++ b/client/client.go @@ -1,18 +1,19 @@ package client import ( + "bytes" "context" "crypto/ecdsa" "crypto/tls" "errors" "fmt" - netmap "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" "net/url" "time" v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "google.golang.org/grpc" ) @@ -113,29 +114,23 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error { return nil } +func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo) error { + // TODO: implement dialing options + addresses := node.ExternalAddresses() + return c.Dial(ctx, PrmDial{Endpoint: addresses[0]}) +} + func (c *Client) NetMapDial(ctx context.Context, endpoint string) error { u, err := url.Parse(endpoint) if err != nil { return err } - isEqualSlices := func(a, b []byte) bool { - if len(a) != len(b) { - return false - } - for i := range a { - if a[i] != b[i] { - return false - } - } - return true - } - if u.Scheme == "frostfs" { - nodes := c.prm.NetMap.GetNodes() + nodes := c.prm.NetMap.Nodes() for _, node := range nodes { - if isEqualSlices([]byte(u.Host), node.PublicKey) { - return c.Dial(ctx, PrmDial{Endpoint: node.Addresses[0]}) + if bytes.Equal([]byte(u.Host), node.PublicKey()) { + return c.netMapDialNode(ctx, &node) } } } @@ -173,7 +168,7 @@ type PrmInit struct { ResponseInfoCallback func(ResponseMetaInfo) error - NetMap *netmap.Netmap + NetMap *netmap.NetMap NetMagic uint64 } diff --git a/client/client_test.go b/client/client_test.go index e09fc11..f3c97c9 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -6,10 +6,10 @@ import ( "crypto/elliptic" "crypto/rand" "fmt" - netmap "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" "testing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "github.com/stretchr/testify/require" ) @@ -74,15 +74,13 @@ func TestClient_NetMapDialContext(t *testing.T) { publicKey := "foo" endpoint := "localhost:8080" - prmInit.NetMap = &netmap.Netmap{ - Epoch: 0, - Nodes: []*netmap.NodeInfo{ - { - PublicKey: []byte(publicKey), - Addresses: []string{endpoint}, - }, - }, - } + node := netmap.NodeInfo{} + node.SetPublicKey([]byte(publicKey)) + node.SetExternalAddresses(endpoint) + netMap := &netmap.NetMap{} + netMap.SetNodes([]netmap.NodeInfo{node}) + + prmInit.NetMap = netMap c.Init(prmInit) -- 2.45.2 From 7b03c3a6a16a88162b39203d3aa44aba4a9398df Mon Sep 17 00:00:00 2001 From: olefirenque Date: Thu, 7 Dec 2023 11:40:03 +0300 Subject: [PATCH 4/5] [#131] client: implement dialing config Signed-off-by: olefirenque --- client/client.go | 85 +++++++++++++++++++++++++++++++++---------- client/client_test.go | 6 +-- pool/pool.go | 20 ++++++---- 3 files changed, 81 insertions(+), 30 deletions(-) diff --git a/client/client.go b/client/client.go index ddc9b33..05d2baf 100644 --- a/client/client.go +++ b/client/client.go @@ -114,13 +114,47 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error { return nil } -func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo) error { - // TODO: implement dialing options +func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo, prm PrmNetMapDial) error { addresses := node.ExternalAddresses() - return c.Dial(ctx, PrmDial{Endpoint: addresses[0]}) + + dialAddr := func(addr string) error { + err := c.Dial(ctx, PrmDial{Endpoint: addr}) + if err != nil { + if !prm.FallbackToAvailableAddress { + return err + } + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + return err + } + } + return nil + } + + if prm.UseRandomAddress { + // Use map for randomized iteration over addresses + addressesMap := make(map[string]struct{}) + for _, addr := range addresses { + addressesMap[addr] = struct{}{} + } + + for addr := range addressesMap { + err := dialAddr(addr) + if err != nil { + return err + } + } + } + + for _, addr := range addresses { + err := dialAddr(addr) + if err != nil { + return err + } + } + return nil } -func (c *Client) NetMapDial(ctx context.Context, endpoint string) error { +func (c *Client) NetMapDial(ctx context.Context, endpoint string, prm PrmNetMapDial) error { u, err := url.Parse(endpoint) if err != nil { return err @@ -130,7 +164,7 @@ func (c *Client) NetMapDial(ctx context.Context, endpoint string) error { nodes := c.prm.NetMap.Nodes() for _, node := range nodes { if bytes.Equal([]byte(u.Host), node.PublicKey()) { - return c.netMapDialNode(ctx, &node) + return c.netMapDialNode(ctx, &node, prm) } } } @@ -210,12 +244,7 @@ const ( defaultStreamTimeout = 10 * time.Second ) -// PrmDial groups connection parameters for the Client. -// -// See also Dial. -type PrmDial struct { - Endpoint string - +type PrmDialOptions struct { TLSConfig *tls.Config // If DialTimeout is non-positive, then it's set to defaultDialTimeout. @@ -227,6 +256,24 @@ type PrmDial struct { GRPCDialOptions []grpc.DialOption } +// PrmDial groups connection parameters for the Dial call in the Client. +// +// See also Dial. +type PrmDial struct { + Endpoint string + + PrmDialOptions +} + +// PrmNetMapDial groups connection parameters for the NetMapDial call in the Client. +type PrmNetMapDial struct { + UseExternalAddresses bool + UseRandomAddress bool + FallbackToAvailableAddress bool + + PrmDialOptions +} + // SetServerURI sets server URI in the FrostFS network. // Required parameter. // @@ -251,30 +298,30 @@ func (x *PrmDial) SetServerURI(endpoint string) { // // See also SetServerURI. // -// Depreacted: Use PrmDial.TLSConfig instead. -func (x *PrmDial) SetTLSConfig(tlsConfig *tls.Config) { +// Deprecated: Use PrmDialOptions.TLSConfig instead. +func (x *PrmDialOptions) SetTLSConfig(tlsConfig *tls.Config) { x.TLSConfig = tlsConfig } // SetTimeout sets the timeout for connection to be established. // MUST BE positive. If not called, 5s timeout will be used by default. // -// Deprecated: Use PrmDial.DialTimeout instead. -func (x *PrmDial) SetTimeout(timeout time.Duration) { +// Deprecated: Use PrmDialOptions.DialTimeout instead. +func (x *PrmDialOptions) SetTimeout(timeout time.Duration) { x.DialTimeout = timeout } // SetStreamTimeout sets the timeout for individual operations in streaming RPC. // MUST BE positive. If not called, 10s timeout will be used by default. // -// Deprecated: Use PrmDial.StreamTimeout instead. -func (x *PrmDial) SetStreamTimeout(timeout time.Duration) { +// Deprecated: Use PrmDialOptions.StreamTimeout instead. +func (x *PrmDialOptions) SetStreamTimeout(timeout time.Duration) { x.StreamTimeout = timeout } // SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection. // -// Deprecated: Use PrmDial.GRPCDialOptions instead. -func (x *PrmDial) SetGRPCDialOptions(opts ...grpc.DialOption) { +// Deprecated: Use PrmDialOptions.GRPCDialOptions instead. +func (x *PrmDialOptions) SetGRPCDialOptions(opts ...grpc.DialOption) { x.GRPCDialOptions = opts } diff --git a/client/client_test.go b/client/client_test.go index f3c97c9..e6b7f6e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -81,13 +81,13 @@ func TestClient_NetMapDialContext(t *testing.T) { netMap.SetNodes([]netmap.NodeInfo{node}) prmInit.NetMap = netMap - c.Init(prmInit) + prmNetMapDial := PrmNetMapDial{} + assert := func(ctx context.Context, errExpected error) { // expect particular context error according to Dial docs - //require.ErrorIs(t, c.Dial(ctx, prm), errExpected) - require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey)), errExpected) + require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey), prmNetMapDial), errExpected) } // create pre-abandoned context diff --git a/pool/pool.go b/pool/pool.go index 48f2ee1..1d34095 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -342,10 +342,12 @@ func (c *clientWrapper) dial(ctx context.Context) error { } prmDial := sdkClient.PrmDial{ - Endpoint: c.prm.address, - DialTimeout: c.prm.dialTimeout, - StreamTimeout: c.prm.streamTimeout, - GRPCDialOptions: c.prm.dialOptions, + Endpoint: c.prm.address, + PrmDialOptions: sdkClient.PrmDialOptions{ + DialTimeout: c.prm.dialTimeout, + StreamTimeout: c.prm.streamTimeout, + GRPCDialOptions: c.prm.dialOptions, + }, } if err = cl.Dial(ctx, prmDial); err != nil { @@ -381,10 +383,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change cl.Init(prmInit) prmDial := sdkClient.PrmDial{ - Endpoint: c.prm.address, - DialTimeout: c.prm.dialTimeout, - StreamTimeout: c.prm.streamTimeout, - GRPCDialOptions: c.prm.dialOptions, + Endpoint: c.prm.address, + PrmDialOptions: sdkClient.PrmDialOptions{ + DialTimeout: c.prm.dialTimeout, + StreamTimeout: c.prm.streamTimeout, + GRPCDialOptions: c.prm.dialOptions, + }, } if err := cl.Dial(ctx, prmDial); err != nil { -- 2.45.2 From aa536d4a4de0477034d8a7c76e96f975fa4eeaba Mon Sep 17 00:00:00 2001 From: olefirenque Date: Thu, 7 Dec 2023 12:14:13 +0300 Subject: [PATCH 5/5] [#131] client: pass PrmDialOptions to dial call Signed-off-by: olefirenque --- client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 05d2baf..b849e22 100644 --- a/client/client.go +++ b/client/client.go @@ -118,7 +118,7 @@ func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo, prm addresses := node.ExternalAddresses() dialAddr := func(addr string) error { - err := c.Dial(ctx, PrmDial{Endpoint: addr}) + err := c.Dial(ctx, PrmDial{Endpoint: addr, PrmDialOptions: prm.PrmDialOptions}) if err != nil { if !prm.FallbackToAvailableAddress { return err -- 2.45.2