WIP: [#131] add netmap-aware dialer #196
3 changed files with 139 additions and 22 deletions
104
client/client.go
104
client/client.go
|
@ -1,15 +1,19 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
v2accounting "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
|
@ -110,6 +114,63 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) netMapDialNode(ctx context.Context, node *netmap.NodeInfo, prm PrmNetMapDial) error {
|
||||
addresses := node.ExternalAddresses()
|
||||
|
||||
dialAddr := func(addr string) error {
|
||||
err := c.Dial(ctx, PrmDial{Endpoint: addr, PrmDialOptions: prm.PrmDialOptions})
|
||||
if err != nil {
|
||||
fyrchik
commented
Use Use `bytes.Equal`?
|
||||
if !prm.FallbackToAvailableAddress {
|
||||
return err
|
||||
}
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if prm.UseRandomAddress {
|
||||
// Use map for randomized iteration over addresses
|
||||
addressesMap := make(map[string]struct{})
|
||||
for _, addr := range addresses {
|
||||
addressesMap[addr] = struct{}{}
|
||||
}
|
||||
fyrchik
commented
This is where we can implement any logic we need. I suggest moving this
It would be easier to do this in a separate function, in this one we have a simple This is where we can implement any logic we need. I suggest moving this `dial` in a separate function, we might want:
1. Choose addresses from `ExternalAddresses` attribute
2. Chose _random_ address.
3. Handle dial errors (if one address is unavailable, try the next one).
It would be easier to do this in a separate function, in this one we have a simple `dial`.
olefirenque
commented
Should each option be explicitly defined by client user? Should each option be explicitly defined by client user?
fyrchik
commented
We can discuss. IMO this could be governed with a single parameter, with some reasonable default. We can discuss. IMO this could be governed with a single parameter, with some reasonable default.
|
||||
|
||||
for addr := range addressesMap {
|
||||
err := dialAddr(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, addr := range addresses {
|
||||
err := dialAddr(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) NetMapDial(ctx context.Context, endpoint string, prm PrmNetMapDial) error {
|
||||
u, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if u.Scheme == "frostfs" {
|
||||
nodes := c.prm.NetMap.Nodes()
|
||||
for _, node := range nodes {
|
||||
if bytes.Equal([]byte(u.Host), node.PublicKey()) {
|
||||
return c.netMapDialNode(ctx, &node, prm)
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("dial failure: endpoint %s isn't valid", endpoint)
|
||||
}
|
||||
|
||||
// sets underlying provider of frostFSAPIServer. The method is used for testing as an approach
|
||||
// to skip Dial stage and override FrostFS API server. MUST NOT be used outside test code.
|
||||
// In real applications wrapper over git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client
|
||||
|
@ -141,6 +202,8 @@ type PrmInit struct {
|
|||
|
||||
ResponseInfoCallback func(ResponseMetaInfo) error
|
||||
|
||||
NetMap *netmap.NetMap
|
||||
|
||||
NetMagic uint64
|
||||
}
|
||||
|
||||
|
@ -181,12 +244,7 @@ const (
|
|||
defaultStreamTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// PrmDial groups connection parameters for the Client.
|
||||
//
|
||||
// See also Dial.
|
||||
type PrmDial struct {
|
||||
Endpoint string
|
||||
|
||||
type PrmDialOptions struct {
|
||||
TLSConfig *tls.Config
|
||||
|
||||
// If DialTimeout is non-positive, then it's set to defaultDialTimeout.
|
||||
|
@ -198,6 +256,24 @@ type PrmDial struct {
|
|||
GRPCDialOptions []grpc.DialOption
|
||||
}
|
||||
|
||||
// PrmDial groups connection parameters for the Dial call in the Client.
|
||||
//
|
||||
// See also Dial.
|
||||
type PrmDial struct {
|
||||
Endpoint string
|
||||
|
||||
PrmDialOptions
|
||||
}
|
||||
|
||||
// PrmNetMapDial groups connection parameters for the NetMapDial call in the Client.
|
||||
type PrmNetMapDial struct {
|
||||
UseExternalAddresses bool
|
||||
UseRandomAddress bool
|
||||
FallbackToAvailableAddress bool
|
||||
|
||||
PrmDialOptions
|
||||
}
|
||||
|
||||
// SetServerURI sets server URI in the FrostFS network.
|
||||
// Required parameter.
|
||||
//
|
||||
|
@ -222,30 +298,30 @@ func (x *PrmDial) SetServerURI(endpoint string) {
|
|||
//
|
||||
// See also SetServerURI.
|
||||
//
|
||||
// Depreacted: Use PrmDial.TLSConfig instead.
|
||||
func (x *PrmDial) SetTLSConfig(tlsConfig *tls.Config) {
|
||||
// Deprecated: Use PrmDialOptions.TLSConfig instead.
|
||||
func (x *PrmDialOptions) SetTLSConfig(tlsConfig *tls.Config) {
|
||||
x.TLSConfig = tlsConfig
|
||||
}
|
||||
|
||||
// SetTimeout sets the timeout for connection to be established.
|
||||
// MUST BE positive. If not called, 5s timeout will be used by default.
|
||||
//
|
||||
// Deprecated: Use PrmDial.DialTimeout instead.
|
||||
func (x *PrmDial) SetTimeout(timeout time.Duration) {
|
||||
// Deprecated: Use PrmDialOptions.DialTimeout instead.
|
||||
func (x *PrmDialOptions) SetTimeout(timeout time.Duration) {
|
||||
x.DialTimeout = timeout
|
||||
}
|
||||
|
||||
// SetStreamTimeout sets the timeout for individual operations in streaming RPC.
|
||||
// MUST BE positive. If not called, 10s timeout will be used by default.
|
||||
//
|
||||
// Deprecated: Use PrmDial.StreamTimeout instead.
|
||||
func (x *PrmDial) SetStreamTimeout(timeout time.Duration) {
|
||||
// Deprecated: Use PrmDialOptions.StreamTimeout instead.
|
||||
func (x *PrmDialOptions) SetStreamTimeout(timeout time.Duration) {
|
||||
x.StreamTimeout = timeout
|
||||
}
|
||||
|
||||
// SetGRPCDialOptions sets the gRPC dial options for new gRPC client connection.
|
||||
//
|
||||
// Deprecated: Use PrmDial.GRPCDialOptions instead.
|
||||
func (x *PrmDial) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
||||
// Deprecated: Use PrmDialOptions.GRPCDialOptions instead.
|
||||
func (x *PrmDialOptions) SetGRPCDialOptions(opts ...grpc.DialOption) {
|
||||
x.GRPCDialOptions = opts
|
||||
}
|
||||
|
|
|
@ -5,9 +5,11 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -65,3 +67,38 @@ func TestClient_DialContext(t *testing.T) {
|
|||
|
||||
assert(ctx, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
func TestClient_NetMapDialContext(t *testing.T) {
|
||||
var prmInit PrmInit
|
||||
var c Client
|
||||
publicKey := "foo"
|
||||
endpoint := "localhost:8080"
|
||||
|
||||
node := netmap.NodeInfo{}
|
||||
node.SetPublicKey([]byte(publicKey))
|
||||
node.SetExternalAddresses(endpoint)
|
||||
netMap := &netmap.NetMap{}
|
||||
netMap.SetNodes([]netmap.NodeInfo{node})
|
||||
|
||||
prmInit.NetMap = netMap
|
||||
c.Init(prmInit)
|
||||
|
||||
prmNetMapDial := PrmNetMapDial{}
|
||||
|
||||
assert := func(ctx context.Context, errExpected error) {
|
||||
// expect particular context error according to Dial docs
|
||||
require.ErrorIs(t, c.NetMapDial(ctx, fmt.Sprintf("frostfs://%s", publicKey), prmNetMapDial), errExpected)
|
||||
}
|
||||
|
||||
// create pre-abandoned context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
assert(ctx, context.Canceled)
|
||||
|
||||
// create "pre-deadlined" context
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 0)
|
||||
defer cancel()
|
||||
|
||||
assert(ctx, context.DeadlineExceeded)
|
||||
}
|
||||
|
|
20
pool/pool.go
20
pool/pool.go
|
@ -342,10 +342,12 @@ func (c *clientWrapper) dial(ctx context.Context) error {
|
|||
}
|
||||
|
||||
prmDial := sdkClient.PrmDial{
|
||||
Endpoint: c.prm.address,
|
||||
DialTimeout: c.prm.dialTimeout,
|
||||
StreamTimeout: c.prm.streamTimeout,
|
||||
GRPCDialOptions: c.prm.dialOptions,
|
||||
Endpoint: c.prm.address,
|
||||
PrmDialOptions: sdkClient.PrmDialOptions{
|
||||
DialTimeout: c.prm.dialTimeout,
|
||||
StreamTimeout: c.prm.streamTimeout,
|
||||
GRPCDialOptions: c.prm.dialOptions,
|
||||
},
|
||||
}
|
||||
|
||||
if err = cl.Dial(ctx, prmDial); err != nil {
|
||||
|
@ -381,10 +383,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
|||
cl.Init(prmInit)
|
||||
|
||||
prmDial := sdkClient.PrmDial{
|
||||
Endpoint: c.prm.address,
|
||||
DialTimeout: c.prm.dialTimeout,
|
||||
StreamTimeout: c.prm.streamTimeout,
|
||||
GRPCDialOptions: c.prm.dialOptions,
|
||||
Endpoint: c.prm.address,
|
||||
PrmDialOptions: sdkClient.PrmDialOptions{
|
||||
DialTimeout: c.prm.dialTimeout,
|
||||
StreamTimeout: c.prm.streamTimeout,
|
||||
GRPCDialOptions: c.prm.dialOptions,
|
||||
},
|
||||
}
|
||||
|
||||
if err := cl.Dial(ctx, prmDial); err != nil {
|
||||
|
|
Loading…
Add table
Reference in a new issue
stdlib imports should form a separate group, could you move
netmap
below?stdlib imports should form a separate group, could you move
netmap
below?Also, we have netmap in the SDK (this repo), can we use it instead of the api package?
Yes, I used the grpc domain by mistake.
Fixed.