package tree import ( "bytes" "context" "errors" "io" "net" "runtime" "strconv" "testing" apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap" apitree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) type mockTreeServer struct { id int srv *grpc.Server lis net.Listener key *keys.PrivateKey healthy bool addCounter int getSubTreeError error getSubTreeResponses []*tree.GetSubTreeResponse_Body getSubTreeCounter int } type mockNetmapSource struct { servers []*mockTreeServer policy string } func (m *mockNetmapSource) NetMapSnapshot(context.Context) (netmap.NetMap, error) { nm := netmap.NetMap{} nodes := make([]netmap.NodeInfo, len(m.servers)) for i, server := range m.servers { ni := apinetmap.NodeInfo{} ni.SetAddresses(server.lis.Addr().String()) ni.SetPublicKey(server.key.PublicKey().Bytes()) err := nodes[i].ReadFromV2(ni) // no other way to set address field in netmap.NodeInfo if err != nil { return nm, err } nodes[i].SetAttribute("id", strconv.Itoa(server.id)) } nm.SetNodes(nodes) return nm, nil } func (m *mockNetmapSource) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) { p := netmap.PlacementPolicy{} return p, p.DecodeString(m.policy) } func (m *mockTreeServer) Serve() { go m.srv.Serve(m.lis) } func (m *mockTreeServer) Stop() { m.srv.Stop() } func (m *mockTreeServer) Addr() string { return m.lis.Addr().String() } func (m *mockTreeServer) Add(context.Context, *tree.AddRequest) (*tree.AddResponse, error) { m.addCounter++ return &tree.AddResponse{}, nil } func (m *mockTreeServer) AddByPath(context.Context, *tree.AddByPathRequest) (*tree.AddByPathResponse, error) { panic("implement me") } func (m *mockTreeServer) Remove(context.Context, *tree.RemoveRequest) (*tree.RemoveResponse, error) { panic("implement me") } func (m *mockTreeServer) Move(context.Context, *tree.MoveRequest) (*tree.MoveResponse, error) { panic("implement me") } func (m *mockTreeServer) GetNodeByPath(context.Context, *tree.GetNodeByPathRequest) (*tree.GetNodeByPathResponse, error) { panic("implement me") } func (m *mockTreeServer) GetSubTree(_ *tree.GetSubTreeRequest, s tree.TreeService_GetSubTreeServer) error { m.getSubTreeCounter++ if m.getSubTreeError != nil { return m.getSubTreeError } for i := range m.getSubTreeResponses { if err := s.Send(&tree.GetSubTreeResponse{ Body: m.getSubTreeResponses[i], }); err != nil { return err } } return nil } func (m *mockTreeServer) TreeList(context.Context, *tree.TreeListRequest) (*tree.TreeListResponse, error) { panic("implement me") } func (m *mockTreeServer) Apply(context.Context, *tree.ApplyRequest) (*tree.ApplyResponse, error) { panic("implement me") } func (m *mockTreeServer) GetOpLog(*tree.GetOpLogRequest, tree.TreeService_GetOpLogServer) error { panic("implement me") } func (m *mockTreeServer) Healthcheck(context.Context, *tree.HealthcheckRequest) (*tree.HealthcheckResponse, error) { if m.healthy { return new(tree.HealthcheckResponse), nil } return nil, errors.New("not healthy") } func createTestServer(t *testing.T, id int) *mockTreeServer { lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) key, err := keys.NewPrivateKey() require.NoError(t, err) res := &mockTreeServer{ id: id, srv: grpc.NewServer(), lis: lis, key: key, healthy: true, } tree.RegisterTreeServiceServer(res.srv, res) return res } func preparePoolWithNetmapSource(t *testing.T, n int, p string) (*Pool, []*mockTreeServer, *mockNetmapSource) { poolInitParams := InitParameters{} servers := make([]*mockTreeServer, n) for i := range servers { servers[i] = createTestServer(t, i) servers[i].healthy = true servers[i].Serve() poolInitParams.AddNode(pool.NewNodeParam(1, servers[i].Addr(), 1)) } source := &mockNetmapSource{ servers: servers, policy: p, } key, err := keys.NewPrivateKey() require.NoError(t, err) poolInitParams.SetKey(key) poolInitParams.SetNetMapInfoSource(source) cli, err := NewPool(poolInitParams) require.NoError(t, err) return cli, servers, source } func sortServers(ctx context.Context, servers []*mockTreeServer, source *mockNetmapSource, cnr cid.ID) ([]*mockTreeServer, error) { res := make([]*mockTreeServer, len(servers)) snapshot, err := source.NetMapSnapshot(ctx) if err != nil { return nil, err } policy, err := source.PlacementPolicy(ctx, cnr) if err != nil { return nil, err } cnrNodes, err := snapshot.ContainerNodes(policy, cnr[:]) if err != nil { return nil, err } priorityNodes, err := snapshot.PlacementVectors(cnrNodes, cnr[:]) if err != nil { return nil, err } // find servers based on public key and store pointers in res index := 0 for i := range priorityNodes { for j := range priorityNodes[i] { key := priorityNodes[i][j].PublicKey() for k := range servers { if bytes.Equal(servers[k].key.PublicKey().Bytes(), key) { res[index] = servers[k] index++ break } } } } return res, nil } func TestConnectionLeak(t *testing.T) { const ( numberOfNodes = 4 placementPolicy = "REP 2" ) // Initialize gRPC servers and create pool with netmap source treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy) for i := range servers { defer servers[i].Stop() } cnr := cidtest.ID() ctx := context.Background() // Make priority node for cnr unhealthy, so it is going to be redialled on every request sortedServers, err := sortServers(ctx, servers, source, cnr) require.NoError(t, err) sortedServers[0].healthy = false // Make RPC and check that pool switched to healthy server _, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr}) require.NoError(t, err) require.Equal(t, 0, sortedServers[0].addCounter) // unhealthy require.Equal(t, 1, sortedServers[1].addCounter) // healthy // Check that go routines are not leaked during multiple requests routinesBefore := runtime.NumGoroutine() for i := 0; i < 1000; i++ { _, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr}) require.NoError(t, err) } // not more than 1 extra goroutine is created due to async operations require.LessOrEqual(t, runtime.NumGoroutine()-routinesBefore, 1) } func TestStreamRetry(t *testing.T) { const ( numberOfNodes = 4 placementPolicy = "REP 2" ) expected := []*tree.GetSubTreeResponse_Body{ { NodeId: []uint64{1}, }, { NodeId: []uint64{2}, }, { NodeId: []uint64{3}, }, } // Initialize gRPC servers and create pool with netmap source treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy) defer func() { for i := range servers { servers[i].Stop() } }() cnr := cidtest.ID() ctx := context.Background() sortedServers, err := sortServers(ctx, servers, source, cnr) require.NoError(t, err) // Return expected response in last priority node, others return error for i := range sortedServers { if i == len(sortedServers)-1 { sortedServers[i].getSubTreeResponses = expected } else { sortedServers[i].getSubTreeError = errors.New("tree not found") } } t.Run("read all", func(t *testing.T) { reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr}) require.NoError(t, err) data, err := reader.ReadAll() require.NoError(t, err) require.Len(t, data, len(expected)) for i := range expected { require.EqualValues(t, expected[i].GetNodeId(), data[i].GetNodeID()) } }) t.Run("next", func(t *testing.T) { reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr}) require.NoError(t, err) for i := range expected { resp, err := reader.Next() require.NoError(t, err) require.Equal(t, expected[i].GetNodeId(), resp.GetNodeID()) } _, err = reader.Next() require.Error(t, io.EOF, err) }) t.Run("read", func(t *testing.T) { reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr}) require.NoError(t, err) buf := make([]*apitree.GetSubTreeResponseBody, len(expected)) _, err = reader.Read(buf) require.NoError(t, err) require.Len(t, buf, len(expected)) for i := range expected { require.EqualValues(t, expected[i].GetNodeId(), buf[i].GetNodeID()) } }) for i := range servers { // check we retried every available node in the pool three times require.Equal(t, 3, servers[i].getSubTreeCounter) } }