[#xxx] tree/pool: Use net map for prioritize tree services
New version of tree/pool selects tree service connection to make request based on the current net map and container placement policy Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
0352b5b191
commit
4d14c53489
6 changed files with 499 additions and 433 deletions
13
go.mod
13
go.mod
|
@ -3,6 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-sdk-go
|
||||||
go 1.22
|
go 1.22
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1
|
||||||
|
@ -14,6 +15,7 @@ require (
|
||||||
github.com/klauspost/reedsolomon v1.12.1
|
github.com/klauspost/reedsolomon v1.12.1
|
||||||
github.com/mailru/easyjson v0.7.7
|
github.com/mailru/easyjson v0.7.7
|
||||||
github.com/mr-tron/base58 v1.2.0
|
github.com/mr-tron/base58 v1.2.0
|
||||||
|
github.com/multiformats/go-multiaddr v0.12.1
|
||||||
github.com/nspcc-dev/neo-go v0.106.2
|
github.com/nspcc-dev/neo-go v0.106.2
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
go.uber.org/zap v1.27.0
|
go.uber.org/zap v1.27.0
|
||||||
|
@ -29,13 +31,21 @@ require (
|
||||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||||
github.com/golang/snappy v0.0.1 // indirect
|
github.com/golang/snappy v0.0.1 // indirect
|
||||||
github.com/gorilla/websocket v1.5.1 // indirect
|
github.com/gorilla/websocket v1.5.1 // indirect
|
||||||
|
github.com/ipfs/go-cid v0.0.7 // indirect
|
||||||
github.com/josharian/intern v1.0.0 // indirect
|
github.com/josharian/intern v1.0.0 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
|
||||||
github.com/kr/pretty v0.1.0 // indirect
|
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
|
||||||
|
github.com/minio/sha256-simd v1.0.1 // indirect
|
||||||
|
github.com/multiformats/go-base32 v0.0.3 // indirect
|
||||||
|
github.com/multiformats/go-base36 v0.1.0 // indirect
|
||||||
|
github.com/multiformats/go-multibase v0.0.3 // indirect
|
||||||
|
github.com/multiformats/go-multihash v0.0.14 // indirect
|
||||||
|
github.com/multiformats/go-varint v0.0.6 // indirect
|
||||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
|
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
|
||||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
|
||||||
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||||
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
|
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
|
||||||
github.com/twmb/murmur3 v1.1.8 // indirect
|
github.com/twmb/murmur3 v1.1.8 // indirect
|
||||||
go.etcd.io/bbolt v1.3.9 // indirect
|
go.etcd.io/bbolt v1.3.9 // indirect
|
||||||
|
@ -46,5 +56,4 @@ require (
|
||||||
golang.org/x/sys v0.21.0 // indirect
|
golang.org/x/sys v0.21.0 // indirect
|
||||||
golang.org/x/text v0.16.0 // indirect
|
golang.org/x/text v0.16.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
|
||||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
|
|
||||||
)
|
)
|
||||||
|
|
BIN
go.sum
BIN
go.sum
Binary file not shown.
110
pool/tree/network/address.go
Normal file
110
pool/tree/network/address.go
Normal file
|
@ -0,0 +1,110 @@
|
||||||
|
package network
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
|
manet "github.com/multiformats/go-multiaddr/net"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errHostIsEmpty = errors.New("host is empty")
|
||||||
|
|
||||||
|
// Address represents the FrostFS node
|
||||||
|
// network address.
|
||||||
|
type Address struct {
|
||||||
|
ma multiaddr.Multiaddr
|
||||||
|
}
|
||||||
|
|
||||||
|
// URIAddr returns Address as a URI.
|
||||||
|
//
|
||||||
|
// Panics if host address cannot be fetched from Address.
|
||||||
|
//
|
||||||
|
// See also FromString.
|
||||||
|
func (a Address) URIAddr() string {
|
||||||
|
_, host, err := manet.DialArgs(a.ma)
|
||||||
|
if err != nil {
|
||||||
|
// the only correct way to construct Address is AddressFromString
|
||||||
|
// which makes this error appear unexpected
|
||||||
|
panic(fmt.Errorf("could not get host addr: %w", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
if !a.IsTLSEnabled() {
|
||||||
|
return host
|
||||||
|
}
|
||||||
|
|
||||||
|
return (&url.URL{
|
||||||
|
Scheme: "grpcs",
|
||||||
|
Host: host,
|
||||||
|
}).String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromString restores Address from a string representation.
|
||||||
|
//
|
||||||
|
// Supports URIAddr, MultiAddr and HostAddr strings.
|
||||||
|
func (a *Address) FromString(s string) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
a.ma, err = multiaddr.NewMultiaddr(s)
|
||||||
|
if err != nil {
|
||||||
|
var (
|
||||||
|
host string
|
||||||
|
hasTLS bool
|
||||||
|
)
|
||||||
|
host, hasTLS, err = client.ParseURI(s)
|
||||||
|
if err != nil {
|
||||||
|
host = s
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err = multiaddrStringFromHostAddr(host)
|
||||||
|
if err == nil {
|
||||||
|
a.ma, err = multiaddr.NewMultiaddr(s)
|
||||||
|
if err == nil && hasTLS {
|
||||||
|
a.ma = a.ma.Encapsulate(tls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// multiaddrStringFromHostAddr converts "localhost:8080" to "/dns4/localhost/tcp/8080".
|
||||||
|
func multiaddrStringFromHostAddr(host string) (string, error) {
|
||||||
|
if len(host) == 0 {
|
||||||
|
return "", errHostIsEmpty
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoint, port, err := net.SplitHostPort(host)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Empty address in host `:8080` generates `/dns4//tcp/8080` multiaddr
|
||||||
|
// which is invalid. It could be `/tcp/8080` but this breaks
|
||||||
|
// `manet.DialArgs`. The solution is to manually parse it as 0.0.0.0
|
||||||
|
if endpoint == "" {
|
||||||
|
return "/ip4/0.0.0.0/tcp/" + port, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
prefix = "/dns4"
|
||||||
|
addr = endpoint
|
||||||
|
)
|
||||||
|
|
||||||
|
if ip := net.ParseIP(endpoint); ip != nil {
|
||||||
|
addr = ip.String()
|
||||||
|
if ip.To4() == nil {
|
||||||
|
prefix = "/ip6"
|
||||||
|
} else {
|
||||||
|
prefix = "/ip4"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const l4Protocol = "tcp"
|
||||||
|
|
||||||
|
return strings.Join([]string{prefix, addr, l4Protocol, port}, "/"), nil
|
||||||
|
}
|
16
pool/tree/network/tls.go
Normal file
16
pool/tree/network/tls.go
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
package network
|
||||||
|
|
||||||
|
import "github.com/multiformats/go-multiaddr"
|
||||||
|
|
||||||
|
const (
|
||||||
|
tlsProtocolName = "tls"
|
||||||
|
)
|
||||||
|
|
||||||
|
// tls var is used for (un)wrapping other multiaddrs around TLS multiaddr.
|
||||||
|
var tls, _ = multiaddr.NewMultiaddr("/" + tlsProtocolName)
|
||||||
|
|
||||||
|
// IsTLSEnabled searches for wrapped TLS protocol in multiaddr.
|
||||||
|
func (a Address) IsTLSEnabled() bool {
|
||||||
|
_, err := a.ma.ValueForProtocol(multiaddr.P_TLS)
|
||||||
|
return err == nil
|
||||||
|
}
|
|
@ -7,14 +7,17 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
|
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc"
|
||||||
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
|
||||||
|
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/network"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
|
@ -22,8 +25,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultRebalanceInterval = 15 * time.Second
|
|
||||||
defaultHealthcheckTimeout = 4 * time.Second
|
|
||||||
defaultDialTimeout = 5 * time.Second
|
defaultDialTimeout = 5 * time.Second
|
||||||
defaultStreamTimeout = 10 * time.Second
|
defaultStreamTimeout = 10 * time.Second
|
||||||
)
|
)
|
||||||
|
@ -68,11 +69,19 @@ type InitParameters struct {
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
nodeDialTimeout time.Duration
|
nodeDialTimeout time.Duration
|
||||||
nodeStreamTimeout time.Duration
|
nodeStreamTimeout time.Duration
|
||||||
healthcheckTimeout time.Duration
|
|
||||||
clientRebalanceInterval time.Duration
|
|
||||||
nodeParams []pool.NodeParam
|
nodeParams []pool.NodeParam
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
maxRequestAttempts int
|
maxRequestAttempts int
|
||||||
|
netMapSource NetMapSource
|
||||||
|
placementPolicySource PlacementPolicySource
|
||||||
|
}
|
||||||
|
|
||||||
|
type NetMapSource interface {
|
||||||
|
NetMapSnapshot(ctx context.Context) (netmap.NetMap, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type PlacementPolicySource interface {
|
||||||
|
PlacementPolicy(cnrID cid.ID) (netmap.PlacementPolicy, bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pool represents virtual connection to the FrostFS tree services network to communicate
|
// Pool represents virtual connection to the FrostFS tree services network to communicate
|
||||||
|
@ -85,9 +94,7 @@ type InitParameters struct {
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
innerPools []*innerPool
|
innerPools []*innerPool
|
||||||
key *keys.PrivateKey
|
key *keys.PrivateKey
|
||||||
cancel context.CancelFunc
|
nodesGroup [][]pool.NodeParam
|
||||||
closedCh chan struct{}
|
|
||||||
rebalanceParams rebalanceParameters
|
|
||||||
dialOptions []grpc.DialOption
|
dialOptions []grpc.DialOption
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
methods []*pool.MethodStatus
|
methods []*pool.MethodStatus
|
||||||
|
@ -96,23 +103,13 @@ type Pool struct {
|
||||||
streamTimeout time.Duration
|
streamTimeout time.Duration
|
||||||
nodeDialTimeout time.Duration
|
nodeDialTimeout time.Duration
|
||||||
|
|
||||||
startIndicesMtx sync.RWMutex
|
netMapSource NetMapSource
|
||||||
// startIndices points to the client from which the next request will be executed.
|
policySource PlacementPolicySource
|
||||||
// Since clients are stored in innerPool field we have to use two indices.
|
clientMap map[uint64]client
|
||||||
// These indices being changed during:
|
|
||||||
// * rebalance procedure (see Pool.startRebalance)
|
|
||||||
// * retry in case of request failure (see Pool.requestWithRetry)
|
|
||||||
startIndices [2]int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type innerPool struct {
|
type innerPool struct {
|
||||||
clients []client
|
clients []*sdkClient.Client
|
||||||
}
|
|
||||||
|
|
||||||
type rebalanceParameters struct {
|
|
||||||
nodesGroup [][]pool.NodeParam
|
|
||||||
nodeRequestTimeout time.Duration
|
|
||||||
clientRebalanceInterval time.Duration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNodesParams groups parameters of Pool.GetNodes operation.
|
// GetNodesParams groups parameters of Pool.GetNodes operation.
|
||||||
|
@ -213,6 +210,14 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
return nil, fmt.Errorf("missed required parameter 'Key'")
|
return nil, fmt.Errorf("missed required parameter 'Key'")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if options.netMapSource == nil {
|
||||||
|
return nil, fmt.Errorf("missed required parameter 'NetMap source'")
|
||||||
|
}
|
||||||
|
|
||||||
|
if options.placementPolicySource == nil {
|
||||||
|
return nil, fmt.Errorf("missed required parameter 'Placement policy source'")
|
||||||
|
}
|
||||||
|
|
||||||
nodesParams, err := adjustNodeParams(options.nodeParams)
|
nodesParams, err := adjustNodeParams(options.nodeParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -225,45 +230,17 @@ func NewPool(options InitParameters) (*Pool, error) {
|
||||||
methods[i] = pool.NewMethodStatus(i.String())
|
methods[i] = pool.NewMethodStatus(i.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &Pool{
|
inner := make([]*innerPool, len(nodesParams))
|
||||||
key: options.key,
|
for i, nodes := range nodesParams {
|
||||||
logger: options.logger,
|
clients := make([]*sdkClient.Client, len(nodes))
|
||||||
dialOptions: options.dialOptions,
|
for j := range nodes {
|
||||||
rebalanceParams: rebalanceParameters{
|
var cl sdkClient.Client
|
||||||
nodesGroup: nodesParams,
|
prmInit := sdkClient.PrmInit{
|
||||||
nodeRequestTimeout: options.healthcheckTimeout,
|
Key: options.key.PrivateKey,
|
||||||
clientRebalanceInterval: options.clientRebalanceInterval,
|
|
||||||
},
|
|
||||||
maxRequestAttempts: options.maxRequestAttempts,
|
|
||||||
streamTimeout: options.nodeStreamTimeout,
|
|
||||||
methods: methods,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
cl.Init(prmInit)
|
||||||
}
|
clients[j] = &cl
|
||||||
|
|
||||||
// Dial establishes a connection to the tree servers from the FrostFS network.
|
|
||||||
// It also starts a routine that checks the health of the nodes and
|
|
||||||
// updates the weights of the nodes for balancing.
|
|
||||||
// Returns an error describing failure reason.
|
|
||||||
//
|
|
||||||
// If failed, the Pool SHOULD NOT be used.
|
|
||||||
//
|
|
||||||
// See also InitParameters.SetClientRebalanceInterval.
|
|
||||||
func (p *Pool) Dial(ctx context.Context) error {
|
|
||||||
inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup))
|
|
||||||
var atLeastOneHealthy bool
|
|
||||||
|
|
||||||
for i, nodes := range p.rebalanceParams.nodesGroup {
|
|
||||||
clients := make([]client, len(nodes))
|
|
||||||
for j, node := range nodes {
|
|
||||||
clients[j] = newTreeClient(node.Address(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
|
|
||||||
if err := clients[j].dial(ctx); err != nil {
|
|
||||||
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
atLeastOneHealthy = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
inner[i] = &innerPool{
|
inner[i] = &innerPool{
|
||||||
|
@ -271,16 +248,53 @@ func (p *Pool) Dial(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p := &Pool{
|
||||||
|
key: options.key,
|
||||||
|
logger: options.logger,
|
||||||
|
dialOptions: options.dialOptions,
|
||||||
|
nodesGroup: nodesParams,
|
||||||
|
maxRequestAttempts: options.maxRequestAttempts,
|
||||||
|
streamTimeout: options.nodeStreamTimeout,
|
||||||
|
nodeDialTimeout: options.nodeDialTimeout,
|
||||||
|
methods: methods,
|
||||||
|
netMapSource: options.netMapSource,
|
||||||
|
policySource: options.placementPolicySource,
|
||||||
|
clientMap: make(map[uint64]client),
|
||||||
|
innerPools: inner,
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dial establishes a connection to the servers from the FrostFS network.
|
||||||
|
// Returns an error describing failure reason.
|
||||||
|
//
|
||||||
|
// If failed, the Pool SHOULD NOT be used.
|
||||||
|
func (p *Pool) Dial(ctx context.Context) error {
|
||||||
|
var atLeastOneHealthy bool
|
||||||
|
|
||||||
|
for i, nodes := range p.nodesGroup {
|
||||||
|
for j, node := range nodes {
|
||||||
|
prmDial := sdkClient.PrmDial{
|
||||||
|
Endpoint: node.Address(),
|
||||||
|
DialTimeout: p.nodeDialTimeout,
|
||||||
|
StreamTimeout: p.streamTimeout,
|
||||||
|
GRPCDialOptions: p.dialOptions,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.innerPools[i].clients[j].Dial(ctx, prmDial); err != nil {
|
||||||
|
p.log(zap.WarnLevel, "failed to dial client", zap.String("address", node.Address()), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
atLeastOneHealthy = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !atLeastOneHealthy {
|
if !atLeastOneHealthy {
|
||||||
return fmt.Errorf("at least one node must be healthy")
|
return fmt.Errorf("at least one node must be healthy")
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
|
||||||
p.cancel = cancel
|
|
||||||
p.closedCh = make(chan struct{})
|
|
||||||
p.innerPools = inner
|
|
||||||
|
|
||||||
go p.startRebalance(ctx)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,20 +318,6 @@ func (x *InitParameters) SetNodeStreamTimeout(timeout time.Duration) {
|
||||||
x.nodeStreamTimeout = timeout
|
x.nodeStreamTimeout = timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
|
|
||||||
//
|
|
||||||
// See also Pool.Dial.
|
|
||||||
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
|
|
||||||
x.healthcheckTimeout = timeout
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
|
|
||||||
//
|
|
||||||
// See also Pool.Dial.
|
|
||||||
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
|
|
||||||
x.clientRebalanceInterval = interval
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddNode append information about the node to which you want to connect.
|
// AddNode append information about the node to which you want to connect.
|
||||||
func (x *InitParameters) AddNode(nodeParam pool.NodeParam) {
|
func (x *InitParameters) AddNode(nodeParam pool.NodeParam) {
|
||||||
x.nodeParams = append(x.nodeParams, nodeParam)
|
x.nodeParams = append(x.nodeParams, nodeParam)
|
||||||
|
@ -334,6 +334,16 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
|
||||||
x.maxRequestAttempts = maxAttempts
|
x.maxRequestAttempts = maxAttempts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetNetMapSource sets implementation of interface to get current net map.
|
||||||
|
func (x *InitParameters) SetNetMapSource(netMapSource NetMapSource) {
|
||||||
|
x.netMapSource = netMapSource
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetPlacementPolicySource sets implementation of interface to get container placement policy.
|
||||||
|
func (x *InitParameters) SetPlacementPolicySource(placementPolicySource PlacementPolicySource) {
|
||||||
|
x.placementPolicySource = placementPolicySource
|
||||||
|
}
|
||||||
|
|
||||||
// GetNodes invokes eponymous method from TreeServiceClient.
|
// GetNodes invokes eponymous method from TreeServiceClient.
|
||||||
//
|
//
|
||||||
// Can return predefined errors:
|
// Can return predefined errors:
|
||||||
|
@ -359,7 +369,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNod
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *tree.GetNodeByPathResponse
|
var resp *tree.GetNodeByPathResponse
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
|
||||||
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
|
resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx))
|
||||||
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
|
||||||
// Empty result is expected due to delayed tree service sync.
|
// Empty result is expected due to delayed tree service sync.
|
||||||
|
@ -463,7 +473,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
|
||||||
}
|
}
|
||||||
|
|
||||||
var cli *rpcapi.GetSubTreeResponseReader
|
var cli *rpcapi.GetSubTreeResponseReader
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
|
||||||
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
|
cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx))
|
||||||
return handleError("failed to get sub tree client", inErr)
|
return handleError("failed to get sub tree client", inErr)
|
||||||
})
|
})
|
||||||
|
@ -497,7 +507,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *tree.AddResponse
|
var resp *tree.AddResponse
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
|
||||||
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
|
resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx))
|
||||||
return handleError("failed to add node", inErr)
|
return handleError("failed to add node", inErr)
|
||||||
})
|
})
|
||||||
|
@ -532,7 +542,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp *tree.AddByPathResponse
|
var resp *tree.AddByPathResponse
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) {
|
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) {
|
||||||
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
|
resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx))
|
||||||
return handleError("failed to add node by path", inErr)
|
return handleError("failed to add node by path", inErr)
|
||||||
})
|
})
|
||||||
|
@ -574,7 +584,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
|
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
|
||||||
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
|
if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil {
|
||||||
return handleError("failed to move node", err)
|
return handleError("failed to move node", err)
|
||||||
}
|
}
|
||||||
|
@ -605,7 +615,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error {
|
err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error {
|
||||||
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
|
if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil {
|
||||||
return handleError("failed to remove node", err)
|
return handleError("failed to remove node", err)
|
||||||
}
|
}
|
||||||
|
@ -618,19 +628,23 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
|
||||||
|
|
||||||
// Close closes the Pool and releases all the associated resources.
|
// Close closes the Pool and releases all the associated resources.
|
||||||
func (p *Pool) Close() error {
|
func (p *Pool) Close() error {
|
||||||
p.cancel()
|
|
||||||
<-p.closedCh
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
for _, group := range p.innerPools {
|
for _, group := range p.innerPools {
|
||||||
for _, cl := range group.clients {
|
for _, cl := range group.clients {
|
||||||
if closeErr := cl.close(); closeErr != nil {
|
if closeErr := cl.Close(); closeErr != nil {
|
||||||
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
|
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
|
||||||
err = closeErr
|
err = closeErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, cl := range p.clientMap {
|
||||||
|
if closeErr := cl.close(); closeErr != nil {
|
||||||
|
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
|
||||||
|
err = closeErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -695,14 +709,6 @@ func adjustNodeParams(nodeParams []pool.NodeParam) ([][]pool.NodeParam, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func fillDefaultInitParams(params *InitParameters) {
|
func fillDefaultInitParams(params *InitParameters) {
|
||||||
if params.clientRebalanceInterval <= 0 {
|
|
||||||
params.clientRebalanceInterval = defaultRebalanceInterval
|
|
||||||
}
|
|
||||||
|
|
||||||
if params.healthcheckTimeout <= 0 {
|
|
||||||
params.healthcheckTimeout = defaultHealthcheckTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
if params.nodeDialTimeout <= 0 {
|
if params.nodeDialTimeout <= 0 {
|
||||||
params.nodeDialTimeout = defaultDialTimeout
|
params.nodeDialTimeout = defaultDialTimeout
|
||||||
}
|
}
|
||||||
|
@ -724,97 +730,7 @@ func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||||
p.logger.Log(level, msg, fields...)
|
p.logger.Log(level, msg, fields...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// startRebalance runs loop to monitor tree client healthy status.
|
func (p *Pool) requestWithRetry(ctx context.Context, cid cid.ID, fn func(client *rpcclient.Client) error) error {
|
||||||
func (p *Pool) startRebalance(ctx context.Context) {
|
|
||||||
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
|
|
||||||
buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
|
|
||||||
for i, nodes := range p.rebalanceParams.nodesGroup {
|
|
||||||
buffers[i] = make([]bool, len(nodes))
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
close(p.closedCh)
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
p.updateNodesHealth(ctx, buffers)
|
|
||||||
ticker.Reset(p.rebalanceParams.clientRebalanceInterval)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool) updateNodesHealth(ctx context.Context, buffers [][]bool) {
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
for i, inner := range p.innerPools {
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go func(i int, _ *innerPool) {
|
|
||||||
defer wg.Done()
|
|
||||||
p.updateInnerNodesHealth(ctx, i, buffers[i])
|
|
||||||
}(i, inner)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
|
|
||||||
LOOP:
|
|
||||||
for i, buffer := range buffers {
|
|
||||||
for j, healthy := range buffer {
|
|
||||||
if healthy {
|
|
||||||
p.setStartIndices(i, j)
|
|
||||||
break LOOP
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) {
|
|
||||||
if i > len(p.innerPools)-1 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
nodesByPriority := p.innerPools[i]
|
|
||||||
options := p.rebalanceParams
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for j, cli := range nodesByPriority.clients {
|
|
||||||
wg.Add(1)
|
|
||||||
go func(j int, cli client) {
|
|
||||||
defer wg.Done()
|
|
||||||
|
|
||||||
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
|
|
||||||
defer c()
|
|
||||||
|
|
||||||
changed, err := cli.redialIfNecessary(tctx)
|
|
||||||
healthy := err == nil
|
|
||||||
if changed {
|
|
||||||
fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)}
|
|
||||||
if err != nil {
|
|
||||||
fields = append(fields, zap.Error(err))
|
|
||||||
}
|
|
||||||
p.log(zap.DebugLevel, "tree health has changed", fields...)
|
|
||||||
} else if err != nil {
|
|
||||||
p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err))
|
|
||||||
}
|
|
||||||
buffer[j] = healthy
|
|
||||||
}(j, cli)
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool) getStartIndices() (int, int) {
|
|
||||||
p.startIndicesMtx.RLock()
|
|
||||||
defer p.startIndicesMtx.RUnlock()
|
|
||||||
|
|
||||||
return p.startIndices[0], p.startIndices[1]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool) setStartIndices(i, j int) {
|
|
||||||
p.startIndicesMtx.Lock()
|
|
||||||
p.startIndices[0] = i
|
|
||||||
p.startIndices[1] = j
|
|
||||||
p.startIndicesMtx.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
|
|
||||||
var (
|
var (
|
||||||
err, finErr error
|
err, finErr error
|
||||||
cl *rpcclient.Client
|
cl *rpcclient.Client
|
||||||
|
@ -822,35 +738,57 @@ func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.C
|
||||||
|
|
||||||
reqID := GetRequestID(ctx)
|
reqID := GetRequestID(ctx)
|
||||||
|
|
||||||
startI, startJ := p.getStartIndices()
|
netMap, err := p.netMapSource.NetMapSnapshot(ctx)
|
||||||
groupsLen := len(p.innerPools)
|
if err != nil {
|
||||||
|
return fmt.Errorf("get net map: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
policy, ok := p.policySource.PlacementPolicy(cid)
|
||||||
|
if !ok {
|
||||||
|
policy, err = p.getContainerPlacementPolicy(ctx, cid, reqID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get container placement policy: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrNodes, err := netMap.ContainerNodes(policy, cid[:])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get container nodes: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrNodes, err = netMap.PlacementVectors(cnrNodes, cid[:])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get placement vectors: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
attempts := p.maxRequestAttempts
|
attempts := p.maxRequestAttempts
|
||||||
|
|
||||||
LOOP:
|
LOOP:
|
||||||
for i := startI; i < startI+groupsLen; i++ {
|
for _, cnrNodeGroup := range cnrNodes {
|
||||||
indexI := i % groupsLen
|
for _, cnrNode := range cnrNodeGroup {
|
||||||
clientsLen := len(p.innerPools[indexI].clients)
|
|
||||||
for j := startJ; j < startJ+clientsLen; j++ {
|
|
||||||
indexJ := j % clientsLen
|
|
||||||
|
|
||||||
if attempts == 0 {
|
if attempts == 0 {
|
||||||
if startI != indexI || startJ != indexJ {
|
|
||||||
p.setStartIndices(indexI, indexJ)
|
|
||||||
}
|
|
||||||
break LOOP
|
break LOOP
|
||||||
}
|
}
|
||||||
attempts--
|
attempts--
|
||||||
|
|
||||||
if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
|
treeCl, ok := p.clientMap[cnrNode.Hash()]
|
||||||
|
if !ok {
|
||||||
|
treeCl, err = p.getNewTreeClient(ctx, cnrNode)
|
||||||
|
if err != nil {
|
||||||
|
finErr = finalError(finErr, err)
|
||||||
|
p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
p.clientMap[cnrNode.Hash()] = treeCl
|
||||||
|
}
|
||||||
|
|
||||||
|
if cl, err = treeCl.serviceClient(); err == nil {
|
||||||
err = fn(cl)
|
err = fn(cl)
|
||||||
}
|
}
|
||||||
if !shouldTryAgain(err) {
|
if !shouldTryAgain(err) {
|
||||||
if startI != indexI || startJ != indexJ {
|
|
||||||
p.setStartIndices(indexI, indexJ)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("address %s: %w", p.innerPools[indexI].clients[indexJ].endpoint(), err)
|
err = fmt.Errorf("address %s: %w", treeCl.endpoint(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
@ -858,14 +796,83 @@ LOOP:
|
||||||
|
|
||||||
finErr = finalError(finErr, err)
|
finErr = finalError(finErr, err)
|
||||||
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
|
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
|
||||||
zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
|
zap.String("address", treeCl.endpoint()), zap.Error(err))
|
||||||
}
|
}
|
||||||
startJ = 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return finErr
|
return finErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Pool) getContainerPlacementPolicy(ctx context.Context, cid cid.ID, reqID string) (netmap.PlacementPolicy, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
res *sdkClient.ResContainerGet
|
||||||
|
)
|
||||||
|
attempts := p.maxRequestAttempts
|
||||||
|
|
||||||
|
LOOP:
|
||||||
|
for _, group := range p.innerPools {
|
||||||
|
for _, cl := range group.clients {
|
||||||
|
if attempts == 0 {
|
||||||
|
break LOOP
|
||||||
|
}
|
||||||
|
attempts--
|
||||||
|
|
||||||
|
prm := sdkClient.PrmContainerGet{
|
||||||
|
ContainerID: &cid,
|
||||||
|
}
|
||||||
|
res, err = cl.ContainerGet(ctx, prm)
|
||||||
|
var st apistatus.Status
|
||||||
|
if res != nil {
|
||||||
|
st = res.Status()
|
||||||
|
}
|
||||||
|
stErr := apistatus.ErrFromStatus(st)
|
||||||
|
if stErr != nil && err == nil {
|
||||||
|
err = stErr
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
p.log(zap.DebugLevel, "placement policy request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
|
||||||
|
zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Container().PlacementPolicy(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return netmap.PlacementPolicy{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*treeClient, error) {
|
||||||
|
var (
|
||||||
|
treeCl *treeClient
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
node.IterateNetworkEndpoints(func(endpoint string) bool {
|
||||||
|
var addr network.Address
|
||||||
|
if err = addr.FromString(endpoint); err != nil {
|
||||||
|
p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
|
||||||
|
if err = newTreeCl.dial(ctx); err != nil {
|
||||||
|
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
treeCl = newTreeCl
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
if treeCl == nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return treeCl, nil
|
||||||
|
}
|
||||||
|
|
||||||
func shouldTryAgain(err error) bool {
|
func shouldTryAgain(err error) bool {
|
||||||
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
|
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,25 +7,30 @@ import (
|
||||||
|
|
||||||
rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/hrw"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
type treeClientMock struct {
|
type treeClientMock struct {
|
||||||
address string
|
|
||||||
err bool
|
err bool
|
||||||
|
used bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
|
func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
|
||||||
if t.err {
|
if t.err {
|
||||||
return nil, errors.New("serviceClient() mock error")
|
return nil, errors.New("serviceClient() mock error")
|
||||||
}
|
}
|
||||||
|
t.used = true
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *treeClientMock) endpoint() string {
|
func (t *treeClientMock) endpoint() string {
|
||||||
return t.address
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *treeClientMock) isHealthy() bool {
|
func (t *treeClientMock) isHealthy() bool {
|
||||||
|
@ -51,6 +56,26 @@ func (t *treeClientMock) close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type placementPolicyMock struct {
|
||||||
|
policy netmap.PlacementPolicy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *placementPolicyMock) PlacementPolicy(cid.ID) (netmap.PlacementPolicy, bool) {
|
||||||
|
return p.policy, true
|
||||||
|
}
|
||||||
|
|
||||||
|
type netMapMock struct {
|
||||||
|
netMap netmap.NetMap
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *netMapMock) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
|
||||||
|
if n.err != nil {
|
||||||
|
return netmap.NetMap{}, n.err
|
||||||
|
}
|
||||||
|
return n.netMap, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestHandleError(t *testing.T) {
|
func TestHandleError(t *testing.T) {
|
||||||
defaultError := errors.New("default error")
|
defaultError := errors.New("default error")
|
||||||
for _, tc := range []struct {
|
for _, tc := range []struct {
|
||||||
|
@ -83,95 +108,74 @@ func TestHandleError(t *testing.T) {
|
||||||
|
|
||||||
func TestRetry(t *testing.T) {
|
func TestRetry(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
nodes := [][]string{
|
nodesCount := 3
|
||||||
{"node00", "node01", "node02", "node03"},
|
policy := getPlacementPolicy(uint32(nodesCount))
|
||||||
{"node10", "node11", "node12", "node13"},
|
|
||||||
}
|
|
||||||
|
|
||||||
var lenNodes int
|
|
||||||
for i := range nodes {
|
|
||||||
lenNodes += len(nodes[i])
|
|
||||||
}
|
|
||||||
|
|
||||||
p := &Pool{
|
p := &Pool{
|
||||||
logger: zaptest.NewLogger(t),
|
logger: zaptest.NewLogger(t),
|
||||||
innerPools: makeInnerPool(nodes),
|
maxRequestAttempts: nodesCount,
|
||||||
maxRequestAttempts: lenNodes,
|
policySource: &placementPolicyMock{policy: policy},
|
||||||
|
clientMap: make(map[uint64]client),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var nm netmap.NetMap
|
||||||
|
nodeKeys := make([]*keys.PrivateKey, nodesCount)
|
||||||
|
for i := 0; i < nodesCount; i++ {
|
||||||
|
key, err := keys.NewPrivateKey()
|
||||||
|
require.NoError(t, err)
|
||||||
|
nodeKeys[i] = key
|
||||||
|
|
||||||
|
p.clientMap[hrw.Hash(key.Bytes())] = &treeClientMock{}
|
||||||
|
nm.SetNodes(append(nm.Nodes(), getNodeInfo(key.Bytes())))
|
||||||
|
}
|
||||||
|
p.netMapSource = &netMapMock{netMap: nm}
|
||||||
|
|
||||||
|
cnrID := cidtest.ID()
|
||||||
|
cnrNodes, err := nm.ContainerNodes(policy, cnrID[:])
|
||||||
|
require.NoError(t, err)
|
||||||
|
cnrNodes, err = nm.PlacementVectors(cnrNodes, cnrID[:])
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, cnrNodes, 1)
|
||||||
|
require.Len(t, cnrNodes[0], nodesCount)
|
||||||
|
nodeKeys = reorderKeys(nodeKeys, cnrNodes)
|
||||||
|
|
||||||
makeFn := func(client *rpcClient.Client) error {
|
makeFn := func(client *rpcClient.Client) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("first ok", func(t *testing.T) {
|
t.Run("first ok", func(t *testing.T) {
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err = p.requestWithRetry(ctx, cnrID, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkClientUsage(t, p, nodeKeys[0])
|
||||||
|
resetClients(p)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("first failed", func(t *testing.T) {
|
t.Run("first failed", func(t *testing.T) {
|
||||||
setErrors(p, "node00")
|
setErrors(p, nodeKeys[0])
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
err = p.requestWithRetry(ctx, cnrID, makeFn)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 1)
|
checkClientUsage(t, p, nodeKeys[1])
|
||||||
|
resetClients(p)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("first two failed", func(t *testing.T) {
|
||||||
|
setErrors(p, nodeKeys[0], nodeKeys[1])
|
||||||
|
err = p.requestWithRetry(ctx, cnrID, makeFn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
checkClientUsage(t, p, nodeKeys[2])
|
||||||
|
resetClients(p)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("all failed", func(t *testing.T) {
|
t.Run("all failed", func(t *testing.T) {
|
||||||
setErrors(p, nodes[0]...)
|
setErrors(p, nodeKeys[0], nodeKeys[1], nodeKeys[2])
|
||||||
setErrors(p, nodes[1]...)
|
err = p.requestWithRetry(ctx, cnrID, makeFn)
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkClientUsage(t, p)
|
||||||
})
|
resetClients(p)
|
||||||
|
|
||||||
t.Run("round", func(t *testing.T) {
|
|
||||||
setErrors(p, nodes[0][0], nodes[0][1])
|
|
||||||
setErrors(p, nodes[1]...)
|
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkIndices(t, p, 0, 2)
|
|
||||||
resetClientsErrors(p)
|
|
||||||
|
|
||||||
setErrors(p, nodes[0][2], nodes[0][3])
|
|
||||||
err = p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("group switch", func(t *testing.T) {
|
|
||||||
setErrors(p, nodes[0]...)
|
|
||||||
setErrors(p, nodes[1][0])
|
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkIndicesAndReset(t, p, 1, 1)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("group round", func(t *testing.T) {
|
|
||||||
setErrors(p, nodes[0][1:]...)
|
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("group round switch", func(t *testing.T) {
|
|
||||||
setErrors(p, nodes[0]...)
|
|
||||||
p.setStartIndices(0, 1)
|
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkIndicesAndReset(t, p, 1, 0)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("no panic group switch", func(t *testing.T) {
|
|
||||||
setErrors(p, nodes[1]...)
|
|
||||||
p.setStartIndices(1, 0)
|
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.NoError(t, err)
|
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("error empty result", func(t *testing.T) {
|
t.Run("error empty result", func(t *testing.T) {
|
||||||
errNodes, index := 2, 0
|
errNodes, index := 2, 0
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
|
||||||
if index < errNodes {
|
if index < errNodes {
|
||||||
index++
|
index++
|
||||||
return errNodeEmptyResult
|
return errNodeEmptyResult
|
||||||
|
@ -179,12 +183,13 @@ func TestRetry(t *testing.T) {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, errNodes)
|
checkClientUsage(t, p, nodeKeys[:errNodes+1]...)
|
||||||
|
resetClients(p)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("error not found", func(t *testing.T) {
|
t.Run("error not found", func(t *testing.T) {
|
||||||
errNodes, index := 2, 0
|
errNodes, index := 2, 0
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
|
||||||
if index < errNodes {
|
if index < errNodes {
|
||||||
index++
|
index++
|
||||||
return ErrNodeNotFound
|
return ErrNodeNotFound
|
||||||
|
@ -192,170 +197,89 @@ func TestRetry(t *testing.T) {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, errNodes)
|
checkClientUsage(t, p, nodeKeys[:errNodes+1]...)
|
||||||
|
resetClients(p)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("error access denied", func(t *testing.T) {
|
t.Run("error access denied", func(t *testing.T) {
|
||||||
var index int
|
var index int
|
||||||
err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
|
err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
|
||||||
index++
|
index++
|
||||||
return ErrNodeAccessDenied
|
return ErrNodeAccessDenied
|
||||||
})
|
})
|
||||||
require.ErrorIs(t, err, ErrNodeAccessDenied)
|
require.ErrorIs(t, err, ErrNodeAccessDenied)
|
||||||
require.Equal(t, 1, index)
|
require.Equal(t, 1, index)
|
||||||
checkIndicesAndReset(t, p, 0, 0)
|
checkClientUsage(t, p, nodeKeys[0])
|
||||||
|
resetClients(p)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("limit attempts", func(t *testing.T) {
|
t.Run("limit attempts", func(t *testing.T) {
|
||||||
oldVal := p.maxRequestAttempts
|
|
||||||
p.maxRequestAttempts = 2
|
p.maxRequestAttempts = 2
|
||||||
setErrors(p, nodes[0]...)
|
setErrors(p, nodeKeys[0], nodeKeys[1])
|
||||||
setErrors(p, nodes[1]...)
|
err = p.requestWithRetry(ctx, cnrID, makeFn)
|
||||||
err := p.requestWithRetry(ctx, makeFn)
|
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
checkIndicesAndReset(t, p, 0, 2)
|
checkClientUsage(t, p)
|
||||||
p.maxRequestAttempts = oldVal
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRebalance(t *testing.T) {
|
|
||||||
nodes := [][]string{
|
|
||||||
{"node00", "node01"},
|
|
||||||
{"node10", "node11"},
|
|
||||||
}
|
|
||||||
|
|
||||||
p := &Pool{
|
|
||||||
logger: zaptest.NewLogger(t),
|
|
||||||
innerPools: makeInnerPool(nodes),
|
|
||||||
rebalanceParams: rebalanceParameters{
|
|
||||||
nodesGroup: makeNodesGroup(nodes),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
buffers := makeBuffer(p)
|
|
||||||
|
|
||||||
t.Run("check dirty buffers", func(t *testing.T) {
|
|
||||||
p.updateNodesHealth(ctx, buffers)
|
|
||||||
checkIndices(t, p, 0, 0)
|
|
||||||
setErrors(p, nodes[0][0])
|
|
||||||
p.updateNodesHealth(ctx, buffers)
|
|
||||||
checkIndices(t, p, 0, 1)
|
|
||||||
resetClients(p)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("don't change healthy status", func(t *testing.T) {
|
|
||||||
p.updateNodesHealth(ctx, buffers)
|
|
||||||
checkIndices(t, p, 0, 0)
|
|
||||||
resetClients(p)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("switch to second group", func(t *testing.T) {
|
|
||||||
setErrors(p, nodes[0][0], nodes[0][1])
|
|
||||||
p.updateNodesHealth(ctx, buffers)
|
|
||||||
checkIndices(t, p, 1, 0)
|
|
||||||
resetClients(p)
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("switch back and forth", func(t *testing.T) {
|
|
||||||
setErrors(p, nodes[0][0], nodes[0][1])
|
|
||||||
p.updateNodesHealth(ctx, buffers)
|
|
||||||
checkIndices(t, p, 1, 0)
|
|
||||||
|
|
||||||
p.updateNodesHealth(ctx, buffers)
|
|
||||||
checkIndices(t, p, 1, 0)
|
|
||||||
|
|
||||||
setNoErrors(p, nodes[0][0])
|
|
||||||
p.updateNodesHealth(ctx, buffers)
|
|
||||||
checkIndices(t, p, 0, 0)
|
|
||||||
|
|
||||||
resetClients(p)
|
resetClients(p)
|
||||||
|
p.maxRequestAttempts = nodesCount
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeInnerPool(nodes [][]string) []*innerPool {
|
func reorderKeys(nodeKeys []*keys.PrivateKey, cnrNodes [][]netmap.NodeInfo) []*keys.PrivateKey {
|
||||||
res := make([]*innerPool, len(nodes))
|
res := make([]*keys.PrivateKey, len(nodeKeys))
|
||||||
|
for i := 0; i < len(cnrNodes[0]); i++ {
|
||||||
for i, group := range nodes {
|
for j := 0; j < len(nodeKeys); j++ {
|
||||||
res[i] = &innerPool{clients: make([]client, len(group))}
|
if hrw.Hash(nodeKeys[j].Bytes()) == cnrNodes[0][i].Hash() {
|
||||||
for j, node := range group {
|
res[i] = nodeKeys[j]
|
||||||
res[i].clients[j] = &treeClientMock{address: node}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeNodesGroup(nodes [][]string) [][]pool.NodeParam {
|
func checkClientUsage(t *testing.T, p *Pool, nodeKeys ...*keys.PrivateKey) {
|
||||||
res := make([][]pool.NodeParam, len(nodes))
|
for hash, cl := range p.clientMap {
|
||||||
|
if containsHash(nodeKeys, hash) {
|
||||||
for i, group := range nodes {
|
require.True(t, cl.(*treeClientMock).used)
|
||||||
res[i] = make([]pool.NodeParam, len(group))
|
} else {
|
||||||
for j, node := range group {
|
require.False(t, cl.(*treeClientMock).used)
|
||||||
res[i][j] = pool.NewNodeParam(1, node, 1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
func makeBuffer(p *Pool) [][]bool {
|
|
||||||
buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
|
|
||||||
for i, nodes := range p.rebalanceParams.nodesGroup {
|
|
||||||
buffers[i] = make([]bool, len(nodes))
|
|
||||||
}
|
|
||||||
return buffers
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkIndicesAndReset(t *testing.T, p *Pool, iExp, jExp int) {
|
|
||||||
checkIndices(t, p, iExp, jExp)
|
|
||||||
resetClients(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkIndices(t *testing.T, p *Pool, iExp, jExp int) {
|
|
||||||
i, j := p.getStartIndices()
|
|
||||||
require.Equal(t, [2]int{iExp, jExp}, [2]int{i, j})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func resetClients(p *Pool) {
|
func resetClients(p *Pool) {
|
||||||
resetClientsErrors(p)
|
for _, cl := range p.clientMap {
|
||||||
p.setStartIndices(0, 0)
|
cl.(*treeClientMock).used = false
|
||||||
|
cl.(*treeClientMock).err = false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func resetClientsErrors(p *Pool) {
|
func setErrors(p *Pool, nodeKeys ...*keys.PrivateKey) {
|
||||||
for _, group := range p.innerPools {
|
for hash, cl := range p.clientMap {
|
||||||
for _, cl := range group.clients {
|
if containsHash(nodeKeys, hash) {
|
||||||
node := cl.(*treeClientMock)
|
cl.(*treeClientMock).err = true
|
||||||
node.err = false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func setErrors(p *Pool, nodes ...string) {
|
func containsHash(list []*keys.PrivateKey, hash uint64) bool {
|
||||||
setErrorsBase(p, true, nodes...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func setNoErrors(p *Pool, nodes ...string) {
|
|
||||||
setErrorsBase(p, false, nodes...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func setErrorsBase(p *Pool, err bool, nodes ...string) {
|
|
||||||
for _, group := range p.innerPools {
|
|
||||||
for _, cl := range group.clients {
|
|
||||||
node := cl.(*treeClientMock)
|
|
||||||
if containsStr(nodes, node.address) {
|
|
||||||
node.err = err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func containsStr(list []string, item string) bool {
|
|
||||||
for i := range list {
|
for i := range list {
|
||||||
if list[i] == item {
|
if hrw.Hash(list[i].Bytes()) == hash {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getPlacementPolicy(replicas uint32) (p netmap.PlacementPolicy) {
|
||||||
|
var r netmap.ReplicaDescriptor
|
||||||
|
r.SetNumberOfObjects(replicas)
|
||||||
|
p.AddReplicas([]netmap.ReplicaDescriptor{r}...)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNodeInfo(key []byte) netmap.NodeInfo {
|
||||||
|
var node netmap.NodeInfo
|
||||||
|
node.SetPublicKey(key)
|
||||||
|
return node
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue