frostfs-sdk-go/pool/tree/pool_server_test.go
Alex Vanin 2b8329e026
All checks were successful
DCO / DCO (pull_request) Successful in 26s
Code generation / Generate proto (pull_request) Successful in 33s
Tests and linters / Tests (pull_request) Successful in 44s
Tests and linters / Lint (pull_request) Successful in 1m36s
[#336] pool/tree: Increase test coverage in TestStreamRetry
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-02-28 12:32:56 +03:00

345 lines
8.7 KiB
Go

package tree
import (
"bytes"
"context"
"errors"
"io"
"net"
"runtime"
"strconv"
"testing"
apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
apitree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
// mockTreeServer is an in-process gRPC tree service used by pool tests.
// It records how often selected RPCs are invoked and lets a test script
// the outcome of Healthcheck and GetSubTree.
type mockTreeServer struct {
	// id is published as the "id" attribute of the node in the mock netmap.
	id int
	// srv is the gRPC server the mock is registered on.
	srv *grpc.Server
	// lis is the listener srv accepts connections on; its address is the node address.
	lis net.Listener
	// key is the node key; its public part identifies the node in the mock netmap.
	key *keys.PrivateKey

	// healthy selects the Healthcheck outcome: a response when true, an error otherwise.
	healthy bool

	// addCounter counts Add invocations.
	addCounter int

	// getSubTreeError, when non-nil, is returned by GetSubTree before anything is sent.
	getSubTreeError error
	// getSubTreeResponses are the bodies streamed by GetSubTree, in order.
	getSubTreeResponses []*tree.GetSubTreeResponse_Body
	// getSubTreeCounter counts GetSubTree invocations, including failed ones.
	getSubTreeCounter int
}
// mockNetmapSource serves a netmap snapshot and a placement policy derived
// from a fixed set of mock servers and a textual policy.
type mockNetmapSource struct {
	// servers are turned into netmap nodes by NetMapSnapshot.
	servers []*mockTreeServer
	// policy is the placement policy string decoded by PlacementPolicy.
	policy string
}
// NetMapSnapshot builds a network map with one node per mock server. Each node
// carries the server's listener address, its public key and its numeric id in
// the "id" attribute. On a conversion failure the still-empty map is returned
// together with the error.
func (m *mockNetmapSource) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
	var snapshot netmap.NetMap

	nodeInfos := make([]netmap.NodeInfo, len(m.servers))
	for idx, srv := range m.servers {
		var v2Info apinetmap.NodeInfo
		v2Info.SetAddresses(srv.lis.Addr().String())
		v2Info.SetPublicKey(srv.key.PublicKey().Bytes())

		// the address field of netmap.NodeInfo can only be filled through the v2 message
		if err := nodeInfos[idx].ReadFromV2(v2Info); err != nil {
			return snapshot, err
		}

		nodeInfos[idx].SetAttribute("id", strconv.Itoa(srv.id))
	}

	snapshot.SetNodes(nodeInfos)

	return snapshot, nil
}
// PlacementPolicy decodes the configured policy string and returns the result.
// The container ID is ignored: every container gets the same policy.
func (m *mockNetmapSource) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) {
	var p netmap.PlacementPolicy

	// Decode in a separate statement: in `return p, p.DecodeString(...)` the
	// order between reading p and the call mutating it is not specified by
	// the language, so the returned policy could be the empty one.
	err := p.DecodeString(m.policy)

	return p, err
}
// Serve starts the gRPC server on the mock's listener in a background
// goroutine and returns immediately.
func (m *mockTreeServer) Serve() {
	// capture both values now, as a bare go statement would evaluate them here
	srv, lis := m.srv, m.lis

	go func() {
		// the serving result is dropped, same as with a bare go statement
		_ = srv.Serve(lis)
	}()
}
// Stop stops the underlying gRPC server.
func (m *mockTreeServer) Stop() {
	srv := m.srv
	srv.Stop()
}
// Addr returns the string form of the address the mock's listener is bound to.
func (m *mockTreeServer) Addr() string {
	bound := m.lis.Addr()
	return bound.String()
}
// Add counts the invocation and replies with an empty response.
func (m *mockTreeServer) Add(_ context.Context, _ *tree.AddRequest) (*tree.AddResponse, error) {
	m.addCounter++

	resp := new(tree.AddResponse)
	return resp, nil
}
// AddByPath is not supported by the mock and panics when called.
func (*mockTreeServer) AddByPath(_ context.Context, _ *tree.AddByPathRequest) (*tree.AddByPathResponse, error) {
	panic("implement me")
}
// Remove is not supported by the mock and panics when called.
func (*mockTreeServer) Remove(_ context.Context, _ *tree.RemoveRequest) (*tree.RemoveResponse, error) {
	panic("implement me")
}
// Move is not supported by the mock and panics when called.
func (*mockTreeServer) Move(_ context.Context, _ *tree.MoveRequest) (*tree.MoveResponse, error) {
	panic("implement me")
}
// GetNodeByPath is not supported by the mock and panics when called.
func (*mockTreeServer) GetNodeByPath(_ context.Context, _ *tree.GetNodeByPathRequest) (*tree.GetNodeByPathResponse, error) {
	panic("implement me")
}
// GetSubTree counts the invocation and then either fails with the scripted
// error or streams every scripted response body, in order, to the client.
// The request itself is ignored. A send failure aborts the stream and is
// returned as is.
func (m *mockTreeServer) GetSubTree(_ *tree.GetSubTreeRequest, s tree.TreeService_GetSubTreeServer) error {
	m.getSubTreeCounter++

	if scripted := m.getSubTreeError; scripted != nil {
		return scripted
	}

	for _, body := range m.getSubTreeResponses {
		msg := &tree.GetSubTreeResponse{Body: body}
		if err := s.Send(msg); err != nil {
			return err
		}
	}

	return nil
}
// TreeList is not supported by the mock and panics when called.
func (*mockTreeServer) TreeList(_ context.Context, _ *tree.TreeListRequest) (*tree.TreeListResponse, error) {
	panic("implement me")
}
// Apply is not supported by the mock and panics when called.
func (*mockTreeServer) Apply(_ context.Context, _ *tree.ApplyRequest) (*tree.ApplyResponse, error) {
	panic("implement me")
}
// GetOpLog is not supported by the mock and panics when called.
func (*mockTreeServer) GetOpLog(_ *tree.GetOpLogRequest, _ tree.TreeService_GetOpLogServer) error {
	panic("implement me")
}
// Healthcheck reports the scripted health state: an empty response while the
// healthy flag is set, and a "not healthy" error otherwise.
func (m *mockTreeServer) Healthcheck(_ context.Context, _ *tree.HealthcheckRequest) (*tree.HealthcheckResponse, error) {
	if !m.healthy {
		return nil, errors.New("not healthy")
	}

	return &tree.HealthcheckResponse{}, nil
}
// createTestServer creates a healthy mock tree server with a fresh key and a
// listener bound to a free loopback port, and registers the mock as the tree
// service of a new gRPC server. The server is not started; call Serve for that.
// Any setup failure fails the test immediately.
func createTestServer(t *testing.T, id int) *mockTreeServer {
	// attribute setup failures to the calling test, not to this helper
	t.Helper()

	// generate the key before opening the socket, so a key failure does not
	// leave an unclosed listener behind
	key, err := keys.NewPrivateKey()
	require.NoError(t, err)

	lis, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	res := &mockTreeServer{
		id:      id,
		srv:     grpc.NewServer(),
		lis:     lis,
		key:     key,
		healthy: true,
	}

	tree.RegisterTreeServiceServer(res.srv, res)

	return res
}
// preparePoolWithNetmapSource starts n healthy mock tree servers and creates a
// tree pool that lists all of them as nodes and resolves container placement
// through a mock netmap source built from the same servers and policy string p.
// It returns the pool, the servers in creation order and the netmap source.
// Stopping the servers is left to the caller. Any setup failure fails the test
// immediately.
func preparePoolWithNetmapSource(t *testing.T, n int, p string) (*Pool, []*mockTreeServer, *mockNetmapSource) {
	// attribute setup failures to the calling test, not to this helper
	t.Helper()

	poolInitParams := InitParameters{}

	servers := make([]*mockTreeServer, n)
	for i := range servers {
		servers[i] = createTestServer(t, i)
		servers[i].healthy = true
		servers[i].Serve()
		poolInitParams.AddNode(pool.NewNodeParam(1, servers[i].Addr(), 1))
	}

	source := &mockNetmapSource{
		servers: servers,
		policy:  p,
	}

	key, err := keys.NewPrivateKey()
	require.NoError(t, err)

	poolInitParams.SetKey(key)
	poolInitParams.SetNetMapInfoSource(source)

	cli, err := NewPool(poolInitParams)
	require.NoError(t, err)

	return cli, servers, source
}
// sortServers returns the mock servers in the order of the placement vectors
// computed for container cnr from the source's netmap snapshot and placement
// policy. Placed nodes are matched to servers by public key.
//
// The result holds exactly one entry per matched placed node. It never
// contains nil entries, so it may be shorter than servers when the policy
// selects only a subset of the nodes.
func sortServers(ctx context.Context, servers []*mockTreeServer, source *mockNetmapSource, cnr cid.ID) ([]*mockTreeServer, error) {
	snapshot, err := source.NetMapSnapshot(ctx)
	if err != nil {
		return nil, err
	}

	policy, err := source.PlacementPolicy(ctx, cnr)
	if err != nil {
		return nil, err
	}

	cnrNodes, err := snapshot.ContainerNodes(policy, cnr[:])
	if err != nil {
		return nil, err
	}

	priorityNodes, err := snapshot.PlacementVectors(cnrNodes, cnr[:])
	if err != nil {
		return nil, err
	}

	// Append instead of writing by index into a pre-sized slice: the number of
	// placed nodes is decided by the policy and need not equal len(servers).
	res := make([]*mockTreeServer, 0, len(servers))
	for i := range priorityNodes {
		for j := range priorityNodes[i] {
			key := priorityNodes[i][j].PublicKey()

			// find the server owning this public key
			for _, srv := range servers {
				if bytes.Equal(srv.key.PublicKey().Bytes(), key) {
					res = append(res, srv)
					break
				}
			}
		}
	}

	return res, nil
}
// TestConnectionLeak checks that requests routed past an unhealthy priority
// node land on the next node and that repeating such requests does not grow
// the number of goroutines.
func TestConnectionLeak(t *testing.T) {
	const (
		numberOfNodes   = 4
		placementPolicy = "REP 2"
		repeatedCalls   = 1000
	)

	// Start gRPC servers and build a pool backed by the mock netmap source
	treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
	for _, srv := range servers {
		defer srv.Stop()
	}

	var (
		cnr = cidtest.ID()
		ctx = context.Background()
	)

	// The first priority node for cnr is made unhealthy, so the pool has to
	// redial it on every request
	sortedServers, err := sortServers(ctx, servers, source, cnr)
	require.NoError(t, err)

	unhealthy, fallback := sortedServers[0], sortedServers[1]
	unhealthy.healthy = false

	// A single RPC must be served by the healthy node
	_, err = treePool.AddNode(ctx, AddNodeParams{CID: cnr})
	require.NoError(t, err)
	require.Equal(t, 0, unhealthy.addCounter)
	require.Equal(t, 1, fallback.addCounter)

	// Many more requests must not leave goroutines behind
	baseline := runtime.NumGoroutine()
	for n := 0; n < repeatedCalls; n++ {
		_, err = treePool.AddNode(ctx, AddNodeParams{CID: cnr})
		require.NoError(t, err)
	}

	// async operations may account for at most one extra goroutine
	extra := runtime.NumGoroutine() - baseline
	require.LessOrEqual(t, extra, 1)
}
// TestStreamRetry checks that GetSubTree keeps trying nodes in priority order
// until one of them serves the stream, and that the resulting reader yields
// the served items through each of its access methods: ReadAll, Next and Read.
func TestStreamRetry(t *testing.T) {
	const (
		numberOfNodes   = 4
		placementPolicy = "REP 2"
	)

	expected := []*tree.GetSubTreeResponse_Body{
		{
			NodeId: []uint64{1},
		},
		{
			NodeId: []uint64{2},
		},
		{
			NodeId: []uint64{3},
		},
	}

	// Initialize gRPC servers and create pool with netmap source
	treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
	defer func() {
		for i := range servers {
			servers[i].Stop()
		}
	}()

	cnr := cidtest.ID()
	ctx := context.Background()

	sortedServers, err := sortServers(ctx, servers, source, cnr)
	require.NoError(t, err)

	// Return expected response in last priority node, others return error
	for i := range sortedServers {
		if i == len(sortedServers)-1 {
			sortedServers[i].getSubTreeResponses = expected
		} else {
			sortedServers[i].getSubTreeError = errors.New("tree not found")
		}
	}

	t.Run("read all", func(t *testing.T) {
		reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
		require.NoError(t, err)

		data, err := reader.ReadAll()
		require.NoError(t, err)

		require.Len(t, data, len(expected))
		for i := range expected {
			require.EqualValues(t, expected[i].GetNodeId(), data[i].GetNodeID())
		}
	})

	t.Run("next", func(t *testing.T) {
		reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
		require.NoError(t, err)

		for i := range expected {
			resp, err := reader.Next()
			require.NoError(t, err)
			require.Equal(t, expected[i].GetNodeId(), resp.GetNodeID())
		}

		// The stream is exhausted, so the reader must report io.EOF.
		// require.Error(t, io.EOF, err) would only assert that io.EOF itself
		// is non-nil and pass regardless of err.
		_, err = reader.Next()
		require.ErrorIs(t, err, io.EOF)
	})

	t.Run("read", func(t *testing.T) {
		reader, err := treePool.GetSubTree(ctx, GetSubTreeParams{CID: cnr})
		require.NoError(t, err)

		buf := make([]*apitree.GetSubTreeResponseBody, len(expected))
		n, err := reader.Read(buf)
		require.NoError(t, err)

		// check the reported item count; len(buf) is fixed by make above and
		// says nothing about how much was actually read
		require.Equal(t, len(expected), n)
		for i := range expected {
			require.EqualValues(t, expected[i].GetNodeId(), buf[i].GetNodeID())
		}
	})

	for i := range servers {
		// check we retried every available node in the pool three times
		require.Equal(t, 3, servers[i].getSubTreeCounter)
	}
}