WIP: [#131] add netmap-aware dialer #196

Closed
olefirenque wants to merge 5 commits from olefirenque/frostfs-sdk-go:173-add_netmap_aware_dialer into master
3 changed files with 139 additions and 22 deletions

View file

@ -1,15 +1,19 @@
package client
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/tls"
"errors"
"fmt"
Review

stdlib imports should form a separate group, could you move netmap below?

stdlib imports should form a separate group, could you move `netmap` below?
Review

stdlib imports should form a separate group, could you move netmap below?

stdlib imports should form a separate group, could you move `netmap` below?
Review

Also, we have netmap in the SDK (this repo), can we use it instead of the api package?

Also, we have netmap in the SDK (this repo), can we use it instead of the api package?
Review

Yes, I used the grpc domain by mistake.
Fixed.

Yes, I used the grpc domain by mistake. Fixed.
"net/url"
"time"
v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"google.golang.org/grpc"
)
@ -110,6 +114,63 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error {
return nil
}
func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo, prm PrmNetMapDial) error {
addresses := node.ExternalAddresses()
dialAddr := func(addr string) error {
err := c.Dial(ctx, PrmDial{Endpoint: addr, PrmDialOptions: prm.PrmDialOptions})
if err != nil {
if !prm.FallbackToAvailableAddress {
return err
}
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
return err
}
}
return nil
}
if prm.UseRandomAddress {
// Use map for randomized iteration over addresses
addressesMap := make(map[string]struct{})
for _, addr := range addresses {
addressesMap[addr] = struct{}{}
}
for addr := range addressesMap {
err := dialAddr(addr)
if err != nil {
return err
}
}
}
for _, addr := range addresses {
err := dialAddr(addr)
if err != nil {
return err
}
}
return nil
}
func (c *Client) NetMapDial(ctx context.Context, endpoint string, prm PrmNetMapDial) error {
u, err := url.Parse(endpoint)
if err != nil {
return err
}
if u.Scheme == "frostfs" {
nodes := c.prm.NetMap.Nodes()
for _, node := range nodes {
if bytes.Equal([]byte(u.Host), node.PublicKey()) {
return c.netMapDialNode(ctx, &node, prm)
}
}
}
return fmt.Errorf("dial failure: endpoint %s isn't valid", endpoint)
}
// sets underlying provider of frostFSAPIServer. The method is used for testing as an approach
// to skip Dial stage and override FrostFS API server. MUST NOT be used outside test code.
// In real applications wrapper over git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client
@ -141,6 +202,8 @@ type PrmInit struct {
ResponseInfoCallback func(ResponseMetaInfo) error
NetMap *netmap.NetMap
NetMagic uint64
}
@ -181,12 +244,7 @@ const (
defaultStreamTimeout = 10 * time.Second
)
// PrmDial groups connection parameters for the Client.
//
// See also Dial.
type PrmDial struct {
Endpoint string
type PrmDialOptions struct {
TLSConfig *tls.Config
// If DialTimeout is non-positive, then it's set to defaultDialTimeout.
@ -198,6 +256,24 @@ type PrmDial struct {
GRPCDialOptions []grpc.DialOption
}
// PrmDial groups connection parameters for the Dial call in the Client.
//
// See also Dial.
type PrmDial struct {
Endpoint string
PrmDialOptions
}
// PrmNetMapDial groups connection parameters for the NetMapDial call in the Client.
type PrmNetMapDial struct {
UseExternalAddresses bool
UseRandomAddress bool
FallbackToAvailableAddress bool
PrmDialOptions
}
// SetServerURI sets server URI in the FrostFS network.
// Required parameter.
//
@ -222,30 +298,30 @@ func (x *PrmDial) SetServerURI(endpoint string) {
//
// See also SetServerURI.
//
// Depreacted: Use PrmDial.TLSConfig instead.
func (x *PrmDial) SetTLSConfig(tlsConfig *tls.Config) {
// Deprecated: Use PrmDialOptions.TLSConfig instead.
func (x *PrmDialOptions) SetTLSConfig(tlsConfig *tls.Config) {
x.TLSConfig = tlsConfig
}
// SetTimeout sets the timeout for connection to be established.
// MUST BE positive. If not called, 5s timeout will be used by default.
//
// Deprecated: Use PrmDial.DialTimeout instead.
func (x *PrmDial) SetTimeout(timeout time.Duration) {
// Deprecated: Use PrmDialOptions.DialTimeout instead.
func (x *PrmDialOptions) SetTimeout(timeout time.Duration) {
x.DialTimeout = timeout
}
// SetStreamTimeout sets the timeout for individual operations in streaming RPC.
// MUST BE positive. If not called, 10s timeout will be used by default.
//
// Deprecated: Use PrmDial.StreamTimeout instead.
func (x *PrmDial) SetStreamTimeout(timeout time.Duration) {
// Deprecated: Use PrmDialOptions.StreamTimeout instead.
func (x *PrmDialOptions) SetStreamTimeout(timeout time.Duration) {
x.StreamTimeout = timeout
}
// SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
//
// Deprecated: Use PrmDial.GRPCDialOptions instead.
func (x *PrmDial) SetGRPCDialOptions(opts ...grpc.DialOption) {
// Deprecated: Use PrmDialOptions.GRPCDialOptions instead.
func (x *PrmDialOptions) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.GRPCDialOptions = opts
}

View file

@ -5,9 +5,11 @@ import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"fmt"
"testing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/stretchr/testify/require"
)
@ -65,3 +67,38 @@ func TestClient_DialContext(t *testing.T) {
assert(ctx, context.DeadlineExceeded)
}
func TestClient_NetMapDialContext(t *testing.T) {
var prmInit PrmInit
var c Client
publicKey := "foo"
endpoint := "localhost:8080"
node := netmap.NodeInfo{}
node.SetPublicKey([]byte(publicKey))
node.SetExternalAddresses(endpoint)
netMap := &netmap.NetMap{}
netMap.SetNodes([]netmap.NodeInfo{node})
prmInit.NetMap = netMap
c.Init(prmInit)
prmNetMapDial := PrmNetMapDial{}
assert := func(ctx context.Context, errExpected error) {
// expect particular context error according to Dial docs
require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey), prmNetMapDial), errExpected)
}
// create pre-abandoned context
ctx, cancel := context.WithCancel(context.Background())
cancel()
assert(ctx, context.Canceled)
// create "pre-deadlined" context
ctx, cancel = context.WithTimeout(context.Background(), 0)
defer cancel()
assert(ctx, context.DeadlineExceeded)
}

View file

@ -342,10 +342,12 @@ func (c *clientWrapper) dial(ctx context.Context) error {
}
prmDial := sdkClient.PrmDial{
Endpoint: c.prm.address,
DialTimeout: c.prm.dialTimeout,
StreamTimeout: c.prm.streamTimeout,
GRPCDialOptions: c.prm.dialOptions,
Endpoint: c.prm.address,
PrmDialOptions: sdkClient.PrmDialOptions{
DialTimeout: c.prm.dialTimeout,
StreamTimeout: c.prm.streamTimeout,
GRPCDialOptions: c.prm.dialOptions,
},
}
if err = cl.Dial(ctx, prmDial); err != nil {
@ -381,10 +383,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
cl.Init(prmInit)
prmDial := sdkClient.PrmDial{
Endpoint: c.prm.address,
DialTimeout: c.prm.dialTimeout,
StreamTimeout: c.prm.streamTimeout,
GRPCDialOptions: c.prm.dialOptions,
Endpoint: c.prm.address,
PrmDialOptions: sdkClient.PrmDialOptions{
DialTimeout: c.prm.dialTimeout,
StreamTimeout: c.prm.streamTimeout,
GRPCDialOptions: c.prm.dialOptions,
},
}
if err := cl.Dial(ctx, prmDial); err != nil {