WIP: [#131] add netmap-aware dialer #196

Closed
olefirenque wants to merge 5 commits from olefirenque/frostfs-sdk-go:173-add_netmap_aware_dialer into master
3 changed files with 139 additions and 22 deletions

View file

@ -1,15 +1,19 @@
package client package client
import ( import (
"bytes"
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt"
Review

stdlib imports should form a separate group, could you move netmap below?

stdlib imports should form a separate group, could you move `netmap` below?
Review

stdlib imports should form a separate group, could you move netmap below?

stdlib imports should form a separate group, could you move `netmap` below?
Review

Also, we have netmap in the SDK (this repo), can we use it instead of the api package?

Also, we have netmap in the SDK (this repo), can we use it instead of the api package?
Review

Yes, I used the grpc domain by mistake.
Fixed.

Yes, I used the grpc domain by mistake. Fixed.
"net/url"
"time" "time"
v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting" v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -110,6 +114,63 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error {
return nil return nil
} }
func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo, prm PrmNetMapDial) error {
addresses := node.ExternalAddresses()
dialAddr := func(addr string) error {
err := c.Dial(ctx, PrmDial{Endpoint: addr, PrmDialOptions: prm.PrmDialOptions})
if err != nil {
if !prm.FallbackToAvailableAddress {
return err
}
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
return err
}
}
return nil
}
if prm.UseRandomAddress {
// Use map for randomized iteration over addresses
addressesMap := make(map[string]struct{})
for _, addr := range addresses {
addressesMap[addr] = struct{}{}
}
for addr := range addressesMap {
err := dialAddr(addr)
if err != nil {
return err
}
}
}
for _, addr := range addresses {
err := dialAddr(addr)
if err != nil {
return err
}
}
return nil
}
func (c *Client) NetMapDial(ctx context.Context, endpoint string, prm PrmNetMapDial) error {
u, err := url.Parse(endpoint)
if err != nil {
return err
}
if u.Scheme == "frostfs" {
nodes := c.prm.NetMap.Nodes()
for _, node := range nodes {
if bytes.Equal([]byte(u.Host), node.PublicKey()) {
return c.netMapDialNode(ctx, &node, prm)
}
}
}
return fmt.Errorf("dial failure: endpoint %s isn't valid", endpoint)
}
// sets underlying provider of frostFSAPIServer. The method is used for testing as an approach // sets underlying provider of frostFSAPIServer. The method is used for testing as an approach
// to skip Dial stage and override FrostFS API server. MUST NOT be used outside test code. // to skip Dial stage and override FrostFS API server. MUST NOT be used outside test code.
// In real applications wrapper over git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client // In real applications wrapper over git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client
@ -141,6 +202,8 @@ type PrmInit struct {
ResponseInfoCallback func(ResponseMetaInfo) error ResponseInfoCallback func(ResponseMetaInfo) error
NetMap *netmap.NetMap
NetMagic uint64 NetMagic uint64
} }
@ -181,12 +244,7 @@ const (
defaultStreamTimeout = 10 * time.Second defaultStreamTimeout = 10 * time.Second
) )
// PrmDial groups connection parameters for the Client. type PrmDialOptions struct {
//
// See also Dial.
type PrmDial struct {
Endpoint string
TLSConfig *tls.Config TLSConfig *tls.Config
// If DialTimeout is non-positive, then it's set to defaultDialTimeout. // If DialTimeout is non-positive, then it's set to defaultDialTimeout.
@ -198,6 +256,24 @@ type PrmDial struct {
GRPCDialOptions []grpc.DialOption GRPCDialOptions []grpc.DialOption
} }
// PrmDial groups connection parameters for the Dial call in the Client.
//
// See also Dial.
type PrmDial struct {
Endpoint string
PrmDialOptions
}
// PrmNetMapDial groups connection parameters for the NetMapDial call in the Client.
type PrmNetMapDial struct {
UseExternalAddresses bool
UseRandomAddress bool
FallbackToAvailableAddress bool
PrmDialOptions
}
// SetServerURI sets server URI in the FrostFS network. // SetServerURI sets server URI in the FrostFS network.
// Required parameter. // Required parameter.
// //
@ -222,30 +298,30 @@ func (x *PrmDial) SetServerURI(endpoint string) {
// //
// See also SetServerURI. // See also SetServerURI.
// //
// Depreacted: Use PrmDial.TLSConfig instead. // Deprecated: Use PrmDialOptions.TLSConfig instead.
func (x *PrmDial) SetTLSConfig(tlsConfig *tls.Config) { func (x *PrmDialOptions) SetTLSConfig(tlsConfig *tls.Config) {
x.TLSConfig = tlsConfig x.TLSConfig = tlsConfig
} }
// SetTimeout sets the timeout for connection to be established. // SetTimeout sets the timeout for connection to be established.
// MUST BE positive. If not called, 5s timeout will be used by default. // MUST BE positive. If not called, 5s timeout will be used by default.
// //
// Deprecated: Use PrmDial.DialTimeout instead. // Deprecated: Use PrmDialOptions.DialTimeout instead.
func (x *PrmDial) SetTimeout(timeout time.Duration) { func (x *PrmDialOptions) SetTimeout(timeout time.Duration) {
x.DialTimeout = timeout x.DialTimeout = timeout
} }
// SetStreamTimeout sets the timeout for individual operations in streaming RPC. // SetStreamTimeout sets the timeout for individual operations in streaming RPC.
// MUST BE positive. If not called, 10s timeout will be used by default. // MUST BE positive. If not called, 10s timeout will be used by default.
// //
// Deprecated: Use PrmDial.StreamTimeout instead. // Deprecated: Use PrmDialOptions.StreamTimeout instead.
func (x *PrmDial) SetStreamTimeout(timeout time.Duration) { func (x *PrmDialOptions) SetStreamTimeout(timeout time.Duration) {
x.StreamTimeout = timeout x.StreamTimeout = timeout
} }
// SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection. // SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
// //
// Deprecated: Use PrmDial.GRPCDialOptions instead. // Deprecated: Use PrmDialOptions.GRPCDialOptions instead.
func (x *PrmDial) SetGRPCDialOptions(opts ...grpc.DialOption) { func (x *PrmDialOptions) SetGRPCDialOptions(opts ...grpc.DialOption) {
x.GRPCDialOptions = opts x.GRPCDialOptions = opts
} }

View file

@ -5,9 +5,11 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"crypto/elliptic" "crypto/elliptic"
"crypto/rand" "crypto/rand"
"fmt"
"testing" "testing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -65,3 +67,38 @@ func TestClient_DialContext(t *testing.T) {
assert(ctx, context.DeadlineExceeded) assert(ctx, context.DeadlineExceeded)
} }
func TestClient_NetMapDialContext(t *testing.T) {
var prmInit PrmInit
var c Client
publicKey := "foo"
endpoint := "localhost:8080"
node := netmap.NodeInfo{}
node.SetPublicKey([]byte(publicKey))
node.SetExternalAddresses(endpoint)
netMap := &netmap.NetMap{}
netMap.SetNodes([]netmap.NodeInfo{node})
prmInit.NetMap = netMap
c.Init(prmInit)
prmNetMapDial := PrmNetMapDial{}
assert := func(ctx context.Context, errExpected error) {
// expect particular context error according to Dial docs
require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey), prmNetMapDial), errExpected)
}
// create pre-abandoned context
ctx, cancel := context.WithCancel(context.Background())
cancel()
assert(ctx, context.Canceled)
// create "pre-deadlined" context
ctx, cancel = context.WithTimeout(context.Background(), 0)
defer cancel()
assert(ctx, context.DeadlineExceeded)
}

