From 4ecbfb0edfe4ddfae9065f0a6da59cdea88b05dc Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 4 Feb 2025 21:12:40 +0300 Subject: [PATCH] [#331] pool: Add mocked test tree service and check goroutine leak Use real gRPC connection in new mocked tree service. Signed-off-by: Alex Vanin --- pool/tree/pool_server_test.go | 234 ++++++++++++++++++++++++++++++++++ 1 file changed, 234 insertions(+) create mode 100644 pool/tree/pool_server_test.go diff --git a/pool/tree/pool_server_test.go b/pool/tree/pool_server_test.go new file mode 100644 index 0000000..7e3447f --- /dev/null +++ b/pool/tree/pool_server_test.go @@ -0,0 +1,234 @@ +package tree + +import ( + "bytes" + "context" + "errors" + "net" + "runtime" + "strconv" + "testing" + + apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" + tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +type mockTreeServer struct { + id int + srv *grpc.Server + lis net.Listener + key *keys.PrivateKey + + healthy bool + addCounter int +} + +type mockNetmapSource struct { + servers []*mockTreeServer + policy string +} + +func (m *mockNetmapSource) NetMapSnapshot(context.Context) (netmap.NetMap, error) { + nm := netmap.NetMap{} + nodes := make([]netmap.NodeInfo, len(m.servers)) + for i, server := range m.servers { + ni := apinetmap.NodeInfo{} + ni.SetAddresses(server.lis.Addr().String()) + ni.SetPublicKey(server.key.PublicKey().Bytes()) + err := nodes[i].ReadFromV2(ni) // no other way to set address field in netmap.NodeInfo + if err != nil { + return nm, err + } + nodes[i].SetAttribute("id", strconv.Itoa(server.id)) + } + nm.SetNodes(nodes) + return nm, nil +} + +func (m *mockNetmapSource) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) { + p := netmap.PlacementPolicy{} + return p, p.DecodeString(m.policy) +} + +func (m *mockTreeServer) Serve() { + go m.srv.Serve(m.lis) +} + +func (m *mockTreeServer) Stop() { + m.srv.Stop() +} + +func (m *mockTreeServer) Addr() string { + return m.lis.Addr().String() +} + +func (m *mockTreeServer) Add(context.Context, *tree.AddRequest) (*tree.AddResponse, error) { + m.addCounter++ + return &tree.AddResponse{}, nil +} + +func (m *mockTreeServer) AddByPath(context.Context, *tree.AddByPathRequest) (*tree.AddByPathResponse, error) { + panic("implement me") +} + +func (m *mockTreeServer) Remove(context.Context, *tree.RemoveRequest) (*tree.RemoveResponse, error) { + panic("implement me") +} + +func (m *mockTreeServer) Move(context.Context, *tree.MoveRequest) (*tree.MoveResponse, error) { + panic("implement me") +} + +func (m *mockTreeServer) GetNodeByPath(context.Context, *tree.GetNodeByPathRequest) (*tree.GetNodeByPathResponse, error) { + panic("implement me") +} + +func (m *mockTreeServer) GetSubTree(*tree.GetSubTreeRequest, tree.TreeService_GetSubTreeServer) error { + panic("implement me") +} + +func (m *mockTreeServer) TreeList(context.Context, *tree.TreeListRequest) (*tree.TreeListResponse, error) { + panic("implement me") +} + +func (m *mockTreeServer) Apply(context.Context, *tree.ApplyRequest) (*tree.ApplyResponse, error) { + panic("implement me") +} + +func (m *mockTreeServer) GetOpLog(*tree.GetOpLogRequest, tree.TreeService_GetOpLogServer) error { + panic("implement me") +} + +func (m *mockTreeServer) Healthcheck(context.Context, *tree.HealthcheckRequest) (*tree.HealthcheckResponse, error) { + if m.healthy { + return new(tree.HealthcheckResponse), nil + } + return nil, errors.New("not healthy") +} + +func createTestServer(t *testing.T, id int) *mockTreeServer { + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + key, err := keys.NewPrivateKey() + require.NoError(t, err) + + res := &mockTreeServer{ + id: id, + srv: grpc.NewServer(), + lis: lis, + key: key, + healthy: true, + } + + tree.RegisterTreeServiceServer(res.srv, res) + + return res +} + +func preparePoolWithNetmapSource(t *testing.T, n int, p string) (*Pool, []*mockTreeServer, *mockNetmapSource) { + poolInitParams := InitParameters{} + + servers := make([]*mockTreeServer, n) + for i := range servers { + servers[i] = createTestServer(t, i) + servers[i].healthy = true + servers[i].Serve() + poolInitParams.AddNode(pool.NewNodeParam(1, servers[i].Addr(), 1)) + } + + source := &mockNetmapSource{ + servers: servers, + policy: p, + } + + key, err := keys.NewPrivateKey() + require.NoError(t, err) + poolInitParams.SetKey(key) + poolInitParams.SetNetMapInfoSource(source) + + cli, err := NewPool(poolInitParams) + require.NoError(t, err) + + return cli, servers, source +} + +func sortServers(ctx context.Context, servers []*mockTreeServer, source *mockNetmapSource, cnr cid.ID) ([]*mockTreeServer, error) { + res := make([]*mockTreeServer, len(servers)) + snapshot, err := source.NetMapSnapshot(ctx) + if err != nil { + return nil, err + } + + policy, err := source.PlacementPolicy(ctx, cnr) + if err != nil { + return nil, err + } + + cnrNodes, err := snapshot.ContainerNodes(policy, cnr[:]) + if err != nil { + return nil, err + } + + priorityNodes, err := snapshot.PlacementVectors(cnrNodes, cnr[:]) + if err != nil { + return nil, err + } + + // find servers based on public key and store pointers in res + for i := range priorityNodes { + for j := range priorityNodes[i] { + key := priorityNodes[i][j].PublicKey() + for k := range servers { + if bytes.Equal(servers[k].key.PublicKey().Bytes(), key) { + res[i+j] = servers[k] + } + } + } + } + + return res, nil +} + +func TestConnectionLeak(t *testing.T) { + const ( + numberOfNodes = 4 + placementPolicy = "REP 2" + ) + + // Initialize gRPC servers and create pool with netmap source + treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy) + for i := range servers { + defer servers[i].Stop() + } + + cnr := cidtest.ID() + ctx := context.Background() + + // Make priority node for cnr unhealthy, so it is going to be redialled on every request + sortedServers, err := sortServers(ctx, servers, source, cnr) + require.NoError(t, err) + sortedServers[0].healthy = false + + // Make RPC and check that pool switched to healthy server + _, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr}) + require.NoError(t, err) + require.Equal(t, 0, sortedServers[0].addCounter) // unhealthy + require.Equal(t, 1, sortedServers[1].addCounter) // healthy + + // Check that go routines are not leaked during multiple requests + routinesBefore := runtime.NumGoroutine() + for i := 0; i < 1000; i++ { + _, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr}) + require.NoError(t, err) + } + // not more than 1 extra goroutine is created due to async operations + require.LessOrEqual(t, runtime.NumGoroutine()-routinesBefore, 1) +}