package tree import ( "context" "errors" "testing" rpcClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/rpc/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/hrw" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) type treeClientMock struct { address string err bool used bool } func (t *treeClientMock) serviceClient() (*rpcClient.Client, error) { if t.err { return nil, errors.New("serviceClient() mock error") } t.used = true return nil, nil } func (t *treeClientMock) endpoint() string { return t.address } func (t *treeClientMock) isHealthy() bool { return true } func (t *treeClientMock) setHealthy(bool) { return } func (t *treeClientMock) dial(context.Context) error { return nil } func (t *treeClientMock) redialIfNecessary(context.Context) (bool, error) { if t.err { return false, errors.New("redialIfNecessary() mock error") } return false, nil } func (t *treeClientMock) close() error { return nil } type netMapInfoMock struct { netMap netmap.NetMap policy netmap.PlacementPolicy err error } func (n *netMapInfoMock) NetMapSnapshot(context.Context) (netmap.NetMap, error) { if n.err != nil { return netmap.NetMap{}, n.err } return n.netMap, nil } func (n *netMapInfoMock) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) { return n.policy, nil } func TestHandleError(t *testing.T) { defaultError := errors.New("default error") for _, tc := range []struct { err error expectedError error }{ { err: defaultError, expectedError: defaultError, }, { err: errors.New("something not found"), expectedError: ErrNodeNotFound, }, { err: errors.New("something is denied by some acl rule"), expectedError: ErrNodeAccessDenied, }, { err: &apistatus.APEManagerAccessDenied{}, expectedError: ErrNodeAccessDenied, }, } { t.Run("", func(t *testing.T) { err := handleError("err message", tc.err) require.True(t, errors.Is(err, tc.expectedError)) }) } } func TestRetry(t *testing.T) { ctx := context.Background() nodes := [][]string{ {"node00", "node01", "node02", "node03"}, {"node10", "node11", "node12", "node13"}, } var lenNodes int for i := range nodes { lenNodes += len(nodes[i]) } p := &Pool{ logger: zaptest.NewLogger(t), innerPools: makeInnerPool(nodes), maxRequestAttempts: lenNodes, } makeFn := func(client *rpcClient.Client) error { return nil } t.Run("first ok", func(t *testing.T) { err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("first failed", func(t *testing.T) { setErrors(p, "node00") err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 1) }) t.Run("all failed", func(t *testing.T) { setErrors(p, nodes[0]...) setErrors(p, nodes[1]...) err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.Error(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("round", func(t *testing.T) { setErrors(p, nodes[0][0], nodes[0][1]) setErrors(p, nodes[1]...) err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndices(t, p, 0, 2) resetClientsErrors(p) setErrors(p, nodes[0][2], nodes[0][3]) err = p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("group switch", func(t *testing.T) { setErrors(p, nodes[0]...) setErrors(p, nodes[1][0]) err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 1, 1) }) t.Run("group round", func(t *testing.T) { setErrors(p, nodes[0][1:]...) err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("group round switch", func(t *testing.T) { setErrors(p, nodes[0]...) p.setStartIndices(0, 1) err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 1, 0) }) t.Run("no panic group switch", func(t *testing.T) { setErrors(p, nodes[1]...) p.setStartIndices(1, 0) err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.NoError(t, err) checkIndicesAndReset(t, p, 0, 0) }) t.Run("error empty result", func(t *testing.T) { errNodes, index := 2, 0 err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error { if index < errNodes { index++ return errNodeEmptyResult } return nil }) require.NoError(t, err) checkIndicesAndReset(t, p, 0, errNodes) }) t.Run("error not found", func(t *testing.T) { errNodes, index := 2, 0 err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error { if index < errNodes { index++ return ErrNodeNotFound } return nil }) require.NoError(t, err) checkIndicesAndReset(t, p, 0, errNodes) }) t.Run("error access denied", func(t *testing.T) { var index int err := p.requestWithRetry(ctx, cidtest.ID(), func(client *rpcClient.Client) error { index++ return ErrNodeAccessDenied }) require.ErrorIs(t, err, ErrNodeAccessDenied) require.Equal(t, 1, index) checkIndicesAndReset(t, p, 0, 0) }) t.Run("limit attempts", func(t *testing.T) { oldVal := p.maxRequestAttempts p.maxRequestAttempts = 2 setErrors(p, nodes[0]...) setErrors(p, nodes[1]...) err := p.requestWithRetry(ctx, cidtest.ID(), makeFn) require.Error(t, err) checkIndicesAndReset(t, p, 0, 2) p.maxRequestAttempts = oldVal }) } func TestRebalance(t *testing.T) { nodes := [][]string{ {"node00", "node01"}, {"node10", "node11"}, } p := &Pool{ logger: zaptest.NewLogger(t), innerPools: makeInnerPool(nodes), rebalanceParams: rebalanceParameters{ nodesGroup: makeNodesGroup(nodes), }, } ctx := context.Background() buffers := makeBuffer(p) t.Run("check dirty buffers", func(t *testing.T) { p.updateNodesHealth(ctx, buffers) checkIndices(t, p, 0, 0) setErrors(p, nodes[0][0]) p.updateNodesHealth(ctx, buffers) checkIndices(t, p, 0, 1) resetClients(p) }) t.Run("don't change healthy status", func(t *testing.T) { p.updateNodesHealth(ctx, buffers) checkIndices(t, p, 0, 0) resetClients(p) }) t.Run("switch to second group", func(t *testing.T) { setErrors(p, nodes[0][0], nodes[0][1]) p.updateNodesHealth(ctx, buffers) checkIndices(t, p, 1, 0) resetClients(p) }) t.Run("switch back and forth", func(t *testing.T) { setErrors(p, nodes[0][0], nodes[0][1]) p.updateNodesHealth(ctx, buffers) checkIndices(t, p, 1, 0) p.updateNodesHealth(ctx, buffers) checkIndices(t, p, 1, 0) setNoErrors(p, nodes[0][0]) p.updateNodesHealth(ctx, buffers) checkIndices(t, p, 0, 0) resetClients(p) }) } func TestRetryContainerNodes(t *testing.T) { ctx := context.Background() nodesCount := 3 policy := getPlacementPolicy(uint32(nodesCount)) p := &Pool{ logger: zaptest.NewLogger(t), maxRequestAttempts: nodesCount, } var nm netmap.NetMap for i := 0; i < nodesCount; i++ { key, err := keys.NewPrivateKey() require.NoError(t, err) nm.SetNodes(append(nm.Nodes(), getNodeInfo(key.Bytes()))) } p.netMapInfoSource = &netMapInfoMock{netMap: nm, policy: policy} cnrID := cidtest.ID() cnrNodes, err := nm.ContainerNodes(policy, cnrID[:]) require.NoError(t, err) cnrNodes, err = nm.PlacementVectors(cnrNodes, cnrID[:]) require.NoError(t, err) require.Len(t, cnrNodes, 1) require.Len(t, cnrNodes[0], nodesCount) makeFn := func(client *rpcClient.Client) error { return nil } t.Run("first ok", func(t *testing.T) { p.clientMap = makeClientMap(cnrNodes[0]) err = p.requestWithRetry(ctx, cnrID, makeFn) require.NoError(t, err) checkClientsUsage(t, p, cnrNodes[0][0]) checkClientsPresence(t, p, cnrNodes[0]...) }) t.Run("first failed", func(t *testing.T) { p.clientMap = makeClientMap(cnrNodes[0]) setClientMapErrors(p, cnrNodes[0][0]) err = p.requestWithRetry(ctx, cnrID, makeFn) require.NoError(t, err) checkClientsUsage(t, p, cnrNodes[0][1]) checkClientsPresence(t, p, cnrNodes[0][1:]...) }) t.Run("first two failed", func(t *testing.T) { p.clientMap = makeClientMap(cnrNodes[0]) setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1]) err = p.requestWithRetry(ctx, cnrID, makeFn) require.NoError(t, err) checkClientsUsage(t, p, cnrNodes[0][2]) checkClientsPresence(t, p, cnrNodes[0][2]) }) t.Run("all failed", func(t *testing.T) { p.clientMap = makeClientMap(cnrNodes[0]) setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1], cnrNodes[0][2]) err = p.requestWithRetry(ctx, cnrID, makeFn) require.Error(t, err) checkClientsUsage(t, p) checkClientsPresence(t, p) }) t.Run("error empty result", func(t *testing.T) { p.clientMap = makeClientMap(cnrNodes[0]) errNodes, index := 2, 0 err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error { if index < errNodes { index++ return errNodeEmptyResult } return nil }) require.NoError(t, err) checkClientsUsage(t, p, cnrNodes[0][:errNodes+1]...) checkClientsPresence(t, p, cnrNodes[0]...) }) t.Run("error not found", func(t *testing.T) { p.clientMap = makeClientMap(cnrNodes[0]) errNodes, index := 2, 0 err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error { if index < errNodes { index++ return ErrNodeNotFound } return nil }) require.NoError(t, err) checkClientsUsage(t, p, cnrNodes[0][:errNodes+1]...) checkClientsPresence(t, p, cnrNodes[0]...) }) t.Run("error access denied", func(t *testing.T) { p.clientMap = makeClientMap(cnrNodes[0]) var index int err = p.requestWithRetry(ctx, cnrID, func(client *rpcClient.Client) error { index++ return ErrNodeAccessDenied }) require.ErrorIs(t, err, ErrNodeAccessDenied) require.Equal(t, 1, index) checkClientsUsage(t, p, cnrNodes[0][0]) checkClientsPresence(t, p, cnrNodes[0]...) }) t.Run("limit attempts", func(t *testing.T) { p.clientMap = makeClientMap(cnrNodes[0]) p.maxRequestAttempts = 2 setClientMapErrors(p, cnrNodes[0][0], cnrNodes[0][1]) err = p.requestWithRetry(ctx, cnrID, makeFn) require.Error(t, err) checkClientsUsage(t, p) checkClientsPresence(t, p, cnrNodes[0][2]) p.maxRequestAttempts = nodesCount }) } func makeInnerPool(nodes [][]string) []*innerPool { res := make([]*innerPool, len(nodes)) for i, group := range nodes { res[i] = &innerPool{clients: make([]client, len(group))} for j, node := range group { res[i].clients[j] = &treeClientMock{address: node} } } return res } func makeNodesGroup(nodes [][]string) [][]pool.NodeParam { res := make([][]pool.NodeParam, len(nodes)) for i, group := range nodes { res[i] = make([]pool.NodeParam, len(group)) for j, node := range group { res[i][j] = pool.NewNodeParam(1, node, 1) } } return res } func makeBuffer(p *Pool) [][]bool { buffers := make([][]bool, len(p.rebalanceParams.nodesGroup)) for i, nodes := range p.rebalanceParams.nodesGroup { buffers[i] = make([]bool, len(nodes)) } return buffers } func checkIndicesAndReset(t *testing.T, p *Pool, iExp, jExp int) { checkIndices(t, p, iExp, jExp) resetClients(p) } func checkIndices(t *testing.T, p *Pool, iExp, jExp int) { i, j := p.getStartIndices() require.Equal(t, [2]int{iExp, jExp}, [2]int{i, j}) } func resetClients(p *Pool) { resetClientsErrors(p) p.setStartIndices(0, 0) } func resetClientsErrors(p *Pool) { for _, group := range p.innerPools { for _, cl := range group.clients { node := cl.(*treeClientMock) node.err = false } } } func setErrors(p *Pool, nodes ...string) { setErrorsBase(p, true, nodes...) } func setNoErrors(p *Pool, nodes ...string) { setErrorsBase(p, false, nodes...) } func setErrorsBase(p *Pool, err bool, nodes ...string) { for _, group := range p.innerPools { for _, cl := range group.clients { node := cl.(*treeClientMock) if containsStr(nodes, node.address) { node.err = err } } } } func containsStr(list []string, item string) bool { for i := range list { if list[i] == item { return true } } return false } func makeClientMap(nodes []netmap.NodeInfo) map[uint64]client { res := make(map[uint64]client, len(nodes)) for _, node := range nodes { res[hrw.Hash(node.PublicKey())] = &treeClientMock{} } return res } func checkClientsPresence(t *testing.T, p *Pool, nodes ...netmap.NodeInfo) { require.Len(t, p.clientMap, len(nodes)) for _, node := range nodes { require.NotNil(t, p.clientMap[hrw.Hash(node.PublicKey())]) } } func checkClientsUsage(t *testing.T, p *Pool, nodes ...netmap.NodeInfo) { for hash, cl := range p.clientMap { if containsHash(nodes, hash) { require.True(t, cl.(*treeClientMock).used) } else { require.False(t, cl.(*treeClientMock).used) } } } func setClientMapErrors(p *Pool, nodes ...netmap.NodeInfo) { for hash, cl := range p.clientMap { if containsHash(nodes, hash) { cl.(*treeClientMock).err = true } } } func containsHash(list []netmap.NodeInfo, hash uint64) bool { for i := range list { if hrw.Hash(list[i].PublicKey()) == hash { return true } } return false } func getPlacementPolicy(replicas uint32) (p netmap.PlacementPolicy) { var r netmap.ReplicaDescriptor r.SetNumberOfObjects(replicas) p.AddReplicas([]netmap.ReplicaDescriptor{r}...) return p } func getNodeInfo(key []byte) netmap.NodeInfo { var node netmap.NodeInfo node.SetPublicKey(key) return node }