View file

@ -342,10 +342,12 @@ func (c *clientWrapper) dial(ctx context.Context) error {
} }
prmDial := sdkClient.PrmDial{ prmDial := sdkClient.PrmDial{
Endpoint: c.prm.address, Endpoint: c.prm.address,
DialTimeout: c.prm.dialTimeout, PrmDialOptions: sdkClient.PrmDialOptions{
StreamTimeout: c.prm.streamTimeout, DialTimeout: c.prm.dialTimeout,
GRPCDialOptions: c.prm.dialOptions, StreamTimeout: c.prm.streamTimeout,
GRPCDialOptions: c.prm.dialOptions,
},
} }
if err = cl.Dial(ctx, prmDial); err != nil { if err = cl.Dial(ctx, prmDial); err != nil {
@ -381,10 +383,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
cl.Init(prmInit) cl.Init(prmInit)
prmDial := sdkClient.PrmDial{ prmDial := sdkClient.PrmDial{
Endpoint: c.prm.address, Endpoint: c.prm.address,
DialTimeout: c.prm.dialTimeout, PrmDialOptions: sdkClient.PrmDialOptions{
StreamTimeout: c.prm.streamTimeout, DialTimeout: c.prm.dialTimeout,
GRPCDialOptions: c.prm.dialOptions, StreamTimeout: c.prm.streamTimeout,
GRPCDialOptions: c.prm.dialOptions,
},
} }
if err := cl.Dial(ctx, prmDial); err != nil { if err := cl.Dial(ctx, prmDial); err != nil {