WIP: [#131] add netmap-aware dialer #196
3 changed files with 139 additions and 22 deletions
104
client/client.go
104
client/client.go
|
@ -1,15 +1,19 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
|
@ -110,6 +114,63 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo, prm PrmNetMapDial) error {
|
||||
addresses := node.ExternalAddresses()
|
||||
|
||||
dialAddr := func(addr string) error {
|
||||
err := c.Dial(ctx, PrmDial{Endpoint: addr, PrmDialOptions: prm.PrmDialOptions})
|
||||
if err != nil {
|
||||
if !prm.FallbackToAvailableAddress {
|
||||
return err
|
||||
}
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if prm.UseRandomAddress {
|
||||
// Use map for randomized iteration over addresses
|
||||
addressesMap := make(map[string]struct{})
|
||||
for _, addr := range addresses {
|
||||
addressesMap[addr] = struct{}{}
|
||||
}
|
||||
|
||||
for addr := range addressesMap {
|
||||
err := dialAddr(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, addr := range addresses {
|
||||
err := dialAddr(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) NetMapDial(ctx context.Context, endpoint string, prm PrmNetMapDial) error {
|
||||
u, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if u.Scheme == "frostfs" {
|
||||
nodes := c.prm.NetMap.Nodes()
|
||||
for _, node := range nodes {
|
||||
if bytes.Equal([]byte(u.Host), node.PublicKey()) {
|
||||
return c.netMapDialNode(ctx, &node, prm)
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("dial failure: endpoint %s isn't valid", endpoint)
|
||||
}
|
||||
|
||||
// sets underlying provider of frostFSAPIServer. The method is used for testing as an approach
|
||||
// to skip Dial stage and override FrostFS API server. MUST NOT be used outside test code.
|
||||
// In real applications wrapper over git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client
|
||||
|
@ -141,6 +202,8 @@ type PrmInit struct {
|
|||
|
||||
ResponseInfoCallback func(ResponseMetaInfo) error
|
||||
|
||||
NetMap *netmap.NetMap
|
||||
|
||||
NetMagic uint64
|
||||
}
|
||||
|
||||
|
@ -181,12 +244,7 @@ const (
|
|||
defaultStreamTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// PrmDial groups connection parameters for the Client.
|
||||
//
|
||||
// See also Dial.
|
||||
type PrmDial struct {
|
||||
Endpoint string
|
||||
|
||||
type PrmDialOptions struct {
|
||||
TLSConfig *tls.Config
|
||||
|
||||
// If DialTimeout is non-positive, then it's set to defaultDialTimeout.
|
||||
|
@ -198,6 +256,24 @@ type PrmDial struct {
|
|||
GRPCDialOptions []grpc.DialOption
|
||||
}
|
||||
|
||||
// PrmDial groups connection parameters for the Dial call in the Client.
|
||||
//
|
||||
// See also Dial.
|
||||
type PrmDial struct {
|
||||
Endpoint string
|
||||
|
||||
PrmDialOptions
|
||||
}
|
||||
|
||||
// PrmNetMapDial groups connection parameters for the NetMapDial call in the Client.
|
||||
type PrmNetMapDial struct {
|
||||
UseExternalAddresses bool
|
||||
UseRandomAddress bool
|
||||
FallbackToAvailableAddress bool
|
||||
|
||||
PrmDialOptions
|
||||
}
|
||||
|
||||
// SetServerURI sets server URI in the FrostFS network.
|
||||
// Required parameter.
|
||||
//
|
||||
|
@ -222,30 +298,30 @@ func (x *PrmDial) SetServerURI(endpoint string) {
|
|||
//
|
||||
// See also SetServerURI.
|
||||
//
|
||||
// Depreacted: Use PrmDial.TLSConfig instead.
|
||||
func (x *PrmDial) SetTLSConfig(tlsConfig *tls.Config) {
|
||||
// Deprecated: Use PrmDialOptions.TLSConfig instead.
|
||||
func (x *PrmDialOptions) SetTLSConfig(tlsConfig *tls.Config) {
|
||||
x.TLSConfig = tlsConfig
|
||||
}
|
||||
|
||||
// SetTimeout sets the timeout for connection to be established.
|
||||
// MUST BE positive. If not called, 5s timeout will be used by default.
|
||||
//
|
||||
// Deprecated: Use PrmDial.DialTimeout instead.
|
||||
func (x *PrmDial) SetTimeout(timeout time.Duration) {
|
||||
// Deprecated: Use PrmDialOptions.DialTimeout instead.
|
||||
func (x *PrmDialOptions) SetTimeout(timeout time.Duration) {
|
||||
x.DialTimeout = timeout
|
||||
}
|
||||
|
||||
// SetStreamTimeout sets the timeout for individual operations in streaming RPC.
|
||||
// MUST BE positive. If not called, 10s timeout will be used by default.
|
||||
//
|
||||
// Deprecated: Use PrmDial.StreamTimeout instead.
|
||||
func (x *PrmDial) SetStreamTimeout(timeout time.Duration) {
|
||||
// Deprecated: Use PrmDialOptions.StreamTimeout instead.
|
||||
func (x *PrmDialOptions) SetStreamTimeout(timeout time.Duration) {
|
||||
x.StreamTimeout = timeout
|
||||
}
|
||||
|
||||
// SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
|
||||
//
|
||||
// Deprecated: Use PrmDial.GRPCDialOptions instead.
|
||||
func (x *PrmDial) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
||||
// Deprecated: Use PrmDialOptions.GRPCDialOptions instead.
|
||||
func (x *PrmDialOptions) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
||||
x.GRPCDialOptions = opts
|
||||
}
|
||||
|
|
|
@ -5,9 +5,11 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -65,3 +67,38 @@ func TestClient_DialContext(t *testing.T) {
|
|||
|
||||
assert(ctx, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
func TestClient_NetMapDialContext(t *testing.T) {
|
||||
var prmInit PrmInit
|
||||
var c Client
|
||||
publicKey := "foo"
|
||||
endpoint := "localhost:8080"
|
||||
|
||||
node := netmap.NodeInfo{}
|
||||
node.SetPublicKey([]byte(publicKey))
|
||||
node.SetExternalAddresses(endpoint)
|
||||
netMap := &netmap.NetMap{}
|
||||
netMap.SetNodes([]netmap.NodeInfo{node})
|
||||
|
||||
prmInit.NetMap = netMap
|
||||
c.Init(prmInit)
|
||||
|
||||
prmNetMapDial := PrmNetMapDial{}
|
||||
|
||||
assert := func(ctx context.Context, errExpected error) {
|
||||
// expect particular context error according to Dial docs
|
||||
require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey), prmNetMapDial), errExpected)
|
||||
}
|
||||
|
||||
// create pre-abandoned context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
assert(ctx, context.Canceled)
|
||||
|
||||
// create "pre-deadlined" context
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 0)
|
||||
defer cancel()
|
||||
|
||||
assert(ctx, context.DeadlineExceeded)
|
||||
}
|
||||
|
|
20
pool/pool.go
20
pool/pool.go
|
@ -342,10 +342,12 @@ func (c *clientWrapper) dial(ctx context.Context) error {
|
|||
}
|
||||
|
||||
prmDial := sdkClient.PrmDial{
|
||||
Endpoint: c.prm.address,
|
||||
DialTimeout: c.prm.dialTimeout,
|
||||
StreamTimeout: c.prm.streamTimeout,
|
||||
GRPCDialOptions: c.prm.dialOptions,
|
||||
Endpoint: c.prm.address,
|
||||
PrmDialOptions: sdkClient.PrmDialOptions{
|
||||
DialTimeout: c.prm.dialTimeout,
|
||||
StreamTimeout: c.prm.streamTimeout,
|
||||
GRPCDialOptions: c.prm.dialOptions,
|
||||
},
|
||||
}
|
||||
|
||||
if err = cl.Dial(ctx, prmDial); err != nil {
|
||||
|
@ -381,10 +383,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
|||
cl.Init(prmInit)
|
||||
|
||||
prmDial := sdkClient.PrmDial{
|
||||
Endpoint: c.prm.address,
|
||||
DialTimeout: c.prm.dialTimeout,
|
||||
StreamTimeout: c.prm.streamTimeout,
|
||||
GRPCDialOptions: c.prm.dialOptions,
|
||||
Endpoint: c.prm.address,
|
||||
PrmDialOptions: sdkClient.PrmDialOptions{
|
||||
DialTimeout: c.prm.dialTimeout,
|
||||
StreamTimeout: c.prm.streamTimeout,
|
||||
GRPCDialOptions: c.prm.dialOptions,
|
||||
},
|
||||
}
|
||||
|
||||
if err := cl.Dial(ctx, prmDial); err != nil {
|
||||
|
|
Loading…
Reference in a new issue
stdlib imports should form a separate group, could you move
netmap
below?stdlib imports should form a separate group, could you move
netmap
below?Also, we have netmap in the SDK (this repo), can we use it instead of the api package?
Yes, I used the grpc domain by mistake.
Fixed.