Compare commits

...

1 commit

Author SHA1 Message Date
b464c4c3cf Use netmap in tree pool requests
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-09-04 15:41:46 +03:00
5 changed files with 299 additions and 27 deletions

10
go.mod
View file

@ -12,6 +12,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/klauspost/reedsolomon v1.12.1
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.1
github.com/nspcc-dev/neo-go v0.106.2
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
@ -28,13 +29,22 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/ipfs/go-cid v0.0.7 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/multiformats/go-base32 v0.0.3 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-multihash v0.0.14 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
go.etcd.io/bbolt v1.3.9 // indirect

26
go.sum
View file

@ -57,6 +57,8 @@ github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyf
github.com/holiman/uint256 v1.2.4 h1:jUc4Nk8fm9jZabQuqr2JzednajVmBpC+oiTiXZJEApU=
github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY=
github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
@ -69,10 +71,31 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8=
github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY=
github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4=
github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM=
github.com/multiformats/go-multiaddr v0.12.1 h1:vm+BA/WZA8QZDp1pF1FWhi5CT3g1tbi5GJmqpb6wnlk=
github.com/multiformats/go-multiaddr v0.12.1/go.mod h1:7mPkiBMmLeFipt+nNSq9pHZUeJSt8lHBgH6yhj0YQzE=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I=
github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY=
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 h1:mD9hU3v+zJcnHAVmHnZKt3I++tvn30gBj2rP2PocZMk=
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2/go.mod h1:U5VfmPNM88P4RORFb6KSUVBdJBDhlqggJZYGXGPxOcc=
github.com/nspcc-dev/neo-go v0.106.2 h1:KXSJ2J5Oacc7LrX3r4jvnC8ihKqHs5NB21q4f2S3r9o=
@ -102,6 +125,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
@ -121,6 +146,7 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=

View file

