tree/pool: Add flag to use net map to prioritize tree services #305

Merged
alexvanin merged 1 commit from mbiryukova/frostfs-sdk-go:feature/tree_pool_priority into master 2024-12-18 06:51:00 +00:00
8 changed files with 687 additions and 36 deletions

go.mod

@@ -14,6 +14,7 @@ require (
github.com/klauspost/reedsolomon v1.12.1
github.com/mailru/easyjson v0.7.7
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.14.0
fyrchik marked this conversation as resolved Outdated

The latest version is v0.14.0, why have you stopped on this one?

Because it's used in frostfs-node

> Because it's used in frostfs-node

If you are concerned about building `frostfs-node`, then it's fine - Minimal Version Selection is able to handle this

Let's use the latest one, no problem.
github.com/nspcc-dev/neo-go v0.106.2
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
@@ -29,13 +30,20 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/ipfs/go-cid v0.0.7 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
go.etcd.io/bbolt v1.3.9 // indirect
@@ -46,5 +54,5 @@ require (
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

go.sum

Binary file not shown.

pkg/network/address.go Normal file

@@ -0,0 +1,112 @@
// NOTE: code is taken from https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/df05057ed46632e7746fcaa26731987a9070b2e5/pkg/network/address.go

I suggest to mention in the header that this code is taken from frostfs-node `pkg/network` package. Maybe one day it could be imported back. By the way, let's import some [tests](https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/network/address_test.go) too.

Refs #35 (if we can immediately reuse this PR from node, then `Close #35`)

I would also like to use this across all the clients/pools, though.

There are other files in frostfs-node package, should they be added too?

If they are not needed by this PR, we may leave them for #35

It's better to link exact revision instead of a branch.
https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/7e3f2fca47bac063ced310ef4fd8042e32dd0c80/pkg/network/address.go
package network
import (
"errors"
"fmt"
"net"
"net/url"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var errHostIsEmpty = errors.New("host is empty")
// Address represents the FrostFS node
// network address.
type Address struct {
ma multiaddr.Multiaddr
}
// URIAddr returns Address as a URI.
//
// Panics if host address cannot be fetched from Address.
//
// See also FromString.
func (a Address) URIAddr() string {
_, host, err := manet.DialArgs(a.ma)
if err != nil {
// the only correct way to construct Address is AddressFromString
// which makes this error appear unexpected
panic(fmt.Errorf("could not get host addr: %w", err))
}
if !a.IsTLSEnabled() {
return host
}
return (&url.URL{
Scheme: "grpcs",
Host: host,
}).String()
}
// FromString restores Address from a string representation.
//
// Supports URIAddr, MultiAddr and HostAddr strings.
func (a *Address) FromString(s string) error {
var err error
a.ma, err = multiaddr.NewMultiaddr(s)
if err != nil {
var (
host string
hasTLS bool
)
host, hasTLS, err = client.ParseURI(s)
if err != nil {
host = s
}
s, err = multiaddrStringFromHostAddr(host)
if err == nil {
a.ma, err = multiaddr.NewMultiaddr(s)
if err == nil && hasTLS {
a.ma = a.ma.Encapsulate(tls)
}
}
}
return err
}
// multiaddrStringFromHostAddr converts "localhost:8080" to "/dns4/localhost/tcp/8080".
func multiaddrStringFromHostAddr(host string) (string, error) {
if len(host) == 0 {
return "", errHostIsEmpty
}
endpoint, port, err := net.SplitHostPort(host)
if err != nil {
return "", err
}
// Empty address in host `:8080` generates `/dns4//tcp/8080` multiaddr
// which is invalid. It could be `/tcp/8080` but this breaks
// `manet.DialArgs`. The solution is to manually parse it as 0.0.0.0
if endpoint == "" {
return "/ip4/0.0.0.0/tcp/" + port, nil
}
var (
prefix = "/dns4"
addr = endpoint
)
if ip := net.ParseIP(endpoint); ip != nil {
addr = ip.String()
if ip.To4() == nil {
prefix = "/ip6"
} else {
prefix = "/ip4"
}
}
const l4Protocol = "tcp"
return strings.Join([]string{prefix, addr, l4Protocol, port}, "/"), nil
}
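
For illustration only (not part of this PR), a quick usage sketch of the helpers above; the import path matches the one used by the tree pool further down in this diff:

```go
package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pkg/network"
)

func main() {
	var addr network.Address
	// FromString accepts URI, multiaddr and plain host:port forms.
	if err := addr.FromString("grpcs://example.com:7070"); err != nil {
		panic(err)
	}
	fmt.Println(addr.IsTLSEnabled()) // true
	fmt.Println(addr.URIAddr())      // grpcs://example.com:7070
}
```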

pkg/network/address_test.go Normal file

@@ -0,0 +1,83 @@
// NOTE: code is taken from https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/df05057ed46632e7746fcaa26731987a9070b2e5/pkg/network/address_test.go
package network
import (
"testing"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
func TestAddressFromString(t *testing.T) {
t.Run("valid addresses", func(t *testing.T) {
testcases := []struct {
inp string
exp multiaddr.Multiaddr
}{
{":8080", buildMultiaddr("/ip4/0.0.0.0/tcp/8080", t)},
{"example.com:7070", buildMultiaddr("/dns4/example.com/tcp/7070", t)},
{"213.44.87.1:32512", buildMultiaddr("/ip4/213.44.87.1/tcp/32512", t)},
{"[2004:eb1::1]:8080", buildMultiaddr("/ip6/2004:eb1::1/tcp/8080", t)},
{"grpc://example.com:7070", buildMultiaddr("/dns4/example.com/tcp/7070", t)},
{"grpcs://example.com:7070", buildMultiaddr("/dns4/example.com/tcp/7070/"+tlsProtocolName, t)},
}
var addr Address
for _, testcase := range testcases {
err := addr.FromString(testcase.inp)
require.NoError(t, err)
require.Equal(t, testcase.exp, addr.ma, testcase.inp)
}
})
t.Run("invalid addresses", func(t *testing.T) {
testCases := []string{
"wtf://example.com:123", // wrong scheme
"grpc://example.com", // missing port
}
var addr Address
for _, tc := range testCases {
require.Error(t, addr.FromString(tc))
}
})
}
func TestAddress_HostAddrString(t *testing.T) {
t.Run("valid addresses", func(t *testing.T) {
testcases := []struct {
ma multiaddr.Multiaddr
exp string
}{
{buildMultiaddr("/dns4/frostfs.bigcorp.com/tcp/8080", t), "frostfs.bigcorp.com:8080"},
{buildMultiaddr("/ip4/172.16.14.1/tcp/8080", t), "172.16.14.1:8080"},
{buildMultiaddr("/ip4/192.168.0.1/tcp/8888/tls", t), "grpcs://192.168.0.1:8888"},
}
for _, testcase := range testcases {
addr := Address{testcase.ma}
got := addr.URIAddr()
require.Equal(t, testcase.exp, got)
}
})
t.Run("invalid addresses", func(t *testing.T) {
testcases := []multiaddr.Multiaddr{
buildMultiaddr("/tcp/8080", t),
}
for _, testcase := range testcases {
addr := Address{testcase}
require.Panics(t, func() { addr.URIAddr() })
}
})
}
func buildMultiaddr(s string, t *testing.T) multiaddr.Multiaddr {
ma, err := multiaddr.NewMultiaddr(s)
require.NoError(t, err)
return ma
}

pkg/network/tls.go Normal file

@@ -0,0 +1,18 @@
// NOTE: code is taken from https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/df05057ed46632e7746fcaa26731987a9070b2e5/pkg/network/tls.go
package network
import "github.com/multiformats/go-multiaddr"
const (
tlsProtocolName = "tls"
)
// tls var is used for (un)wrapping other multiaddrs around TLS multiaddr.
var tls, _ = multiaddr.NewMultiaddr("/" + tlsProtocolName)
// IsTLSEnabled searches for wrapped TLS protocol in multiaddr.
func (a Address) IsTLSEnabled() bool {
_, err := a.ma.ValueForProtocol(multiaddr.P_TLS)
return err == nil
}

pkg/network/tls_test.go Normal file

@@ -0,0 +1,46 @@
// NOTE: code is taken from https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/df05057ed46632e7746fcaa26731987a9070b2e5/pkg/network/tls_test.go
package network
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAddress_TLSEnabled(t *testing.T) {
testCases := [...]struct {
input string
wantTLS bool
}{
{"/dns4/localhost/tcp/8080", false},
{"/dns4/localhost/tcp/8080/tls", true},
{"/tls/dns4/localhost/tcp/8080", true},
{"grpc://localhost:8080", false},
{"grpcs://localhost:8080", true},
}
var addr Address
for _, test := range testCases {
err := addr.FromString(test.input)
require.NoError(t, err)
require.Equal(t, test.wantTLS, addr.IsTLSEnabled(), test.input)
}
}
func BenchmarkAddressTLSEnabled(b *testing.B) {
var addr Address
err := addr.FromString("/dns4/localhost/tcp/8080/tls")
require.NoError(b, err)
b.ResetTimer()
b.ReportAllocs()
var enabled bool
for range b.N {
enabled = addr.IsTLSEnabled()
}
require.True(b, enabled)
}

pool/tree/pool.go

@@ -14,6 +14,8 @@ import (
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
@@ -73,6 +75,12 @@ type InitParameters struct {
nodeParams []pool.NodeParam
dialOptions []grpc.DialOption
maxRequestAttempts int
netMapInfoSource NetMapInfoSource
}
dkirillov marked this conversation as resolved Outdated

Can we use a more meaningful name for this param?
type NetMapInfoSource interface {
NetMapSnapshot(ctx context.Context) (netmap.NetMap, error)
PlacementPolicy(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error)
}
dkirillov marked this conversation as resolved Outdated

Why isn't this method symmetrical to `NetMapSnapshot`? I mean, why do we try to get the placement policy in the tree pool (see `Pool.getContainerPlacementPolicy`) if `PlacementPolicySource` couldn't provide it to us?

In most cases the placement policy will be provided from the s3-gw cache, but in other cases this information should be requested from the storage. Maybe we should do the same for the netmap, but I'm not sure.
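
For illustration only, a minimal sketch of how a gateway-side implementation of this interface might look, serving placement policies from a local cache and falling back to caller-supplied fetch callbacks; the cache type and the fetcher fields are hypothetical and not part of this PR:

```go
package example

import (
	"context"
	"sync"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

// cachedNetMapInfo answers PlacementPolicy from a local cache when possible
// and falls back to a caller-supplied fetcher (e.g. a container.Get call).
type cachedNetMapInfo struct {
	mu       sync.RWMutex
	policies map[cid.ID]netmap.PlacementPolicy

	fetchNetMap func(ctx context.Context) (netmap.NetMap, error)
	fetchPolicy func(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error)
}

func (c *cachedNetMapInfo) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
	// The net map changes every epoch, so always ask the storage for it here.
	return c.fetchNetMap(ctx)
}

func (c *cachedNetMapInfo) PlacementPolicy(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error) {
	c.mu.RLock()
	p, ok := c.policies[cnrID]
	c.mu.RUnlock()
	if ok {
		return p, nil
	}
	p, err := c.fetchPolicy(ctx, cnrID)
	if err != nil {
		return netmap.PlacementPolicy{}, err
	}
	c.mu.Lock()
	c.policies[cnrID] = p
	c.mu.Unlock()
	return p, nil
}
```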
// Pool represents virtual connection to the FrostFS tree services network to communicate
@@ -80,7 +88,7 @@ type InitParameters struct {
// due to their unavailability.
//
// Pool can be created and initialized using NewPool function.
// Before executing the FrostFS tree operations using the Pool, connection to the
// Before executing the FrostFS tree operations using the Pool without netMapInfoSource, connection to the
// servers MUST BE correctly established (see Dial method).
type Pool struct {
innerPools []*innerPool
@@ -96,12 +104,18 @@ type Pool struct {
streamTimeout time.Duration
nodeDialTimeout time.Duration
startIndicesMtx sync.RWMutex
netMapInfoSource NetMapInfoSource
// mutex protects clientMap and startIndices
fyrchik marked this conversation as resolved Outdated

Please, make a comment about what is protected with this mutex.
mutex sync.RWMutex
// clientMap will be used if netMapInfoSource is set
clientMap map[uint64]client
// startIndices points to the client from which the next request will be executed.
// Since clients are stored in innerPool field we have to use two indices.
// These indices being changed during:
// * rebalance procedure (see Pool.startRebalance)
// * retry in case of request failure (see Pool.requestWithRetry)
// startIndices will be used if netMapInfoSource is not set
startIndices [2]int
}
@@ -213,11 +227,6 @@ func NewPool(options InitParameters) (*Pool, error) {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
nodesParams, err := adjustNodeParams(options.nodeParams)
if err != nil {
return nil, err
}
fillDefaultInitParams(&options)
methods := make([]*pool.MethodStatus, methodLast)
@@ -230,27 +239,44 @@ func NewPool(options InitParameters) (*Pool, error) {
logger: options.logger,
dialOptions: options.dialOptions,
rebalanceParams: rebalanceParameters{
nodesGroup: nodesParams,
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
},
maxRequestAttempts: options.maxRequestAttempts,
streamTimeout: options.nodeStreamTimeout,
nodeDialTimeout: options.nodeDialTimeout,
methods: methods,
netMapInfoSource: options.netMapInfoSource,
clientMap: make(map[uint64]client),
}
if options.netMapInfoSource == nil {
nodesParams, err := adjustNodeParams(options.nodeParams)
if err != nil {
return nil, err
}
p.rebalanceParams.nodesGroup = nodesParams
}
return p, nil
}
// Dial establishes a connection to the tree servers from the FrostFS network.
// Dial may not be called and will have no effect if netMapInfoSource is set.
// See also InitParameters.SetNetMapInfoSource
//
// Otherwise, Dial establishes a connection to the tree servers from the FrostFS network.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
// If failed and netMapInfoSource is not set, the Pool SHOULD NOT be used.
fyrchik marked this conversation as resolved Outdated

It would've been surprising for me if I wasn't familiar with the pool internals. What prevents us from not calling `Dial` if we provide `netmapInfoSource`? Doc-comment should be changed at least.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
if p.netMapInfoSource != nil {
return nil
}
inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup))
var atLeastOneHealthy bool
@@ -334,6 +360,12 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
x.maxRequestAttempts = maxAttempts
}
// SetNetMapInfoSource sets implementation of interface to get current net map and container placement policy.
// If set, AddNode will have no effect.
func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource) {
x.netMapInfoSource = netMapInfoSource
}
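
To make the new mode concrete, a rough caller-side sketch (not part of this PR): the tree pool import path and the `SetKey` setter are assumed here, only `SetMaxRequestAttempts`, `SetNetMapInfoSource`, `NewPool` and `Dial` are taken from this diff.

```go
package example

import (
	"context"

	treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" // import path assumed
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)

func newTreePool(ctx context.Context, src treepool.NetMapInfoSource) (*treepool.Pool, error) {
	key, err := keys.NewPrivateKey()
	if err != nil {
		return nil, err
	}

	var prm treepool.InitParameters
	prm.SetKey(key)              // assumed setter; NewPool requires the 'Key' parameter
	prm.SetMaxRequestAttempts(4) // attempts are spent walking container nodes in placement order
	prm.SetNetMapInfoSource(src) // bootstrap endpoints (AddNode) are not needed in this mode

	p, err := treepool.NewPool(prm)
	if err != nil {
		return nil, err
	}
	// With a NetMapInfoSource set, Dial is a no-op; calling it keeps the call
	// site uniform with the endpoint-list mode.
	if err := p.Dial(ctx); err != nil {
		return nil, err
	}
	return p, nil
}
```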
// GetNodes invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
@@ -359,7 +391,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNod
}
var resp *tree.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
@@ -463,7 +495,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
}
var cli *rpcapi.GetSubTreeResponseReader
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
return handleError("failed to get sub tree client", inErr)
})
@@ -497,7 +529,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
}
var resp *tree.AddResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node", inErr)
})
@@ -532,7 +564,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
}
var resp *tree.AddByPathResponse
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
return handleError("failed to add node by path", inErr)
})
@@ -574,7 +606,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
return err
}
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to move node", err)
}
@@ -605,7 +637,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
return err
}
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
return handleError("failed to remove node", err)
}
@@ -631,6 +663,24 @@ func (p *Pool) Close() error {
}
}
if closeErr := p.closeClientMapConnections(); closeErr != nil {
err = closeErr
}
return err
}
func (p *Pool) closeClientMapConnections() (err error) {
p.mutex.Lock()
defer p.mutex.Unlock()
for _, cl := range p.clientMap {
if closeErr := cl.close(); closeErr != nil {
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
err = closeErr
}
}
return err
}
@@ -801,20 +851,24 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool)
}
func (p *Pool) getStartIndices() (int, int) {
p.startIndicesMtx.RLock()
defer p.startIndicesMtx.RUnlock()
p.mutex.RLock()
defer p.mutex.RUnlock()
return p.startIndices[0], p.startIndices[1]
}
func (p *Pool) setStartIndices(i, j int) {
p.startIndicesMtx.Lock()
p.mutex.Lock()
p.startIndices[0] = i
p.startIndices[1] = j
p.startIndicesMtx.Unlock()
p.mutex.Unlock()
}
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
func (p *Pool) requestWithRetry(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error {
if p.netMapInfoSource != nil {
return p.requestWithRetryContainerNodes(ctx, cnrID, fn)
}
var (
err, finErr error
cl *rpcclient.Client
@@ -866,10 +920,141 @@ LOOP:
return finErr
}
func (p *Pool) requestWithRetryContainerNodes(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error {
var (
err, finErr error
cl *rpcclient.Client
)
reqID := GetRequestID(ctx)
netMap, err := p.netMapInfoSource.NetMapSnapshot(ctx)
if err != nil {
return fmt.Errorf("get net map: %w", err)
dkirillov marked this conversation as resolved Outdated

nitpick: Can we use a variable name that doesn't collide with package names? `cid` -> `cnrID`
}
policy, err := p.netMapInfoSource.PlacementPolicy(ctx, cnrID)
if err != nil {
return fmt.Errorf("get container placement policy: %w", err)
}
cnrNodes, err := netMap.ContainerNodes(policy, cnrID[:])
if err != nil {
return fmt.Errorf("get container nodes: %w", err)
}
cnrNodes, err = netMap.PlacementVectors(cnrNodes, cnrID[:])
if err != nil {
return fmt.Errorf("get placement vectors: %w", err)
}
attempts := p.maxRequestAttempts
LOOP:
for _, cnrNodeGroup := range cnrNodes {
for _, cnrNode := range cnrNodeGroup {
if attempts == 0 {
break LOOP
}
treeCl, ok := p.getClientFromMap(cnrNode.Hash())
if !ok {
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
if err != nil {
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
continue
}
p.addClientToMap(cnrNode.Hash(), treeCl)

I'm not sure we should update this counter until we actually make a request to this client. The reason is that we don't remember the last healthy client, and every request processes `cnrNodes` in the same order. So if we have 4 nodes (2 of them unhealthy) and 2 max attempts, we will keep getting errors for 1 or 2 epochs (until the bad nodes disappear from the netmap). The previous tree pool version doesn't have such a delay.

I agree with this statement. However, I don't think we are going to have such a small number of retries, but this is worth taking into account anyway.
}
attempts--
dkirillov marked this conversation as resolved Outdated

We must access `p.clientMap` under a mutex because we update it sometimes

aarifullin marked this conversation as resolved Outdated

Let me highlight some points about this method.

Are you sure we need to check `ctx` within this invocation? It looks like you are trying to check the error of the client call outside of the client call. I mean, `err` will be assigned `context.Canceled` or `context.DeadlineExceeded` anyway after `err = fn(cl)`. I believe `ctx.Err() != nil` should be checked before `getNewTreeClient` to avoid an invocation that will definitely fail because the context is done. So, currently, I don't see any reason to check ctx within `shouldRedial`.

Also, the name `shouldRedial` is a bit confusing. WDYT, probably `isRedialableError` could be better for this purpose?

Why will `err` be assigned `context.Canceled` or `context.DeadlineExceeded` anyway after `err = fn(cl)`?

I suppose the context is going to be propagated through the functor:

err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
		resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
		return handleError("failed to add node", inErr)
})

But, anyway, I agree we should check the error `ctx.Err()`, but not within `shouldRedial`, and before `getNewTreeClient`.

Okay, let me clarify what I really meant:

1. `err` can be assigned context cancellation errors as the context is propagated through the functor, but that may happen on stream opening, not on `Send/Recv` messages
2. From my POV: `shouldRedial` shouldn't check the context error. Instead, this can be checked separately before the instantiation of a tree client
3. `shouldRedial` is better to be renamed to `isRedialableError` :)

I think "redialable" is not quite appropriate. The purpose of this function is to check whether the error is expected or the connection to the node should be established again next time.

Okay, it's up to you :)
if cl, err = treeCl.serviceClient(); err == nil {
err = fn(cl)
}
if shouldRedial(ctx, err) {
p.deleteClientFromMap(cnrNode.Hash())
}
if !shouldTryAgain(err) {
if err != nil {
err = fmt.Errorf("address %s: %w", treeCl.endpoint(), err)
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.String("address", treeCl.endpoint()), zap.Error(err))
}
}
return finErr
}
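
As an aside, the "same order every request" concern raised in the thread above could be addressed by remembering the hash of the last node that answered and rotating each placement group before iterating it. A sketch of that idea follows; it is not part of this PR, and the helper name is hypothetical:

```go
package example

import "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"

// rotateFromLastHealthy reorders one placement group so that iteration starts
// from the node that served the previous request, instead of always probing
// the (possibly unhealthy) head of the vector first.
func rotateFromLastHealthy(group []netmap.NodeInfo, lastHealthy uint64) []netmap.NodeInfo {
	for i := range group {
		if group[i].Hash() == lastHealthy {
			rotated := make([]netmap.NodeInfo, 0, len(group))
			rotated = append(rotated, group[i:]...)
			rotated = append(rotated, group[:i]...)
			return rotated
		}
	}
	return group // the remembered node left the net map; keep placement order
}
```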
func (p *Pool) getClientFromMap(hash uint64) (client, bool) {
p.mutex.RLock()
defer p.mutex.RUnlock()
cl, ok := p.clientMap[hash]
fyrchik marked this conversation as resolved Outdated

How about `addClientToMap` or `setClientInMap`? `set client to map` has different meaning.
return cl, ok
}
func (p *Pool) addClientToMap(hash uint64, cl client) {
p.mutex.Lock()
p.clientMap[hash] = cl
p.mutex.Unlock()
}
func (p *Pool) deleteClientFromMap(hash uint64) {
p.mutex.Lock()
_ = p.clientMap[hash].close()
delete(p.clientMap, hash)
p.mutex.Unlock()
}
func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*treeClient, error) {
var (
treeCl *treeClient
err error
)
node.IterateNetworkEndpoints(func(endpoint string) bool {
var addr network.Address
if err = addr.FromString(endpoint); err != nil {
p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err))
return false
}
newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
if err = newTreeCl.dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
return false
}
treeCl = newTreeCl
return true
})
if treeCl == nil {
return nil, fmt.Errorf("tree client wasn't initialized")
dkirillov marked this conversation as resolved Outdated

Probably we should be more careful with the list of errors on which we do redial. At least we shouldn't count the `context canceled` error (similar to the `handleError` function in the object pool).

`shouldRedial` in general doesn't look very robust, but I don't have anything to suggest here, unfortunately.
}
return treeCl, nil
}
func shouldTryAgain(err error) bool {
fyrchik marked this conversation as resolved Outdated

`errors.Is(ctx.Err(), context.Canceled)` is unexpected to me here, because it has `err != nil` even though the err is not checked. Also `context.DeadlineExceeded` is somewhat similar, do we need to add it here?

This check was added by analogy with the [object pool](https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/src/commit/dcd4ea334e068afbdb103f49a8e59afd16238c32/pool/pool.go#L1353).

It seems any error from `ctx.Err()` makes any dialing call with `ctx` non-redialable, as `ctx` is already Done(). Could `ctx.Err() != nil` be enough?

If it is copypaste, then ok.
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
}
func shouldRedial(ctx context.Context, err error) bool {
if err == nil || errors.Is(err, ErrNodeAccessDenied) || errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult) || errors.Is(ctx.Err(), context.Canceled) {
return false
}
return true
}
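
For reference, the alternative discussed in the review threads above (checking the context before dialing a new client instead of inside `shouldRedial`) could look roughly like this; it is a sketch, not the merged code:

```go
package example

import "context"

// contextDone reports whether further dial/retry attempts are pointless
// because the request context has already been cancelled or timed out.
// In the discussed variant this would be checked before getNewTreeClient,
// leaving shouldRedial to classify only the errors returned by requests.
func contextDone(ctx context.Context) bool {
	return ctx.Err() != nil // covers both context.Canceled and context.DeadlineExceeded
}
```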
func prioErr(err error) int {
switch {
case err == nil:

pool/tree/pool_test.go

@@ -7,7 +7,12 @@ import (
rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
@@ -15,12 +20,14 @@ import (
type treeClientMock struct {
address string
err bool
used bool
}
func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
if t.err {
return nil, errors.New("serviceClient() mock error")
}
t.used = true
return nil, nil
}
@@ -51,6 +58,23 @@ func (t *treeClientMock) close() error {
return nil
}
type netMapInfoMock struct {
netMap netmap.NetMap
policy netmap.PlacementPolicy
err error
}
func (n *netMapInfoMock) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
if n.err != nil {
return netmap.NetMap{}, n.err
}
return n.netMap, nil
}
func (n *netMapInfoMock) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) {
return n.policy, nil
}
func TestHandleError(t *testing.T) {
defaultError := errors.New("default error")
for _, tc := range []struct {
@@ -104,14 +128,14 @@ func TestRetry(t *testing.T) {
}
t.Run("first ok", func(t *testing.T) {
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
t.Run("first failed", func(t *testing.T) {
setErrors(p, "node00")
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 1)
})
@@ -119,7 +143,7 @@
t.Run("all failed", func(t *testing.T) {
setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...)
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.Error(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
@@ -127,13 +151,13 @@
t.Run("round", func(t *testing.T) {
setErrors(p, nodes[0][0], nodes[0][1])
setErrors(p, nodes[1]...)
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.NoError(t, err)
checkIndices(t, p, 0, 2)
resetClientsErrors(p)
setErrors(p, nodes[0][2], nodes[0][3])
err = p.requestWithRetry(ctx, makeFn)
err = p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
@@ -141,14 +165,14 @@
t.Run("group switch", func(t *testing.T) {
setErrors(p, nodes[0]...)
setErrors(p, nodes[1][0])
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 1)
})
t.Run("group round", func(t *testing.T) {
setErrors(p, nodes[0][1:]...)
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
@@ -156,7 +180,7 @@
t.Run("group round switch", func(t *testing.T) {
setErrors(p, nodes[0]...)
p.setStartIndices(0, 1)
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 1, 0)
})
@@ -164,14 +188,14 @@
t.Run("no panic group switch", func(t *testing.T) {
setErrors(p, nodes[1]...)
p.setStartIndices(1, 0)
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.NoError(t, err)
checkIndicesAndReset(t, p, 0, 0)
})
t.Run("error empty result", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error {
if index < errNodes {
index++
return errNodeEmptyResult
@@ -184,7 +208,7 @@
t.Run("error not found", func(t *testing.T) {
errNodes, index := 2, 0
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error {
if index < errNodes {
index++
return ErrNodeNotFound
@@ -197,7 +221,7 @@
t.Run("error access denied", func(t *testing.T) {
var index int
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error {
index++
return ErrNodeAccessDenied
})
@@ -211,7 +235,7 @@
p.maxRequestAttempts = 2
setErrors(p, nodes[0]...)
setErrors(p, nodes[1]...)
err := p.requestWithRetry(ctx, makeFn)
err := p.requestWithRetry(ctx, cidtest.ID(), makeFn)
require.Error(t, err)
checkIndicesAndReset(t, p, 0, 2)
p.maxRequestAttempts = oldVal
@@ -273,6 +297,125 @@ func TestRebalance(t *testing.T) {
})
}
func TestRetryContainerNodes(t *testing.T) {
ctx := context.Background()
nodesCount := 3
policy := getPlacementPolicy(uint32(nodesCount))
p := &Pool{
logger: zaptest.NewLogger(t),
maxRequestAttempts: nodesCount,
}
var nm netmap.NetMap
for i := 0; i < nodesCount; i++ {
key, err := keys.NewPrivateKey()
require.NoError(t, err)
nm.SetNodes(append(nm.Nodes(), getNodeInfo(key.Bytes())))
}
p.netMapInfoSource = &netMapInfoMock{netMap: nm, policy: policy}
cnrID := cidtest.ID()
cnrNodes, err := nm.ContainerNodes(policy, cnrID[:])
require.NoError(t, err)
cnrNodes, err = nm.PlacementVectors(cnrNodes, cnrID[:])
require.NoError(t, err)
require.Len(t, cnrNodes, 1)
require.Len(t, cnrNodes[0], nodesCount)
makeFn := func(client *rpcClient.Client) error {
return nil
}
t.Run("first ok", func(t *testing.T) {
p.clientMap = makeClientMap(cnrNodes[0])
err = p.requestWithRetry(ctx, cnrID, makeFn)
require.NoError(t, err)
checkClientsUsage(t, p, cnrNodes[0][0])
checkClientsPresence(t, p, cnrNodes[0]...)
})
t.Run("first failed", func(t *testing.T) {
p.clientMap = makeClientMap(cnrNodes[0])
setClientMapErrors(p, cnrNodes[0][0])
err = p.requestWithRetry(ctx, cnrID, makeFn)
require.NoError(t, err)
checkClientsUsage(t, p, cnrNodes[0][1])
checkClientsPresence(t, p, cnrNodes[0][1:]...)
})
t.Run("first two failed", func(t *testing.T) {
p.clientMap = makeClientMap(cnrNodes[0])
setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1])
err = p.requestWithRetry(ctx, cnrID, makeFn)
require.NoError(t, err)
checkClientsUsage(t, p, cnrNodes[0][2])
checkClientsPresence(t, p, cnrNodes[0][2])
})
t.Run("all failed", func(t *testing.T) {
p.clientMap = makeClientMap(cnrNodes[0])
setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1], cnrNodes[0][2])
err = p.requestWithRetry(ctx, cnrID, makeFn)
require.Error(t, err)
checkClientsUsage(t, p)
checkClientsPresence(t, p)
})
t.Run("error empty result", func(t *testing.T) {
p.clientMap = makeClientMap(cnrNodes[0])
errNodes, index := 2, 0
err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
if index < errNodes {
index++
return errNodeEmptyResult
}
return nil
})
require.NoError(t, err)
checkClientsUsage(t, p, cnrNodes[0][:errNodes+1]...)
checkClientsPresence(t, p, cnrNodes[0]...)
})
t.Run("error not found", func(t *testing.T) {
p.clientMap = makeClientMap(cnrNodes[0])
errNodes, index := 2, 0
err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
if index < errNodes {
index++
return ErrNodeNotFound
}
return nil
})
require.NoError(t, err)
checkClientsUsage(t, p, cnrNodes[0][:errNodes+1]...)
checkClientsPresence(t, p, cnrNodes[0]...)
})
t.Run("error access denied", func(t *testing.T) {
p.clientMap = makeClientMap(cnrNodes[0])
var index int
err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
index++
return ErrNodeAccessDenied
})
require.ErrorIs(t, err, ErrNodeAccessDenied)
require.Equal(t, 1, index)
checkClientsUsage(t, p, cnrNodes[0][0])
checkClientsPresence(t, p, cnrNodes[0]...)
})
t.Run("limit attempts", func(t *testing.T) {
p.clientMap = makeClientMap(cnrNodes[0])
p.maxRequestAttempts = 2
setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1])
err = p.requestWithRetry(ctx, cnrID, makeFn)
require.Error(t, err)
checkClientsUsage(t, p)
checkClientsPresence(t, p, cnrNodes[0][2])
p.maxRequestAttempts = nodesCount
})
}
func makeInnerPool(nodes [][]string) []*innerPool {
res := make([]*innerPool, len(nodes))
@@ -359,3 +502,59 @@ func containsStr(list []string, item string) bool {
return false
}
func makeClientMap(nodes []netmap.NodeInfo) map[uint64]client {
res := make(map[uint64]client, len(nodes))
for _, node := range nodes {
res[hrw.Hash(node.PublicKey())] = &treeClientMock{}
fyrchik marked this conversation as resolved Outdated

I see this function being used in tests once, but I don't understand why it is here. We get placement policy, then container nodes, then work. In real code this function won't exist, why is it needed for tests?

Because after we get placement vector, nodes can be reordered. It's used to reorder node keys to set errors in tests

You have `nodeKeys []*keys.PrivateKey`, but it seems the only way you use it is to sift it through HRW hash and filter client nodes. Now the question: can we omit this array completely? It will be `makeClientMap([]netmap.NodeInfo)`, `checkClientUsage(t, p, ...netmap.NodeInfo)` etc. I see no need in _private_ keys currently.
}
return res
}
func checkClientsPresence(t *testing.T, p *Pool, nodes ...netmap.NodeInfo) {
require.Len(t, p.clientMap, len(nodes))
for _, node := range nodes {
require.NotNil(t, p.clientMap[hrw.Hash(node.PublicKey())])
}
}
func checkClientsUsage(t *testing.T, p *Pool, nodes ...netmap.NodeInfo) {
for hash, cl := range p.clientMap {
if containsHash(nodes, hash) {
require.True(t, cl.(*treeClientMock).used)
} else {
require.False(t, cl.(*treeClientMock).used)
}
}
}
func setClientMapErrors(p *Pool, nodes ...netmap.NodeInfo) {
for hash, cl := range p.clientMap {
if containsHash(nodes, hash) {
cl.(*treeClientMock).err = true
}
}
}
func containsHash(list []netmap.NodeInfo, hash uint64) bool {
for i := range list {
if hrw.Hash(list[i].PublicKey()) == hash {
return true
}
}
return false
}
func getPlacementPolicy(replicas uint32) (p netmap.PlacementPolicy) {
var r netmap.ReplicaDescriptor
r.SetNumberOfObjects(replicas)
p.AddReplicas([]netmap.ReplicaDescriptor{r}...)
return p
}
func getNodeInfo(key []byte) netmap.NodeInfo {
var node netmap.NodeInfo
node.SetPublicKey(key)
return node
}