From 42a0fc8c13aefb79bcc273be2fb2a8ae9eaf17b1 Mon Sep 17 00:00:00 2001 From: Marina Biryukova Date: Thu, 12 Dec 2024 12:00:11 +0300 Subject: [PATCH] [#305] tree/pool: Add flag to use net map to prioritize tree services New version of tree/pool selects tree service connection to make request based on the current net map and container placement policy Signed-off-by: Marina Biryukova --- go.mod | 12 +- go.sum | Bin 17793 -> 20032 bytes pkg/network/address.go | 112 ++++++++++++++++++ pkg/network/address_test.go | 83 +++++++++++++ pkg/network/tls.go | 18 +++ pkg/network/tls_test.go | 46 ++++++++ pool/tree/pool.go | 227 ++++++++++++++++++++++++++++++++---- pool/tree/pool_test.go | 225 ++++++++++++++++++++++++++++++++--- 8 files changed, 687 insertions(+), 36 deletions(-) create mode 100644 pkg/network/address.go create mode 100644 pkg/network/address_test.go create mode 100644 pkg/network/tls.go create mode 100644 pkg/network/tls_test.go diff --git a/go.mod b/go.mod index ed14604..9e459f2 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/klauspost/reedsolomon v1.12.1 github.com/mailru/easyjson v0.7.7 github.com/mr-tron/base58 v1.2.0 + github.com/multiformats/go-multiaddr v0.14.0 github.com/nspcc-dev/neo-go v0.106.2 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 @@ -29,13 +30,20 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/golang/snappy v0.0.1 // indirect github.com/gorilla/websocket v1.5.1 // indirect + github.com/ipfs/go-cid v0.0.7 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect - github.com/kr/pretty v0.1.0 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect + github.com/multiformats/go-base32 v0.1.0 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect + github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/multiformats/go-varint v0.0.7 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect github.com/nspcc-dev/rfc6979 v0.2.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect github.com/twmb/murmur3 v1.1.8 // indirect go.etcd.io/bbolt v1.3.9 // indirect @@ -46,5 +54,5 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect - gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/go.sum b/go.sum index 56930c779a92b45099c8646bf7fe31a3ba0ab9b9..9cf5673184db29f4484baffa914e4e5d465e7098 100644 GIT binary patch delta 2790 zcmb7G%dg{h8C6@Ui!NAo+RhacBUCCRbFbg8L6zz_j$=FVGj`%Eko|w$+&iz_fUlyRvF|pee-BES2YSE*U|MCG?`3wmg4K_CkT`6mrW$fi&~anDiMA07 z3a8JXeEQcPJ$d)^%cmcm{`<+Fj!BV}pqo2cfVj6!viX0t&@BSN1VkbzM8FVC-=ZNN zk_hIxxC>u#S7S=w34N}iIJJ=M+d=) zZ+PK5_!hc5Gt&?aqc9F55OquV0fvz{3X$YB^9T>%5`aM|k~hoR&?Jbmi8XAwc(oU@ zw)1mH#Hpl*r1^I;KM)@;is5sA;ER=+;}p0s+3Xi`Ulfv5ji&)ISx(DXRHiYjdhsA( zv_H&W2l_4Ml~d|KFw#6R{UTy#Rj`fG*93qOcy>*|I~o&Nds=clip z{*68tb_H?p#)!pl^TZ%0HeII)P~;(==oL>jOFEvVdPvhzGr-negqwpz&(g@v zW|VAbDKu!fJb(=I*xK*KKkIlFAK~Yfb)XNWp>8=r8+)y3q?nX3H=vb>B1?NDryo<+3<&dyKmE zP4F)g?$iHj_Hh8#u|TUAsG8Yp1Y^-9Vg^X7?gii0#yCW;Q^Uo3TJhej<7EoY?n3BGM_JLWv$yH=g`%fTTGH&;fd4Dv?zRvjD8UKl}GMmv|eGk#4;d~_sYaz zN@-S(51vUt?A7UBiT&- z*0c5p#pSe7tA|~mUYm5>fG=;L%m3FT$Q^n=2;{8ys8zTE1sh41E*J@KI>7@y@A}qJ3}(zv==yYbaLWnmS2k0vM61Td(w^dYI~Jw=!x=((UuBn)>s#{rW|B zO>ns&nx(<(KnMmM&WzR!vaW1tB+OQeMMI0r%ID@_!STv;Vo)!)SADenA0c0rUQ?=R z@4;{k8tI5lB&8hZ{Djb)y~53^HPg{SQW|9gD&`6P@y9RzbMxf~$I2;l$MceNgX{B$ zKHHt2@`uepdT>Nx1~yXlI-KFF+OLRE5fO-JOu9#CbUdCqWe>Tt#~Zo%9?Yv!#l7G( z%Ca|qN`wHgjuWm6De@pU!^U8cO+TI^($LYa&kt{O)Jf`^x#aAYd4#Fbsok zaSwxVf^-lhq^^+rB$}A?ZaW?9grm&OP6|9H~6c&J7?SQ7733mu-MRtT!7}1xx+LT$5O2rvGLO7 zi?flhhr3b=DHFXS!%s&o)91X!Di~B!WaD;K)(d>(Ry!D&!x=oA6WIyoJ~_S19sdW( C6`V%^ delta 684 zcmZ{hyKd7^0EShNI%HyK8zGjeuto7X$9CKVse_vfwiVlTuAFSX9A9d?b{r>hB1FW@ zh9$bNu_I7k02X)(9)X1cCL|gGqDs{l`2O$z|Br8WUcB6S|8p<@wqJbvVgJs`Pe7cI zG+i9dAcZJ+=%UAMP@gMsNs>_`?6mrN+&%MBpR-FEQ-Vt!C7Qn+Ol41Qi6;feOWmnK znNbL~S-^>?FtRpCi(fE-bsW@e=ot+~)l@-{F&0;%;o64K4VgMSnnaS?QKB2njU2j? zL)UzKEq0IK3C;&gkaCdF0vPliM@yp7^6PXws|SZ(!XuN>c1i@8M9JD z8Z!MJXnX2hB3L7c*yFUH0dL^yAz){=#V5***8hm~pZzLI2C)uDV@xlc0mE_HGMy%Z zh!|;HtMIWo(4|v1?v{9btK1dpT3ga4$uUj$i0?g_%D!jGSslbOWP@RoB)ZV=ho|#u zKs!k@|51FHe<{Asf8GC_Paa(6&mR@?uSfLy@~B-{zba?D>x_N2mt&D%nJzJXYKAuR zLJF2?h-$C~X;_67NS9e;L)KypLr0a(iMUfWo85Vr?OBcUCgCdC#DS61k%3||4XnCN lJ{Xp=$QW%tjAGxRybEx9$i3~9^;7ZP-Tb?Jkbja_zX65*;JyF= diff --git a/pkg/network/address.go b/pkg/network/address.go new file mode 100644 index 0000000..45d5939 --- /dev/null +++ b/pkg/network/address.go @@ -0,0 +1,112 @@ +// NOTE: code is taken from https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/df05057ed46632e7746fcaa26731987a9070b2e5/pkg/network/address.go + +package network + +import ( + "errors" + "fmt" + "net" + "net/url" + "strings" + + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +var errHostIsEmpty = errors.New("host is empty") + +// Address represents the FrostFS node +// network address. +type Address struct { + ma multiaddr.Multiaddr +} + +// URIAddr returns Address as a URI. +// +// Panics if host address cannot be fetched from Address. +// +// See also FromString. +func (a Address) URIAddr() string { + _, host, err := manet.DialArgs(a.ma) + if err != nil { + // the only correct way to construct Address is AddressFromString + // which makes this error appear unexpected + panic(fmt.Errorf("could not get host addr: %w", err)) + } + + if !a.IsTLSEnabled() { + return host + } + + return (&url.URL{ + Scheme: "grpcs", + Host: host, + }).String() +} + +// FromString restores Address from a string representation. +// +// Supports URIAddr, MultiAddr and HostAddr strings. +func (a *Address) FromString(s string) error { + var err error + + a.ma, err = multiaddr.NewMultiaddr(s) + if err != nil { + var ( + host string + hasTLS bool + ) + host, hasTLS, err = client.ParseURI(s) + if err != nil { + host = s + } + + s, err = multiaddrStringFromHostAddr(host) + if err == nil { + a.ma, err = multiaddr.NewMultiaddr(s) + if err == nil && hasTLS { + a.ma = a.ma.Encapsulate(tls) + } + } + } + + return err +} + +// multiaddrStringFromHostAddr converts "localhost:8080" to "/dns4/localhost/tcp/8080". +func multiaddrStringFromHostAddr(host string) (string, error) { + if len(host) == 0 { + return "", errHostIsEmpty + } + + endpoint, port, err := net.SplitHostPort(host) + if err != nil { + return "", err + } + + // Empty address in host `:8080` generates `/dns4//tcp/8080` multiaddr + // which is invalid. It could be `/tcp/8080` but this breaks + // `manet.DialArgs`. The solution is to manually parse it as 0.0.0.0 + if endpoint == "" { + return "/ip4/0.0.0.0/tcp/" + port, nil + } + + var ( + prefix = "/dns4" + addr = endpoint + ) + + if ip := net.ParseIP(endpoint); ip != nil { + addr = ip.String() + if ip.To4() == nil { + prefix = "/ip6" + } else { + prefix = "/ip4" + } + } + + const l4Protocol = "tcp" + + return strings.Join([]string{prefix, addr, l4Protocol, port}, "/"), nil +} diff --git a/pkg/network/address_test.go b/pkg/network/address_test.go new file mode 100644 index 0000000..c6558eb --- /dev/null +++ b/pkg/network/address_test.go @@ -0,0 +1,83 @@ +// NOTE: code is taken from https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/df05057ed46632e7746fcaa26731987a9070b2e5/pkg/network/address_test.go + +package network + +import ( + "testing" + + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func TestAddressFromString(t *testing.T) { + t.Run("valid addresses", func(t *testing.T) { + testcases := []struct { + inp string + exp multiaddr.Multiaddr + }{ + {":8080", buildMultiaddr("/ip4/0.0.0.0/tcp/8080", t)}, + {"example.com:7070", buildMultiaddr("/dns4/example.com/tcp/7070", t)}, + {"213.44.87.1:32512", buildMultiaddr("/ip4/213.44.87.1/tcp/32512", t)}, + {"[2004:eb1::1]:8080", buildMultiaddr("/ip6/2004:eb1::1/tcp/8080", t)}, + {"grpc://example.com:7070", buildMultiaddr("/dns4/example.com/tcp/7070", t)}, + {"grpcs://example.com:7070", buildMultiaddr("/dns4/example.com/tcp/7070/"+tlsProtocolName, t)}, + } + + var addr Address + + for _, testcase := range testcases { + err := addr.FromString(testcase.inp) + require.NoError(t, err) + require.Equal(t, testcase.exp, addr.ma, testcase.inp) + } + }) + t.Run("invalid addresses", func(t *testing.T) { + testCases := []string{ + "wtf://example.com:123", // wrong scheme + "grpc://example.com", // missing port + } + + var addr Address + for _, tc := range testCases { + require.Error(t, addr.FromString(tc)) + } + }) +} + +func TestAddress_HostAddrString(t *testing.T) { + t.Run("valid addresses", func(t *testing.T) { + testcases := []struct { + ma multiaddr.Multiaddr + exp string + }{ + {buildMultiaddr("/dns4/frostfs.bigcorp.com/tcp/8080", t), "frostfs.bigcorp.com:8080"}, + {buildMultiaddr("/ip4/172.16.14.1/tcp/8080", t), "172.16.14.1:8080"}, + {buildMultiaddr("/ip4/192.168.0.1/tcp/8888/tls", t), "grpcs://192.168.0.1:8888"}, + } + + for _, testcase := range testcases { + addr := Address{testcase.ma} + + got := addr.URIAddr() + + require.Equal(t, testcase.exp, got) + } + }) + + t.Run("invalid addresses", func(t *testing.T) { + testcases := []multiaddr.Multiaddr{ + buildMultiaddr("/tcp/8080", t), + } + + for _, testcase := range testcases { + addr := Address{testcase} + require.Panics(t, func() { addr.URIAddr() }) + } + }) +} + +func buildMultiaddr(s string, t *testing.T) multiaddr.Multiaddr { + ma, err := multiaddr.NewMultiaddr(s) + require.NoError(t, err) + return ma +} diff --git a/pkg/network/tls.go b/pkg/network/tls.go new file mode 100644 index 0000000..b3e9663 --- /dev/null +++ b/pkg/network/tls.go @@ -0,0 +1,18 @@ +// NOTE: code is taken from https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/df05057ed46632e7746fcaa26731987a9070b2e5/pkg/network/tls.go + +package network + +import "github.com/multiformats/go-multiaddr" + +const ( + tlsProtocolName = "tls" +) + +// tls var is used for (un)wrapping other multiaddrs around TLS multiaddr. +var tls, _ = multiaddr.NewMultiaddr("/" + tlsProtocolName) + +// IsTLSEnabled searches for wrapped TLS protocol in multiaddr. +func (a Address) IsTLSEnabled() bool { + _, err := a.ma.ValueForProtocol(multiaddr.P_TLS) + return err == nil +} diff --git a/pkg/network/tls_test.go b/pkg/network/tls_test.go new file mode 100644 index 0000000..bfa18b8 --- /dev/null +++ b/pkg/network/tls_test.go @@ -0,0 +1,46 @@ +// NOTE: code is taken from https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/df05057ed46632e7746fcaa26731987a9070b2e5/pkg/network/tls_test.go + +package network + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestAddress_TLSEnabled(t *testing.T) { + testCases := [...]struct { + input string + wantTLS bool + }{ + {"/dns4/localhost/tcp/8080", false}, + {"/dns4/localhost/tcp/8080/tls", true}, + {"/tls/dns4/localhost/tcp/8080", true}, + {"grpc://localhost:8080", false}, + {"grpcs://localhost:8080", true}, + } + + var addr Address + + for _, test := range testCases { + err := addr.FromString(test.input) + require.NoError(t, err) + + require.Equal(t, test.wantTLS, addr.IsTLSEnabled(), test.input) + } +} + +func BenchmarkAddressTLSEnabled(b *testing.B) { + var addr Address + err := addr.FromString("/dns4/localhost/tcp/8080/tls") + require.NoError(b, err) + + b.ResetTimer() + b.ReportAllocs() + + var enabled bool + for range b.N { + enabled = addr.IsTLSEnabled() + } + require.True(b, enabled) +} diff --git a/pool/tree/pool.go b/pool/tree/pool.go index f4f9bf8..ddfdc0e 100644 --- a/pool/tree/pool.go +++ b/pool/tree/pool.go @@ -14,6 +14,8 @@ import ( rpcclient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/zap" @@ -73,6 +75,12 @@ type InitParameters struct { nodeParams []pool.NodeParam dialOptions []grpc.DialOption maxRequestAttempts int + netMapInfoSource NetMapInfoSource +} + +type NetMapInfoSource interface { + NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) + PlacementPolicy(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error) } // Pool represents virtual connection to the FrostFS tree services network to communicate @@ -80,7 +88,7 @@ type InitParameters struct { // due to their unavailability. // // Pool can be created and initialized using NewPool function. -// Before executing the FrostFS tree operations using the Pool, connection to the +// Before executing the FrostFS tree operations using the Pool without netMapInfoSource, connection to the // servers MUST BE correctly established (see Dial method). type Pool struct { innerPools []*innerPool @@ -96,12 +104,18 @@ type Pool struct { streamTimeout time.Duration nodeDialTimeout time.Duration - startIndicesMtx sync.RWMutex + netMapInfoSource NetMapInfoSource + + // mutex protects clientMap and startIndices + mutex sync.RWMutex + // clientMap will be used if netMapInfoSource is set + clientMap map[uint64]client // startIndices points to the client from which the next request will be executed. // Since clients are stored in innerPool field we have to use two indices. // These indices being changed during: // * rebalance procedure (see Pool.startRebalance) // * retry in case of request failure (see Pool.requestWithRetry) + // startIndices will be used if netMapInfoSource is not set startIndices [2]int } @@ -213,11 +227,6 @@ func NewPool(options InitParameters) (*Pool, error) { return nil, fmt.Errorf("missed required parameter 'Key'") } - nodesParams, err := adjustNodeParams(options.nodeParams) - if err != nil { - return nil, err - } - fillDefaultInitParams(&options) methods := make([]*pool.MethodStatus, methodLast) @@ -230,27 +239,44 @@ func NewPool(options InitParameters) (*Pool, error) { logger: options.logger, dialOptions: options.dialOptions, rebalanceParams: rebalanceParameters{ - nodesGroup: nodesParams, nodeRequestTimeout: options.healthcheckTimeout, clientRebalanceInterval: options.clientRebalanceInterval, }, maxRequestAttempts: options.maxRequestAttempts, streamTimeout: options.nodeStreamTimeout, + nodeDialTimeout: options.nodeDialTimeout, methods: methods, + netMapInfoSource: options.netMapInfoSource, + clientMap: make(map[uint64]client), + } + + if options.netMapInfoSource == nil { + nodesParams, err := adjustNodeParams(options.nodeParams) + if err != nil { + return nil, err + } + p.rebalanceParams.nodesGroup = nodesParams } return p, nil } -// Dial establishes a connection to the tree servers from the FrostFS network. +// Dial may not be called and will have no effect if netMapInfoSource is set. +// See also InitParameters.SetNetMapInfoSource +// +// Otherwise, Dial establishes a connection to the tree servers from the FrostFS network. // It also starts a routine that checks the health of the nodes and // updates the weights of the nodes for balancing. // Returns an error describing failure reason. // -// If failed, the Pool SHOULD NOT be used. +// If failed and netMapInfoSource is not set, the Pool SHOULD NOT be used. // // See also InitParameters.SetClientRebalanceInterval. func (p *Pool) Dial(ctx context.Context) error { + if p.netMapInfoSource != nil { + return nil + } + inner := make([]*innerPool, len(p.rebalanceParams.nodesGroup)) var atLeastOneHealthy bool @@ -334,6 +360,12 @@ func (x *InitParameters) SetMaxRequestAttempts(maxAttempts int) { x.maxRequestAttempts = maxAttempts } +// SetNetMapInfoSource sets implementation of interface to get current net map and container placement policy. +// If set, AddNode will have no effect. +func (x *InitParameters) SetNetMapInfoSource(netMapInfoSource NetMapInfoSource) { + x.netMapInfoSource = netMapInfoSource +} + // GetNodes invokes eponymous method from TreeServiceClient. // // Can return predefined errors: @@ -359,7 +391,7 @@ func (p *Pool) GetNodes(ctx context.Context, prm GetNodesParams) ([]*tree.GetNod } var resp *tree.GetNodeByPathResponse - err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) { + err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) { resp, inErr = rpcapi.GetNodeByPath(client, request, rpcclient.WithContext(ctx)) // Pool wants to do retry 'GetNodeByPath' request if result is empty. // Empty result is expected due to delayed tree service sync. @@ -463,7 +495,7 @@ func (p *Pool) GetSubTree(ctx context.Context, prm GetSubTreeParams) (*SubTreeRe } var cli *rpcapi.GetSubTreeResponseReader - err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) { + err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) { cli, inErr = rpcapi.GetSubTree(client, request, rpcclient.WithContext(ctx)) return handleError("failed to get sub tree client", inErr) }) @@ -497,7 +529,7 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) { } var resp *tree.AddResponse - err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) { + err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) { resp, inErr = rpcapi.Add(client, request, rpcclient.WithContext(ctx)) return handleError("failed to add node", inErr) }) @@ -532,7 +564,7 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint } var resp *tree.AddByPathResponse - err := p.requestWithRetry(ctx, func(client *rpcclient.Client) (inErr error) { + err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) (inErr error) { resp, inErr = rpcapi.AddByPath(client, request, rpcclient.WithContext(ctx)) return handleError("failed to add node by path", inErr) }) @@ -574,7 +606,7 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error { return err } - err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error { + err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error { if _, err := rpcapi.Move(client, request, rpcclient.WithContext(ctx)); err != nil { return handleError("failed to move node", err) } @@ -605,7 +637,7 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error { return err } - err := p.requestWithRetry(ctx, func(client *rpcclient.Client) error { + err := p.requestWithRetry(ctx, prm.CID, func(client *rpcclient.Client) error { if _, err := rpcapi.Remove(client, request, rpcclient.WithContext(ctx)); err != nil { return handleError("failed to remove node", err) } @@ -631,6 +663,24 @@ func (p *Pool) Close() error { } } + if closeErr := p.closeClientMapConnections(); closeErr != nil { + err = closeErr + } + + return err +} + +func (p *Pool) closeClientMapConnections() (err error) { + p.mutex.Lock() + defer p.mutex.Unlock() + + for _, cl := range p.clientMap { + if closeErr := cl.close(); closeErr != nil { + p.log(zapcore.ErrorLevel, "close client connection", zap.Error(closeErr)) + err = closeErr + } + } + return err } @@ -801,20 +851,24 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) } func (p *Pool) getStartIndices() (int, int) { - p.startIndicesMtx.RLock() - defer p.startIndicesMtx.RUnlock() + p.mutex.RLock() + defer p.mutex.RUnlock() return p.startIndices[0], p.startIndices[1] } func (p *Pool) setStartIndices(i, j int) { - p.startIndicesMtx.Lock() + p.mutex.Lock() p.startIndices[0] = i p.startIndices[1] = j - p.startIndicesMtx.Unlock() + p.mutex.Unlock() } -func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error { +func (p *Pool) requestWithRetry(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error { + if p.netMapInfoSource != nil { + return p.requestWithRetryContainerNodes(ctx, cnrID, fn) + } + var ( err, finErr error cl *rpcclient.Client @@ -866,10 +920,141 @@ LOOP: return finErr } +func (p *Pool) requestWithRetryContainerNodes(ctx context.Context, cnrID cid.ID, fn func(client *rpcclient.Client) error) error { + var ( + err, finErr error + cl *rpcclient.Client + ) + + reqID := GetRequestID(ctx) + + netMap, err := p.netMapInfoSource.NetMapSnapshot(ctx) + if err != nil { + return fmt.Errorf("get net map: %w", err) + } + + policy, err := p.netMapInfoSource.PlacementPolicy(ctx, cnrID) + if err != nil { + return fmt.Errorf("get container placement policy: %w", err) + } + + cnrNodes, err := netMap.ContainerNodes(policy, cnrID[:]) + if err != nil { + return fmt.Errorf("get container nodes: %w", err) + } + + cnrNodes, err = netMap.PlacementVectors(cnrNodes, cnrID[:]) + if err != nil { + return fmt.Errorf("get placement vectors: %w", err) + } + + attempts := p.maxRequestAttempts + +LOOP: + for _, cnrNodeGroup := range cnrNodes { + for _, cnrNode := range cnrNodeGroup { + if attempts == 0 { + break LOOP + } + + treeCl, ok := p.getClientFromMap(cnrNode.Hash()) + if !ok { + treeCl, err = p.getNewTreeClient(ctx, cnrNode) + if err != nil { + finErr = finalError(finErr, err) + p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts)) + continue + } + + p.addClientToMap(cnrNode.Hash(), treeCl) + } + attempts-- + + if cl, err = treeCl.serviceClient(); err == nil { + err = fn(cl) + } + if shouldRedial(ctx, err) { + p.deleteClientFromMap(cnrNode.Hash()) + } + if !shouldTryAgain(err) { + if err != nil { + err = fmt.Errorf("address %s: %w", treeCl.endpoint(), err) + } + + return err + } + + finErr = finalError(finErr, err) + p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts), + zap.String("address", treeCl.endpoint()), zap.Error(err)) + } + } + + return finErr +} + +func (p *Pool) getClientFromMap(hash uint64) (client, bool) { + p.mutex.RLock() + defer p.mutex.RUnlock() + + cl, ok := p.clientMap[hash] + return cl, ok +} + +func (p *Pool) addClientToMap(hash uint64, cl client) { + p.mutex.Lock() + p.clientMap[hash] = cl + p.mutex.Unlock() +} + +func (p *Pool) deleteClientFromMap(hash uint64) { + p.mutex.Lock() + _ = p.clientMap[hash].close() + delete(p.clientMap, hash) + p.mutex.Unlock() +} + +func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*treeClient, error) { + var ( + treeCl *treeClient + err error + ) + + node.IterateNetworkEndpoints(func(endpoint string) bool { + var addr network.Address + if err = addr.FromString(endpoint); err != nil { + p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err)) + return false + } + + newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout) + if err = newTreeCl.dial(ctx); err != nil { + p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err)) + return false + } + + treeCl = newTreeCl + return true + }) + + if treeCl == nil { + return nil, fmt.Errorf("tree client wasn't initialized") + } + + return treeCl, nil +} + func shouldTryAgain(err error) bool { return !(err == nil || errors.Is(err, ErrNodeAccessDenied)) } +func shouldRedial(ctx context.Context, err error) bool { + if err == nil || errors.Is(err, ErrNodeAccessDenied) || errors.Is(err, ErrNodeNotFound) || errors.Is(err, errNodeEmptyResult) || errors.Is(ctx.Err(), context.Canceled) { + return false + } + return true +} + func prioErr(err error) int { switch { case err == nil: diff --git a/pool/tree/pool_test.go b/pool/tree/pool_test.go index f9f4142..607e037 100644 --- a/pool/tree/pool_test.go +++ b/pool/tree/pool_test.go @@ -7,7 +7,12 @@ import ( rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" + "git.frostfs.info/TrueCloudLab/hrw" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) @@ -15,12 +20,14 @@ import ( type treeClientMock struct { address string err bool + used bool } func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) { if t.err { return nil, errors.New("serviceClient() mock error") } + t.used = true return nil, nil } @@ -51,6 +58,23 @@ func (t *treeClientMock) close() error { return nil } +type netMapInfoMock struct { + netMap netmap.NetMap + policy netmap.PlacementPolicy + err error +} + +func (n *netMapInfoMock) NetMapSnapshot(context.Context) (netmap.NetMap, error) { + if n.err != nil { + return netmap.NetMap{}, n.err + } + return n.netMap, nil +} + +func (n *netMapInfoMock) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) { + return n.policy, nil +} + func TestHandleError(t *testing.T) { defaultError := errors.New("default error") for _, tc := range []struct { @@ -104,14 +128,14 @@ func TestRetry(t *testing.T) { } t.Run("first ok", func(t *testing.T) { - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("first failed", func(t *testing.T) { setErrors(p, "node00") - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 1) }) @@ -119,7 +143,7 @@ func TestRetry(t *testing.T) { t.Run("all failed", func(t *testing.T) { setErrors(p, nodes[0]...) setErrors(p, nodes[1]...) - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.Error(t, err) checkIndicesAndReset(t, p, 0, 0) }) @@ -127,13 +151,13 @@ func TestRetry(t *testing.T) { t.Run("round", func(t *testing.T) { setErrors(p, nodes[0][0], nodes[0][1]) setErrors(p, nodes[1]...) - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndices(t, p, 0, 2) resetClientsErrors(p) setErrors(p, nodes[0][2], nodes[0][3]) - err = p.requestWithRetry(ctx, makeFn) + err = p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) @@ -141,14 +165,14 @@ func TestRetry(t *testing.T) { t.Run("group switch", func(t *testing.T) { setErrors(p, nodes[0]...) setErrors(p, nodes[1][0]) - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 1, 1) }) t.Run("group round", func(t *testing.T) { setErrors(p, nodes[0][1:]...) - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) @@ -156,7 +180,7 @@ func TestRetry(t *testing.T) { t.Run("group round switch", func(t *testing.T) { setErrors(p, nodes[0]...) p.setStartIndices(0, 1) - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 1, 0) }) @@ -164,14 +188,14 @@ func TestRetry(t *testing.T) { t.Run("no panic group switch", func(t *testing.T) { setErrors(p, nodes[1]...) p.setStartIndices(1, 0) - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("error empty result", func(t *testing.T) { errNodes, index := 2, 0 - err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error { + err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error { if index < errNodes { index++ return errNodeEmptyResult @@ -184,7 +208,7 @@ func TestRetry(t *testing.T) { t.Run("error not found", func(t *testing.T) { errNodes, index := 2, 0 - err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error { + err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error { if index < errNodes { index++ return ErrNodeNotFound @@ -197,7 +221,7 @@ func TestRetry(t *testing.T) { t.Run("error access denied", func(t *testing.T) { var index int - err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error { + err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error { index++ return ErrNodeAccessDenied }) @@ -211,7 +235,7 @@ func TestRetry(t *testing.T) { p.maxRequestAttempts = 2 setErrors(p, nodes[0]...) setErrors(p, nodes[1]...) - err := p.requestWithRetry(ctx, makeFn) + err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.Error(t, err) checkIndicesAndReset(t, p, 0, 2) p.maxRequestAttempts = oldVal @@ -273,6 +297,125 @@ func TestRebalance(t *testing.T) { }) } +func TestRetryContainerNodes(t *testing.T) { + ctx := context.Background() + nodesCount := 3 + policy := getPlacementPolicy(uint32(nodesCount)) + p := &Pool{ + logger: zaptest.NewLogger(t), + maxRequestAttempts: nodesCount, + } + + var nm netmap.NetMap + for i := 0; i < nodesCount; i++ { + key, err := keys.NewPrivateKey() + require.NoError(t, err) + nm.SetNodes(append(nm.Nodes(), getNodeInfo(key.Bytes()))) + } + p.netMapInfoSource = &netMapInfoMock{netMap: nm, policy: policy} + + cnrID := cidtest.ID() + cnrNodes, err := nm.ContainerNodes(policy, cnrID[:]) + require.NoError(t, err) + cnrNodes, err = nm.PlacementVectors(cnrNodes, cnrID[:]) + require.NoError(t, err) + require.Len(t, cnrNodes, 1) + require.Len(t, cnrNodes[0], nodesCount) + + makeFn := func(client *rpcClient.Client) error { + return nil + } + + t.Run("first ok", func(t *testing.T) { + p.clientMap = makeClientMap(cnrNodes[0]) + err = p.requestWithRetry(ctx, cnrID, makeFn) + require.NoError(t, err) + checkClientsUsage(t, p, cnrNodes[0][0]) + checkClientsPresence(t, p, cnrNodes[0]...) + }) + + t.Run("first failed", func(t *testing.T) { + p.clientMap = makeClientMap(cnrNodes[0]) + setClientMapErrors(p, cnrNodes[0][0]) + err = p.requestWithRetry(ctx, cnrID, makeFn) + require.NoError(t, err) + checkClientsUsage(t, p, cnrNodes[0][1]) + checkClientsPresence(t, p, cnrNodes[0][1:]...) + }) + + t.Run("first two failed", func(t *testing.T) { + p.clientMap = makeClientMap(cnrNodes[0]) + setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1]) + err = p.requestWithRetry(ctx, cnrID, makeFn) + require.NoError(t, err) + checkClientsUsage(t, p, cnrNodes[0][2]) + checkClientsPresence(t, p, cnrNodes[0][2]) + }) + + t.Run("all failed", func(t *testing.T) { + p.clientMap = makeClientMap(cnrNodes[0]) + setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1], cnrNodes[0][2]) + err = p.requestWithRetry(ctx, cnrID, makeFn) + require.Error(t, err) + checkClientsUsage(t, p) + checkClientsPresence(t, p) + }) + + t.Run("error empty result", func(t *testing.T) { + p.clientMap = makeClientMap(cnrNodes[0]) + errNodes, index := 2, 0 + err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error { + if index < errNodes { + index++ + return errNodeEmptyResult + } + return nil + }) + require.NoError(t, err) + checkClientsUsage(t, p, cnrNodes[0][:errNodes+1]...) + checkClientsPresence(t, p, cnrNodes[0]...) + }) + + t.Run("error not found", func(t *testing.T) { + p.clientMap = makeClientMap(cnrNodes[0]) + errNodes, index := 2, 0 + err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error { + if index < errNodes { + index++ + return ErrNodeNotFound + } + return nil + }) + require.NoError(t, err) + checkClientsUsage(t, p, cnrNodes[0][:errNodes+1]...) + checkClientsPresence(t, p, cnrNodes[0]...) + }) + + t.Run("error access denied", func(t *testing.T) { + p.clientMap = makeClientMap(cnrNodes[0]) + var index int + err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error { + index++ + return ErrNodeAccessDenied + }) + require.ErrorIs(t, err, ErrNodeAccessDenied) + require.Equal(t, 1, index) + checkClientsUsage(t, p, cnrNodes[0][0]) + checkClientsPresence(t, p, cnrNodes[0]...) + }) + + t.Run("limit attempts", func(t *testing.T) { + p.clientMap = makeClientMap(cnrNodes[0]) + p.maxRequestAttempts = 2 + setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1]) + err = p.requestWithRetry(ctx, cnrID, makeFn) + require.Error(t, err) + checkClientsUsage(t, p) + checkClientsPresence(t, p, cnrNodes[0][2]) + p.maxRequestAttempts = nodesCount + }) +} + func makeInnerPool(nodes [][]string) []*innerPool { res := make([]*innerPool, len(nodes)) @@ -359,3 +502,59 @@ func containsStr(list []string, item string) bool { return false } + +func makeClientMap(nodes []netmap.NodeInfo) map[uint64]client { + res := make(map[uint64]client, len(nodes)) + for _, node := range nodes { + res[hrw.Hash(node.PublicKey())] = &treeClientMock{} + } + return res +} + +func checkClientsPresence(t *testing.T, p *Pool, nodes ...netmap.NodeInfo) { + require.Len(t, p.clientMap, len(nodes)) + for _, node := range nodes { + require.NotNil(t, p.clientMap[hrw.Hash(node.PublicKey())]) + } +} + +func checkClientsUsage(t *testing.T, p *Pool, nodes ...netmap.NodeInfo) { + for hash, cl := range p.clientMap { + if containsHash(nodes, hash) { + require.True(t, cl.(*treeClientMock).used) + } else { + require.False(t, cl.(*treeClientMock).used) + } + } +} + +func setClientMapErrors(p *Pool, nodes ...netmap.NodeInfo) { + for hash, cl := range p.clientMap { + if containsHash(nodes, hash) { + cl.(*treeClientMock).err = true + } + } +} + +func containsHash(list []netmap.NodeInfo, hash uint64) bool { + for i := range list { + if hrw.Hash(list[i].PublicKey()) == hash { + return true + } + } + + return false +} + +func getPlacementPolicy(replicas uint32) (p netmap.PlacementPolicy) { + var r netmap.ReplicaDescriptor + r.SetNumberOfObjects(replicas) + p.AddReplicas([]netmap.ReplicaDescriptor{r}...) + return p +} + +func getNodeInfo(key []byte) netmap.NodeInfo { + var node netmap.NodeInfo + node.SetPublicKey(key) + return node +} -- 2.45.2