@ -0,0 +1,110 @@
package network
import (
"errors"
"fmt"
"net"
"net/url"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var errHostIsEmpty = errors.New("host is empty")
// Address represents the FrostFS node
// network address.
type Address struct {
ma multiaddr.Multiaddr
}
// URIAddr returns Address as a URI.
//
// Panics if host address cannot be fetched from Address.
//
// See also FromString.
func (a Address) URIAddr() string {
_, host, err := manet.DialArgs(a.ma)
if err != nil {
// the only correct way to construct Address is AddressFromString
// which makes this error appear unexpected
panic(fmt.Errorf("could not get host addr: %w", err))
}
if !a.IsTLSEnabled() {
return host
}
return (&url.URL{
Scheme: "grpcs",
Host: host,
}).String()
}
// FromString restores Address from a string representation.
//
// Supports URIAddr, MultiAddr and HostAddr strings.
func (a *Address) FromString(s string) error {
var err error
a.ma, err = multiaddr.NewMultiaddr(s)
if err != nil {
var (
host string
hasTLS bool
)
host, hasTLS, err = client.ParseURI(s)
if err != nil {
host = s
}
s, err = multiaddrStringFromHostAddr(host)
if err == nil {
a.ma, err = multiaddr.NewMultiaddr(s)
if err == nil && hasTLS {
a.ma = a.ma.Encapsulate(tls)
}
}
}
return err
}
// multiaddrStringFromHostAddr converts "localhost:8080" to "/dns4/localhost/tcp/8080".
func multiaddrStringFromHostAddr(host string) (string, error) {
if len(host) == 0 {
return "", errHostIsEmpty
}
endpoint, port, err := net.SplitHostPort(host)
if err != nil {
return "", err
}
// Empty address in host `:8080` generates `/dns4//tcp/8080` multiaddr
// which is invalid. It could be `/tcp/8080` but this breaks
// `manet.DialArgs`. The solution is to manually parse it as 0.0.0.0
if endpoint == "" {
return "/ip4/0.0.0.0/tcp/" + port, nil
}
var (
prefix = "/dns4"
addr = endpoint
)
if ip := net.ParseIP(endpoint); ip != nil {
addr = ip.String()
if ip.To4() == nil {
prefix = "/ip6"
} else {
prefix = "/ip4"
}
}
const l4Protocol = "tcp"
return strings.Join([]string{prefix, addr, l4Protocol, port}, "/"), nil
}

16
pool/tree/network/tls.go Normal file
View file

@ -0,0 +1,16 @@
package network
import "github.com/multiformats/go-multiaddr"
const (
tlsProtocolName = "tls"
)
// tls var is used for (un)wrapping other multiaddrs around TLS multiaddr.
var tls, _ = multiaddr.NewMultiaddr("/" + tlsProtocolName)
// IsTLSEnabled searches for wrapped TLS protocol in multiaddr.
func (a Address) IsTLSEnabled() bool {
_, err := a.ma.ValueForProtocol(multiaddr.P_TLS)
return err == nil
}

View file

@ -10,8 +10,11 @@ import (
"sync"
"time"
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/network"
grpcService "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/zap"
@ -71,6 +74,8 @@ type InitParameters struct {
nodeParams []pool.NodeParam
dialOptions []grpc.DialOption
maxRequestAttempts int
bootstrapAddr string
}
// Pool represents virtual connection to the FrostFS tree services network to communicate
@ -99,6 +104,10 @@ type Pool struct {
// * rebalance procedure (see Pool.startRebalance)
// * retry in case of request failure (see Pool.requestWithRetry)
startIndices [2]int
bootstrapAddr string
clientMap map[uint64]client
netMap netmap.NetMap
}
type innerPool struct {
@ -121,6 +130,7 @@ type GetNodesParams struct {
LatestOnly bool
AllAttrs bool
BearerToken []byte
Policy netmap.PlacementPolicy
}
// GetSubTreeParams groups parameters of Pool.GetSubTree operation.
@ -131,6 +141,7 @@ type GetSubTreeParams struct {
Depth uint32
BearerToken []byte
Order SubTreeSort
Policy netmap.PlacementPolicy
}
// AddNodeParams groups parameters of Pool.AddNode operation.
@ -140,6 +151,7 @@ type AddNodeParams struct {
Parent uint64
Meta map[string]string
BearerToken []byte
Policy netmap.PlacementPolicy
}
// AddNodeByPathParams groups parameters of Pool.AddNodeByPath operation.
@ -150,6 +162,7 @@ type AddNodeByPathParams struct {
Meta map[string]string
PathAttribute string
BearerToken []byte
Policy netmap.PlacementPolicy
}
// MoveNodeParams groups parameters of Pool.MoveNode operation.
@ -160,6 +173,7 @@ type MoveNodeParams struct {
ParentID uint64
Meta map[string]string
BearerToken []byte
Policy netmap.PlacementPolicy
}
// RemoveNodeParams groups parameters of Pool.RemoveNode operation.
@ -168,6 +182,7 @@ type RemoveNodeParams struct {
TreeID string
NodeID uint64
BearerToken []byte
Policy netmap.PlacementPolicy
}
// MethodIndex index of method in list of statuses in Pool.
@ -208,6 +223,9 @@ func NewPool(options InitParameters) (*Pool, error) {
if options.key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
if options.bootstrapAddr == "" {
return nil, fmt.Errorf("missed bootstrap address")
}
nodesParams, err := adjustNodeParams(options.nodeParams)
if err != nil {
@ -232,6 +250,7 @@ func NewPool(options InitParameters) (*Pool, error) {
},
maxRequestAttempts: options.maxRequestAttempts,
methods: methods,
bootstrapAddr: options.bootstrapAddr,
}
return p, nil
@ -246,24 +265,37 @@ func NewPool(options InitParameters) (*Pool, error) {
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup))
var atLeastOneHealthy bool
cl, err := p.getNetMapClient(ctx)
if err != nil {
return fmt.Errorf("get netmap client: %w", err)
}
for i, nodes := range p.rebalanceParams.nodesGroup {
clients := make([]client, len(nodes))
for j, node := range nodes {
clients[j] = newTreeClient(node.Address(), p.dialOptions...)
if err := clients[j].dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", node.Address()), zap.Error(err))
continue
res, err := cl.NetMapSnapshot(ctx, clientSDK.PrmNetMapSnapshot{})
if err != nil {
return fmt.Errorf("get netmap: %w", err)
}
p.netMap = res.NetMap()
p.clientMap = make(map[uint64]client, len(p.netMap.Nodes()))
var atLeastOneHealthy bool
for _, node := range p.netMap.Nodes() {
node.IterateNetworkEndpoints(func(endpoint string) bool {
var addr network.Address
if err := addr.FromString(endpoint); err != nil {
p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err))
return false
}
treeClient := newTreeClient(addr.URIAddr(), p.dialOptions...)
if err := treeClient.dial(ctx); err != nil {
p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
return false
}
atLeastOneHealthy = true
}
inner[i] = &innerPool{
clients: clients,
}
p.clientMap[node.Hash()] = treeClient
return true
})
}
if !atLeastOneHealthy {
@ -273,12 +305,31 @@ func (p *Pool) Dial(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
p.cancel = cancel
p.closedCh = make(chan struct{})
p.innerPools = inner
go p.startRebalance(ctx)
return nil
}
func (p *Pool) getNetMapClient(ctx context.Context) (*clientSDK.Client, error) {
var c clientSDK.Client
prmInit := clientSDK.PrmInit{
Key: p.key.PrivateKey,
}
prmDial := clientSDK.PrmDial{
Endpoint: p.bootstrapAddr,
}
c.Init(prmInit)
if err := c.Dial(ctx, prmDial); err != nil {
return nil, fmt.Errorf("dial client: %w", err)
}
return &c, nil
}
// SetKey specifies default key to be used for the protocol communication by default.
func (x *InitParameters) SetKey(key *keys.PrivateKey) {
x.key = key
@ -329,6 +380,10 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) {
x.maxRequestAttempts = maxAttempts
}
func (x *InitParameters) SetBootstrapAddress(addr string) {
x.bootstrapAddr = addr
}
// GetNodes invokes eponymous method from TreeServiceClient.
//
// Can return predefined errors:
@ -354,7 +409,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*grpcService
}
var resp *grpcService.GetNodeByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
err := p.newRequestWithRetry(ctx, prm.CID[:], prm.Policy, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.GetNodeByPath(ctx, request)
// Pool wants to do retry 'GetNodeByPath' request if result is empty.
// Empty result is expected due to delayed tree service sync.
@ -454,7 +509,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe
}
var cli grpcService.TreeService_GetSubTreeClient
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
err := p.newRequestWithRetry(ctx, prm.CID[:], prm.Policy, func(client grpcService.TreeServiceClient) (inErr error) {
cli, inErr = client.GetSubTree(ctx, request)
return handleError("failed to get sub tree client", inErr)
})
@ -488,7 +543,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
}
var resp *grpcService.AddResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
err := p.newRequestWithRetry(ctx, prm.CID[:], prm.Policy, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.Add(ctx, request)
return handleError("failed to add node", inErr)
})
@ -523,7 +578,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint
}
var resp *grpcService.AddByPathResponse
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
err := p.newRequestWithRetry(ctx, prm.CID[:], prm.Policy, func(client grpcService.TreeServiceClient) (inErr error) {
resp, inErr = client.AddByPath(ctx, request)
return handleError("failed to add node by path", inErr)
})
@ -565,7 +620,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
return err
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.newRequestWithRetry(ctx, prm.CID[:], prm.Policy, func(client grpcService.TreeServiceClient) error {
if _, err := client.Move(ctx, request); err != nil {
return handleError("failed to move node", err)
}
@ -596,7 +651,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
return err
}
err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
err := p.newRequestWithRetry(ctx, prm.CID[:], prm.Policy, func(client grpcService.TreeServiceClient) error {
if _, err := client.Remove(ctx, request); err != nil {
return handleError("failed to remove node", err)
}
@ -613,12 +668,10 @@ func (p *Pool) Close() error {
<-p.closedCh
var err error
for _, group := range p.innerPools {
for _, cl := range group.clients {
if closeErr := cl.close(); closeErr != nil {
p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr))
err = closeErr
}
for _, cl := range p.clientMap {
if closeErr := cl.close(); closeErr != nil {
p.log(zapcore.ErrorLevel, "close client connection", zap.String("endpoint", cl.endpoint()), zap.Error(closeErr))
err = closeErr
}
}
@ -854,6 +907,63 @@ LOOP:
return finErr
}
func (p *Pool) newRequestWithRetry(ctx context.Context, cid []byte, policy netmap.PlacementPolicy, fn func(client grpcService.TreeServiceClient) error) error {
var (
err, finErr error
cl grpcService.TreeServiceClient
)
reqID := GetRequestID(ctx)
cntNodes, err := p.netMap.ContainerNodes(policy, cid)
if err != nil {
return fmt.Errorf("get container nodes: %w", err)
}
cntNodes, err = p.netMap.PlacementVectors(cntNodes, cid)
if err != nil {
return fmt.Errorf("get placement vectors: %w", err)
}
attempts := p.maxRequestAttempts
LOOP:
for i := 0; i < len(cntNodes); i++ {
clientsLen := len(cntNodes[i])
for j := 0; j < clientsLen; j++ {
if attempts == 0 {
break LOOP
}
attempts--
client, ok := p.clientMap[cntNodes[i][j].Hash()]
if !ok {
finErr = finalError(finErr, errors.New("missed node client"))
p.log(zap.DebugLevel, "missed node client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.Uint64("node hash", cntNodes[i][j].Hash()))
continue
}
if cl, err = client.serviceClient(); err == nil {
err = fn(cl)
}
if !shouldTryAgain(err) {
if err != nil {
err = fmt.Errorf("address %s: %w", client.endpoint(), err)
}
return err
}
finErr = finalError(finErr, err)
p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
zap.String("address", client.endpoint()), zap.Error(err))
}
}
return finErr
}
func shouldTryAgain(err error) bool {
return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
}