package tree import ( "bytes" "context" "errors" "net" "runtime" "strconv" "testing" apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) type mockTreeServer struct { id int srv *grpc.Server lis net.Listener key *keys.PrivateKey healthy bool addCounter int } type mockNetmapSource struct { servers []*mockTreeServer policy string } func (m *mockNetmapSource) NetMapSnapshot(context.Context) (netmap.NetMap, error) { nm := netmap.NetMap{} nodes := make([]netmap.NodeInfo, len(m.servers)) for i, server := range m.servers { ni := apinetmap.NodeInfo{} ni.SetAddresses(server.lis.Addr().String()) ni.SetPublicKey(server.key.PublicKey().Bytes()) err := nodes[i].ReadFromV2(ni) // no other way to set address field in netmap.NodeInfo if err != nil { return nm, err } nodes[i].SetAttribute("id", strconv.Itoa(server.id)) } nm.SetNodes(nodes) return nm, nil } func (m *mockNetmapSource) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) { p := netmap.PlacementPolicy{} return p, p.DecodeString(m.policy) } func (m *mockTreeServer) Serve() { go m.srv.Serve(m.lis) } func (m *mockTreeServer) Stop() { m.srv.Stop() } func (m *mockTreeServer) Addr() string { return m.lis.Addr().String() } func (m *mockTreeServer) Add(context.Context, *tree.AddRequest) (*tree.AddResponse, error) { m.addCounter++ return &tree.AddResponse{}, nil } func (m *mockTreeServer) AddByPath(context.Context, *tree.AddByPathRequest) (*tree.AddByPathResponse, error) { panic("implement me") } func (m *mockTreeServer) Remove(context.Context, *tree.RemoveRequest) (*tree.RemoveResponse, error) { panic("implement me") } func (m *mockTreeServer) Move(context.Context, *tree.MoveRequest) (*tree.MoveResponse, error) { panic("implement me") } func (m *mockTreeServer) GetNodeByPath(context.Context, *tree.GetNodeByPathRequest) (*tree.GetNodeByPathResponse, error) { panic("implement me") } func (m *mockTreeServer) GetSubTree(*tree.GetSubTreeRequest, tree.TreeService_GetSubTreeServer) error { panic("implement me") } func (m *mockTreeServer) TreeList(context.Context, *tree.TreeListRequest) (*tree.TreeListResponse, error) { panic("implement me") } func (m *mockTreeServer) Apply(context.Context, *tree.ApplyRequest) (*tree.ApplyResponse, error) { panic("implement me") } func (m *mockTreeServer) GetOpLog(*tree.GetOpLogRequest, tree.TreeService_GetOpLogServer) error { panic("implement me") } func (m *mockTreeServer) Healthcheck(context.Context, *tree.HealthcheckRequest) (*tree.HealthcheckResponse, error) { if m.healthy { return new(tree.HealthcheckResponse), nil } return nil, errors.New("not healthy") } func createTestServer(t *testing.T, id int) *mockTreeServer { lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) key, err := keys.NewPrivateKey() require.NoError(t, err) res := &mockTreeServer{ id: id, srv: grpc.NewServer(), lis: lis, key: key, healthy: true, } tree.RegisterTreeServiceServer(res.srv, res) return res } func preparePoolWithNetmapSource(t *testing.T, n int, p string) (*Pool, []*mockTreeServer, *mockNetmapSource) { poolInitParams := InitParameters{} servers := make([]*mockTreeServer, n) for i := range servers { servers[i] = createTestServer(t, i) servers[i].healthy = true servers[i].Serve() poolInitParams.AddNode(pool.NewNodeParam(1, servers[i].Addr(), 1)) } source := &mockNetmapSource{ servers: servers, policy: p, } key, err := keys.NewPrivateKey() require.NoError(t, err) poolInitParams.SetKey(key) poolInitParams.SetNetMapInfoSource(source) cli, err := NewPool(poolInitParams) require.NoError(t, err) return cli, servers, source } func sortServers(ctx context.Context, servers []*mockTreeServer, source *mockNetmapSource, cnr cid.ID) ([]*mockTreeServer, error) { res := make([]*mockTreeServer, len(servers)) snapshot, err := source.NetMapSnapshot(ctx) if err != nil { return nil, err } policy, err := source.PlacementPolicy(ctx, cnr) if err != nil { return nil, err } cnrNodes, err := snapshot.ContainerNodes(policy, cnr[:]) if err != nil { return nil, err } priorityNodes, err := snapshot.PlacementVectors(cnrNodes, cnr[:]) if err != nil { return nil, err } // find servers based on public key and store pointers in res index := 0 for i := range priorityNodes { for j := range priorityNodes[i] { key := priorityNodes[i][j].PublicKey() for k := range servers { if bytes.Equal(servers[k].key.PublicKey().Bytes(), key) { res[index] = servers[k] index++ break } } } } return res, nil } func TestConnectionLeak(t *testing.T) { const ( numberOfNodes = 4 placementPolicy = "REP 2" ) // Initialize gRPC servers and create pool with netmap source treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy) for i := range servers { defer servers[i].Stop() } cnr := cidtest.ID() ctx := context.Background() // Make priority node for cnr unhealthy, so it is going to be redialled on every request sortedServers, err := sortServers(ctx, servers, source, cnr) require.NoError(t, err) sortedServers[0].healthy = false // Make RPC and check that pool switched to healthy server _, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr}) require.NoError(t, err) require.Equal(t, 0, sortedServers[0].addCounter) // unhealthy require.Equal(t, 1, sortedServers[1].addCounter) // healthy // Check that go routines are not leaked during multiple requests routinesBefore := runtime.NumGoroutine() for i := 0; i < 1000; i++ { _, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr}) require.NoError(t, err) } // not more than 1 extra goroutine is created due to async operations require.LessOrEqual(t, runtime.NumGoroutine()-routinesBefore, 1) }