From e25995e12aa1a5ed55fd2d3ed96329428c8dbb95 Mon Sep 17 00:00:00 2001
From: Marina Biryukova
Date: Thu, 5 Dec 2024 13:24:11 +0300
Subject: [PATCH] [#305] tree/pool: Use net map to prioritize tree services

The new version of tree/pool selects the tree service connection for a
request based on the current net map and the container placement policy.

Signed-off-by: Marina Biryukova
---
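Note (reviewer sketch, ignored by git am): the core of this change is the
node-selection flow below. This is an illustrative summary only; the helper
name prioritizedNodes is hypothetical and does not appear in the patch, while
ContainerNodes and PlacementVectors are the real netmap APIs the new pool
code calls.

    // Illustrative sketch (not part of the patch): how the new pool derives
    // the priority order of tree service nodes for a container.
    import (
    	"fmt"

    	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
    )

    // prioritizedNodes is a hypothetical helper: ContainerNodes evaluates the
    // placement policy against the net map snapshot, and PlacementVectors
    // sorts each replica vector by HRW priority for the given container ID.
    func prioritizedNodes(nm netmap.NetMap, policy netmap.PlacementPolicy, cnrID cid.ID) ([][]netmap.NodeInfo, error) {
    	nodes, err := nm.ContainerNodes(policy, cnrID[:])
    	if err != nil {
    		return nil, fmt.Errorf("get container nodes: %w", err)
    	}
    	// One vector per replica descriptor; within a vector, the most
    	// preferred node for cnrID comes first.
    	return nm.PlacementVectors(nodes, cnrID[:])
    }

requestWithRetry then walks these vectors in order, dials nodes lazily,
caches clients by node hash, and moves on to the next node while
shouldTryAgain reports the error as retryable.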
 go.mod                       |  13 +-
 go.sum                       | Bin 17793 -> 20328 bytes
 pool/tree/network/address.go | 110 +++++++++
 pool/tree/network/tls.go     |  16 ++
 pool/tree/pool.go            | 449 ++++++++++++++++++-----------------
 pool/tree/pool_test.go       | 344 +++++++++++----------------
 6 files changed, 499 insertions(+), 433 deletions(-)
 create mode 100644 pool/tree/network/address.go
 create mode 100644 pool/tree/network/tls.go

diff --git a/go.mod b/go.mod
index ed14604..2928e8e 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-sdk-go
 go 1.22

 require (
+	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.0
 	git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
 	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
@@ -14,6 +15,7 @@ require (
 	github.com/klauspost/reedsolomon v1.12.1
 	github.com/mailru/easyjson v0.7.7
 	github.com/mr-tron/base58 v1.2.0
+	github.com/multiformats/go-multiaddr v0.12.1
 	github.com/nspcc-dev/neo-go v0.106.2
 	github.com/stretchr/testify v1.9.0
 	go.uber.org/zap v1.27.0
@@ -29,13 +31,21 @@ require (
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
 	github.com/golang/snappy v0.0.1 // indirect
 	github.com/gorilla/websocket v1.5.1 // indirect
+	github.com/ipfs/go-cid v0.0.7 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.6 // indirect
-	github.com/kr/pretty v0.1.0 // indirect
+	github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
+	github.com/minio/sha256-simd v1.0.1 // indirect
+	github.com/multiformats/go-base32 v0.0.3 // indirect
+	github.com/multiformats/go-base36 v0.1.0 // indirect
+	github.com/multiformats/go-multibase v0.0.3 // indirect
+	github.com/multiformats/go-multihash v0.0.14 // indirect
+	github.com/multiformats/go-varint v0.0.6 // indirect
 	github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
 	github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
 	github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
+	github.com/spaolacci/murmur3 v1.1.0 // indirect
 	github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 // indirect
 	github.com/twmb/murmur3 v1.1.8 // indirect
 	go.etcd.io/bbolt v1.3.9 // indirect
@@ -46,5 +56,4 @@ require (
 	golang.org/x/sys v0.21.0 // indirect
 	golang.org/x/text v0.16.0 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
-	gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
 )
diff --git a/go.sum b/go.sum
index 56930c779a92b45099c8646bf7fe31a3ba0ab9b9..90b7a71b4d2ff6d011da86b6df30bf7e74234245 100644
GIT binary patch
[base85-encoded binary deltas omitted: mangled beyond recovery in this copy]

[the diffs creating pool/tree/network/address.go and pool/tree/network/tls.go
are likewise not recoverable]

diff --git a/pool/tree/pool.go b/pool/tree/pool.go
[index line and earlier hunks not recoverable]
@@ [hunk header not recoverable] @@
-func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, buffer []bool) {
-	if i > len(p.innerPools)-1 {
-		return
-	}
-	nodesByPriority := p.innerPools[i]
-	options := p.rebalanceParams
-
-	var wg sync.WaitGroup
-	for j, cli := range nodesByPriority.clients {
-		wg.Add(1)
-		go func(j int, cli client) {
-			defer wg.Done()
-
-			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
-			defer c()
-
-			changed, err := cli.redialIfNecessary(tctx)
-			healthy := err == nil
-			if changed {
-				fields := []zap.Field{zap.String("address", cli.endpoint()), zap.Bool("healthy", healthy)}
-				if err != nil {
-					fields = append(fields, zap.Error(err))
-				}
-				p.log(zap.DebugLevel, "tree health has changed", fields...)
-			} else if err != nil {
-				p.log(zap.DebugLevel, "tree redial error", zap.String("address", cli.endpoint()), zap.Error(err))
-			}
-			buffer[j] = healthy
-		}(j, cli)
-	}
-	wg.Wait()
-}
-
-func (p *Pool) getStartIndices() (int, int) {
-	p.startIndicesMtx.RLock()
-	defer p.startIndicesMtx.RUnlock()
-
-	return p.startIndices[0], p.startIndices[1]
-}
-
-func (p *Pool) setStartIndices(i, j int) {
-	p.startIndicesMtx.Lock()
-	p.startIndices[0] = i
-	p.startIndices[1] = j
-	p.startIndicesMtx.Unlock()
-}
-
-func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
+func (p *Pool) requestWithRetry(ctx context.Context, cid cid.ID, fn func(client *rpcclient.Client) error) error {
 	var (
 		err, finErr error
 		cl          *rpcclient.Client
@@ -822,35 +738,57 @@ func (p *Pool) requestWithRetry(ctx context.Context, fn func(client *rpcclient.Client) error) error {
 	reqID := GetRequestID(ctx)

-	startI, startJ := p.getStartIndices()
-	groupsLen := len(p.innerPools)
+	netMap, err := p.netMapSource.NetMapSnapshot(ctx)
+	if err != nil {
+		return fmt.Errorf("get net map: %w", err)
+	}
+
+	policy, ok := p.policySource.PlacementPolicy(cid)
+	if !ok {
+		policy, err = p.getContainerPlacementPolicy(ctx, cid, reqID)
+		if err != nil {
+			return fmt.Errorf("get container placement policy: %w", err)
+		}
+	}
+
+	cnrNodes, err := netMap.ContainerNodes(policy, cid[:])
+	if err != nil {
+		return fmt.Errorf("get container nodes: %w", err)
+	}
+
+	cnrNodes, err = netMap.PlacementVectors(cnrNodes, cid[:])
+	if err != nil {
+		return fmt.Errorf("get placement vectors: %w", err)
+	}
+
 	attempts := p.maxRequestAttempts

 LOOP:
-	for i := startI; i < startI+groupsLen; i++ {
-		indexI := i % groupsLen
-		clientsLen := len(p.innerPools[indexI].clients)
-		for j := startJ; j < startJ+clientsLen; j++ {
-			indexJ := j % clientsLen
-
+	for _, cnrNodeGroup := range cnrNodes {
+		for _, cnrNode := range cnrNodeGroup {
 			if attempts == 0 {
-				if startI != indexI || startJ != indexJ {
-					p.setStartIndices(indexI, indexJ)
-				}
 				break LOOP
 			}
 			attempts--

-			if cl, err = p.innerPools[indexI].clients[indexJ].serviceClient(); err == nil {
+			treeCl, ok := p.clientMap[cnrNode.Hash()]
+			if !ok {
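+				// No cached client for this node yet: dial it lazily via
+				// getNewTreeClient and cache it under the node's HRW hash so
+				// later requests to the same node reuse the connection.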
+				treeCl, err = p.getNewTreeClient(ctx, cnrNode)
+				if err != nil {
+					finErr = finalError(finErr, err)
+					p.log(zap.DebugLevel, "failed to create tree client", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts))
+					continue
+				}
+
+				p.clientMap[cnrNode.Hash()] = treeCl
+			}
+
+			if cl, err = treeCl.serviceClient(); err == nil {
 				err = fn(cl)
 			}

 			if !shouldTryAgain(err) {
-				if startI != indexI || startJ != indexJ {
-					p.setStartIndices(indexI, indexJ)
-				}
-
 				if err != nil {
-					err = fmt.Errorf("address %s: %w", p.innerPools[indexI].clients[indexJ].endpoint(), err)
+					err = fmt.Errorf("address %s: %w", treeCl.endpoint(), err)
 				}

 				return err
@@ -858,14 +796,83 @@ LOOP:
 			finErr = finalError(finErr, err)
 			p.log(zap.DebugLevel, "tree request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
-				zap.String("address", p.innerPools[indexI].clients[indexJ].endpoint()), zap.Error(err))
+				zap.String("address", treeCl.endpoint()), zap.Error(err))
 		}
-
-		startJ = 0
 	}

 	return finErr
 }

+func (p *Pool) getContainerPlacementPolicy(ctx context.Context, cid cid.ID, reqID string) (netmap.PlacementPolicy, error) {
+	var (
+		err error
+		res *sdkClient.ResContainerGet
+	)
+	attempts := p.maxRequestAttempts
+
+LOOP:
+	for _, group := range p.innerPools {
+		for _, cl := range group.clients {
+			if attempts == 0 {
+				break LOOP
+			}
+			attempts--
+
+			prm := sdkClient.PrmContainerGet{
+				ContainerID: &cid,
+			}
+			res, err = cl.ContainerGet(ctx, prm)
+			var st apistatus.Status
+			if res != nil {
+				st = res.Status()
+			}
+			stErr := apistatus.ErrFromStatus(st)
+			if stErr != nil && err == nil {
+				err = stErr
+			}
+			if err != nil {
+				p.log(zap.DebugLevel, "placement policy request error", zap.String("request_id", reqID), zap.Int("remaining attempts", attempts),
+					zap.Error(err))
+				continue
+			}
+
+			return res.Container().PlacementPolicy(), nil
+		}
+	}
+
+	return netmap.PlacementPolicy{}, err
+}
+
+func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*treeClient, error) {
+	var (
+		treeCl *treeClient
+		err    error
+	)
+
+	node.IterateNetworkEndpoints(func(endpoint string) bool {
+		var addr network.Address
+		if err = addr.FromString(endpoint); err != nil {
+			p.log(zap.WarnLevel, "can't parse endpoint", zap.String("endpoint", endpoint), zap.Error(err))
+			return false
+		}
+
+		newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
+		if err = newTreeCl.dial(ctx); err != nil {
+			p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
+			return false
+		}
+
+		treeCl = newTreeCl
+		return true
+	})
+
+	if treeCl == nil {
+		return nil, err
+	}
+
+	return treeCl, nil
+}
+
 func shouldTryAgain(err error) bool {
 	return !(err == nil || errors.Is(err, ErrNodeAccessDenied))
 }
diff --git a/pool/tree/pool_test.go b/pool/tree/pool_test.go
index f9f4142..ebd53de 100644
--- a/pool/tree/pool_test.go
+++ b/pool/tree/pool_test.go
@@ -7,25 +7,30 @@ import (
 	rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
-	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+	"git.frostfs.info/TrueCloudLab/hrw"
+	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap/zaptest"
 )

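+// treeClientMock stubs the tree service client for the retry tests: err
+// forces serviceClient to fail, used records that the pool selected it.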
 type treeClientMock struct {
-	address string
-	err     bool
+	err  bool
+	used bool
 }

 func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) {
 	if t.err {
 		return nil, errors.New("serviceClient() mock error")
 	}
+	t.used = true
 	return nil, nil
 }

 func (t *treeClientMock) endpoint() string {
-	return t.address
+	return ""
 }

 func (t *treeClientMock) isHealthy() bool {
@@ -51,6 +56,26 @@ func (t *treeClientMock) close() error {
 	return nil
 }

+type placementPolicyMock struct {
+	policy netmap.PlacementPolicy
+}
+
+func (p *placementPolicyMock) PlacementPolicy(cid.ID) (netmap.PlacementPolicy, bool) {
+	return p.policy, true
+}
+
+type netMapMock struct {
+	netMap netmap.NetMap
+	err    error
+}
+
+func (n *netMapMock) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
+	if n.err != nil {
+		return netmap.NetMap{}, n.err
+	}
+	return n.netMap, nil
+}
+
 func TestHandleError(t *testing.T) {
 	defaultError := errors.New("default error")
 	for _, tc := range []struct {
@@ -83,95 +108,74 @@ func TestHandleError(t *testing.T) {

 func TestRetry(t *testing.T) {
 	ctx := context.Background()
-	nodes := [][]string{
-		{"node00", "node01", "node02", "node03"},
-		{"node10", "node11", "node12", "node13"},
-	}
-
-	var lenNodes int
-	for i := range nodes {
-		lenNodes += len(nodes[i])
-	}
-
+	nodesCount := 3
+	policy := getPlacementPolicy(uint32(nodesCount))
 	p := &Pool{
 		logger:             zaptest.NewLogger(t),
-		innerPools:         makeInnerPool(nodes),
-		maxRequestAttempts: lenNodes,
+		maxRequestAttempts: nodesCount,
+		policySource:       &placementPolicyMock{policy: policy},
+		clientMap:          make(map[uint64]client),
 	}

+	var nm netmap.NetMap
+	nodeKeys := make([]*keys.PrivateKey, nodesCount)
+	for i := 0; i < nodesCount; i++ {
+		key, err := keys.NewPrivateKey()
+		require.NoError(t, err)
+		nodeKeys[i] = key
+
+		p.clientMap[hrw.Hash(key.Bytes())] = &treeClientMock{}
+		nm.SetNodes(append(nm.Nodes(), getNodeInfo(key.Bytes())))
+	}
+	p.netMapSource = &netMapMock{netMap: nm}
+
+	cnrID := cidtest.ID()
+	cnrNodes, err := nm.ContainerNodes(policy, cnrID[:])
+	require.NoError(t, err)
+	cnrNodes, err = nm.PlacementVectors(cnrNodes, cnrID[:])
+	require.NoError(t, err)
+	require.Len(t, cnrNodes, 1)
+	require.Len(t, cnrNodes[0], nodesCount)
+	nodeKeys = reorderKeys(nodeKeys, cnrNodes)
+
 	makeFn := func(client *rpcClient.Client) error {
 		return nil
 	}

 	t.Run("first ok", func(t *testing.T) {
-		err := p.requestWithRetry(ctx, makeFn)
+		err = p.requestWithRetry(ctx, cnrID, makeFn)
 		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 0, 0)
+		checkClientUsage(t, p, nodeKeys[0])
+		resetClients(p)
 	})

 	t.Run("first failed", func(t *testing.T) {
-		setErrors(p, "node00")
-		err := p.requestWithRetry(ctx, makeFn)
+		setErrors(p, nodeKeys[0])
+		err = p.requestWithRetry(ctx, cnrID, makeFn)
 		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 0, 1)
+		checkClientUsage(t, p, nodeKeys[1])
+		resetClients(p)
+	})
+
+	t.Run("first two failed", func(t *testing.T) {
+		setErrors(p, nodeKeys[0], nodeKeys[1])
+		err = p.requestWithRetry(ctx, cnrID, makeFn)
+		require.NoError(t, err)
+		checkClientUsage(t, p, nodeKeys[2])
+		resetClients(p)
 	})

 	t.Run("all failed", func(t *testing.T) {
-		setErrors(p, nodes[0]...)
-		setErrors(p, nodes[1]...)
-		err := p.requestWithRetry(ctx, makeFn)
+		setErrors(p, nodeKeys[0], nodeKeys[1], nodeKeys[2])
+		err = p.requestWithRetry(ctx, cnrID, makeFn)
 		require.Error(t, err)
-		checkIndicesAndReset(t, p, 0, 0)
-	})
-
-	t.Run("round", func(t *testing.T) {
-		setErrors(p, nodes[0][0], nodes[0][1])
-		setErrors(p, nodes[1]...)
-		err := p.requestWithRetry(ctx, makeFn)
-		require.NoError(t, err)
-		checkIndices(t, p, 0, 2)
-		resetClientsErrors(p)
-
-		setErrors(p, nodes[0][2], nodes[0][3])
-		err = p.requestWithRetry(ctx, makeFn)
-		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 0, 0)
-	})
-
-	t.Run("group switch", func(t *testing.T) {
-		setErrors(p, nodes[0]...)
-		setErrors(p, nodes[1][0])
-		err := p.requestWithRetry(ctx, makeFn)
-		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 1, 1)
-	})
-
-	t.Run("group round", func(t *testing.T) {
-		setErrors(p, nodes[0][1:]...)
-		err := p.requestWithRetry(ctx, makeFn)
-		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 0, 0)
-	})
-
-	t.Run("group round switch", func(t *testing.T) {
-		setErrors(p, nodes[0]...)
-		p.setStartIndices(0, 1)
-		err := p.requestWithRetry(ctx, makeFn)
-		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 1, 0)
-	})
-
-	t.Run("no panic group switch", func(t *testing.T) {
-		setErrors(p, nodes[1]...)
-		p.setStartIndices(1, 0)
-		err := p.requestWithRetry(ctx, makeFn)
-		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 0, 0)
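+		// Every placement node fails, so all attempts are spent and no
+		// client mock is marked used.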
+		checkClientUsage(t, p)
+		resetClients(p)
 	})

 	t.Run("error empty result", func(t *testing.T) {
 		errNodes, index := 2, 0
-		err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
+		err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
 			if index < errNodes {
 				index++
 				return errNodeEmptyResult
@@ -179,12 +183,13 @@ func TestRetry(t *testing.T) {
 			return nil
 		})
 		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 0, errNodes)
+		checkClientUsage(t, p, nodeKeys[:errNodes+1]...)
+		resetClients(p)
 	})

 	t.Run("error not found", func(t *testing.T) {
 		errNodes, index := 2, 0
-		err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
+		err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
 			if index < errNodes {
 				index++
 				return ErrNodeNotFound
@@ -192,170 +197,89 @@ func TestRetry(t *testing.T) {
 			return nil
 		})
 		require.NoError(t, err)
-		checkIndicesAndReset(t, p, 0, errNodes)
+		checkClientUsage(t, p, nodeKeys[:errNodes+1]...)
+		resetClients(p)
 	})

 	t.Run("error access denied", func(t *testing.T) {
 		var index int
-		err := p.requestWithRetry(ctx, func(client *rpcClient.Client) error {
+		err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error {
 			index++
 			return ErrNodeAccessDenied
 		})
 		require.ErrorIs(t, err, ErrNodeAccessDenied)
 		require.Equal(t, 1, index)
-		checkIndicesAndReset(t, p, 0, 0)
+		checkClientUsage(t, p, nodeKeys[0])
+		resetClients(p)
 	})

 	t.Run("limit attempts", func(t *testing.T) {
-		oldVal := p.maxRequestAttempts
 		p.maxRequestAttempts = 2
-		setErrors(p, nodes[0]...)
-		setErrors(p, nodes[1]...)
-		err := p.requestWithRetry(ctx, makeFn)
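+		// Attempts are capped below the number of failing nodes, so the
+		// request exhausts its budget and must return an error.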
+		setErrors(p, nodeKeys[0], nodeKeys[1])
+		err = p.requestWithRetry(ctx, cnrID, makeFn)
 		require.Error(t, err)
-		checkIndicesAndReset(t, p, 0, 2)
-		p.maxRequestAttempts = oldVal
-	})
-}
-
-func TestRebalance(t *testing.T) {
-	nodes := [][]string{
-		{"node00", "node01"},
-		{"node10", "node11"},
-	}
-
-	p := &Pool{
-		logger:     zaptest.NewLogger(t),
-		innerPools: makeInnerPool(nodes),
-		rebalanceParams: rebalanceParameters{
-			nodesGroup: makeNodesGroup(nodes),
-		},
-	}
-
-	ctx := context.Background()
-	buffers := makeBuffer(p)
-
-	t.Run("check dirty buffers", func(t *testing.T) {
-		p.updateNodesHealth(ctx, buffers)
-		checkIndices(t, p, 0, 0)
-		setErrors(p, nodes[0][0])
-		p.updateNodesHealth(ctx, buffers)
-		checkIndices(t, p, 0, 1)
-		resetClients(p)
-	})
-
-	t.Run("don't change healthy status", func(t *testing.T) {
-		p.updateNodesHealth(ctx, buffers)
-		checkIndices(t, p, 0, 0)
-		resetClients(p)
-	})
-
-	t.Run("switch to second group", func(t *testing.T) {
-		setErrors(p, nodes[0][0], nodes[0][1])
-		p.updateNodesHealth(ctx, buffers)
-		checkIndices(t, p, 1, 0)
-		resetClients(p)
-	})
-
-	t.Run("switch back and forth", func(t *testing.T) {
-		setErrors(p, nodes[0][0], nodes[0][1])
-		p.updateNodesHealth(ctx, buffers)
-		checkIndices(t, p, 1, 0)
-
-		p.updateNodesHealth(ctx, buffers)
-		checkIndices(t, p, 1, 0)
-
-		setNoErrors(p, nodes[0][0])
-		p.updateNodesHealth(ctx, buffers)
-		checkIndices(t, p, 0, 0)
-
+		checkClientUsage(t, p)
 		resetClients(p)
+		p.maxRequestAttempts = nodesCount
 	})
 }

-func makeInnerPool(nodes [][]string) []*innerPool {
-	res := make([]*innerPool, len(nodes))
-
-	for i, group := range nodes {
-		res[i] = &innerPool{clients: make([]client, len(group))}
-		for j, node := range group {
-			res[i].clients[j] = &treeClientMock{address: node}
-		}
-	}
-
-	return res
-}
-
-func makeNodesGroup(nodes [][]string) [][]pool.NodeParam {
-	res := make([][]pool.NodeParam, len(nodes))
-
-	for i, group := range nodes {
-		res[i] = make([]pool.NodeParam, len(group))
-		for j, node := range group {
-			res[i][j] = pool.NewNodeParam(1, node, 1)
-		}
-	}
-
-	return res
-}
-
-func makeBuffer(p *Pool) [][]bool {
-	buffers := make([][]bool, len(p.rebalanceParams.nodesGroup))
-	for i, nodes := range p.rebalanceParams.nodesGroup {
-		buffers[i] = make([]bool, len(nodes))
-	}
-	return buffers
-}
-
-func checkIndicesAndReset(t *testing.T, p *Pool, iExp, jExp int) {
-	checkIndices(t, p, iExp, jExp)
-	resetClients(p)
-}
-
-func checkIndices(t *testing.T, p *Pool, iExp, jExp int) {
-	i, j := p.getStartIndices()
-	require.Equal(t, [2]int{iExp, jExp}, [2]int{i, j})
-}
-
-func resetClients(p *Pool) {
-	resetClientsErrors(p)
-	p.setStartIndices(0, 0)
-}
-
-func resetClientsErrors(p *Pool) {
-	for _, group := range p.innerPools {
-		for _, cl := range group.clients {
-			node := cl.(*treeClientMock)
-			node.err = false
-		}
-	}
-}
-
-func setErrors(p *Pool, nodes ...string) {
-	setErrorsBase(p, true, nodes...)
-}
-
-func setNoErrors(p *Pool, nodes ...string) {
-	setErrorsBase(p, false, nodes...)
-}
-
-func setErrorsBase(p *Pool, err bool, nodes ...string) {
-	for _, group := range p.innerPools {
-		for _, cl := range group.clients {
-			node := cl.(*treeClientMock)
-			if containsStr(nodes, node.address) {
-				node.err = err
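+// reorderKeys sorts the test keys by placement priority for the container:
+// res[0] corresponds to the first node of the placement vector.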
+func reorderKeys(nodeKeys []*keys.PrivateKey, cnrNodes [][]netmap.NodeInfo) []*keys.PrivateKey {
+	res := make([]*keys.PrivateKey, len(nodeKeys))
+	for i := 0; i < len(cnrNodes[0]); i++ {
+		for j := 0; j < len(nodeKeys); j++ {
+			if hrw.Hash(nodeKeys[j].Bytes()) == cnrNodes[0][i].Hash() {
+				res[i] = nodeKeys[j]
 			}
 		}
 	}
+	return res
 }

-func containsStr(list []string, item string) bool {
+func checkClientUsage(t *testing.T, p *Pool, nodeKeys ...*keys.PrivateKey) {
+	for hash, cl := range p.clientMap {
+		if containsHash(nodeKeys, hash) {
+			require.True(t, cl.(*treeClientMock).used)
+		} else {
+			require.False(t, cl.(*treeClientMock).used)
+		}
+	}
+}
+
+func resetClients(p *Pool) {
+	for _, cl := range p.clientMap {
+		cl.(*treeClientMock).used = false
+		cl.(*treeClientMock).err = false
+	}
+}
+
+func setErrors(p *Pool, nodeKeys ...*keys.PrivateKey) {
+	for hash, cl := range p.clientMap {
+		if containsHash(nodeKeys, hash) {
+			cl.(*treeClientMock).err = true
+		}
+	}
+}
+
+func containsHash(list []*keys.PrivateKey, hash uint64) bool {
 	for i := range list {
-		if list[i] == item {
+		if hrw.Hash(list[i].Bytes()) == hash {
 			return true
 		}
 	}
 	return false
 }
+
+func getPlacementPolicy(replicas uint32) (p netmap.PlacementPolicy) {
+	var r netmap.ReplicaDescriptor
+	r.SetNumberOfObjects(replicas)
+	p.AddReplicas([]netmap.ReplicaDescriptor{r}...)
+	return p
+}
+
+func getNodeInfo(key []byte) netmap.NodeInfo {
+	var node netmap.NodeInfo
+	node.SetPublicKey(key)
+	return node
+}