Avoid connection leak in tree pool with netmap support #331
2 changed files with 247 additions and 0 deletions
@@ -1032,6 +1032,16 @@ func (p *Pool) getNewTreeClient(ctx context.Context, node netmap.NodeInfo) (*tre
 	newTreeCl := newTreeClient(addr.URIAddr(), p.dialOptions, p.nodeDialTimeout, p.streamTimeout)
 	if err = newTreeCl.dial(ctx); err != nil {
 		p.log(zap.WarnLevel, "failed to dial tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
+
+		// We have to close the connection here after a failed `dial()`.
+		// This is NOT necessary in the object pool and in the regular tree pool without netmap support, because:
+		// - the object pool uses the SDK object client, which closes the connection during the `dial()` call by itself,
+		// - the regular tree pool is going to reuse the connection by calling `redialIfNecessary()`.
+		// The tree pool with netmap support does not run a background goroutine, so we have to close the connection immediately.
+		if err = newTreeCl.close(); err != nil {
+			p.log(zap.WarnLevel, "failed to close recently dialed tree client", zap.String("address", addr.URIAddr()), zap.Error(err))
+		}
+
 		return false
 	}
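For context, the leak described in the new comment follows the usual gRPC client pattern: creating a client connection spawns background goroutines even when the subsequent handshake or healthcheck fails, so the connection must be closed explicitly on that error path. Below is a minimal, generic sketch of that pattern; it is not the SDK's actual `treeClient` code, and `healthcheck` stands in for whatever call `dial()` performs internally:

```go
package example

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dialNode illustrates the "close on failed dial" idea from the hunk above.
// The *grpc.ClientConn owns background goroutines; if the healthcheck fails
// and the connection is not closed, those goroutines (and the connection
// itself) are leaked.
func dialNode(ctx context.Context, addr string, healthcheck func(context.Context, *grpc.ClientConn) error) (*grpc.ClientConn, error) {
	conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}

	hcCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	if err := healthcheck(hcCtx, conn); err != nil {
		// Analogue of newTreeCl.close() in the fix: release the connection
		// instead of leaving it behind for the lifetime of the process.
		if closeErr := conn.Close(); closeErr != nil {
			log.Printf("failed to close connection to %s: %v", addr, closeErr)
		}
		return nil, err
	}
	return conn, nil
}
```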
pool/tree/pool_server_test.go (new file, 237 lines)
@@ -0,0 +1,237 @@
package tree

import (
	"bytes"
	"context"
	"errors"
	"net"
	"runtime"
	"strconv"
	"testing"

	apinetmap "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	tree "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree/service"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
)

type mockTreeServer struct {
	id  int
	srv *grpc.Server
	lis net.Listener
	key *keys.PrivateKey

	healthy    bool
	addCounter int
}

type mockNetmapSource struct {
	servers []*mockTreeServer
	policy  string
}

func (m *mockNetmapSource) NetMapSnapshot(context.Context) (netmap.NetMap, error) {
	nm := netmap.NetMap{}
	nodes := make([]netmap.NodeInfo, len(m.servers))
	for i, server := range m.servers {
		ni := apinetmap.NodeInfo{}
		ni.SetAddresses(server.lis.Addr().String())
		ni.SetPublicKey(server.key.PublicKey().Bytes())
		err := nodes[i].ReadFromV2(ni) // no other way to set the address field in netmap.NodeInfo
		if err != nil {
			return nm, err
		}
		nodes[i].SetAttribute("id", strconv.Itoa(server.id))
	}
	nm.SetNodes(nodes)
	return nm, nil
}

func (m *mockNetmapSource) PlacementPolicy(context.Context, cid.ID) (netmap.PlacementPolicy, error) {
	p := netmap.PlacementPolicy{}
	return p, p.DecodeString(m.policy)
}

func (m *mockTreeServer) Serve() {
	go m.srv.Serve(m.lis)
}

func (m *mockTreeServer) Stop() {
	m.srv.Stop()
}

func (m *mockTreeServer) Addr() string {
	return m.lis.Addr().String()
}

func (m *mockTreeServer) Add(context.Context, *tree.AddRequest) (*tree.AddResponse, error) {
	m.addCounter++
	return &tree.AddResponse{}, nil
}

func (m *mockTreeServer) AddByPath(context.Context, *tree.AddByPathRequest) (*tree.AddByPathResponse, error) {
	panic("implement me")
}

func (m *mockTreeServer) Remove(context.Context, *tree.RemoveRequest) (*tree.RemoveResponse, error) {
	panic("implement me")
}

func (m *mockTreeServer) Move(context.Context, *tree.MoveRequest) (*tree.MoveResponse, error) {
	panic("implement me")
}

func (m *mockTreeServer) GetNodeByPath(context.Context, *tree.GetNodeByPathRequest) (*tree.GetNodeByPathResponse, error) {
	panic("implement me")
}

func (m *mockTreeServer) GetSubTree(*tree.GetSubTreeRequest, tree.TreeService_GetSubTreeServer) error {
	panic("implement me")
}

func (m *mockTreeServer) TreeList(context.Context, *tree.TreeListRequest) (*tree.TreeListResponse, error) {
	panic("implement me")
}

func (m *mockTreeServer) Apply(context.Context, *tree.ApplyRequest) (*tree.ApplyResponse, error) {
	panic("implement me")
}

func (m *mockTreeServer) GetOpLog(*tree.GetOpLogRequest, tree.TreeService_GetOpLogServer) error {
	panic("implement me")
}

func (m *mockTreeServer) Healthcheck(context.Context, *tree.HealthcheckRequest) (*tree.HealthcheckResponse, error) {
	if m.healthy {
		return new(tree.HealthcheckResponse), nil
	}
	return nil, errors.New("not healthy")
}

func createTestServer(t *testing.T, id int) *mockTreeServer {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	key, err := keys.NewPrivateKey()
	require.NoError(t, err)

	res := &mockTreeServer{
		id:      id,
		srv:     grpc.NewServer(),
		lis:     lis,
		key:     key,
		healthy: true,
	}

	tree.RegisterTreeServiceServer(res.srv, res)

	return res
}

func preparePoolWithNetmapSource(t *testing.T, n int, p string) (*Pool, []*mockTreeServer, *mockNetmapSource) {
	poolInitParams := InitParameters{}

	servers := make([]*mockTreeServer, n)
	for i := range servers {
		servers[i] = createTestServer(t, i)
		servers[i].healthy = true
		servers[i].Serve()
		poolInitParams.AddNode(pool.NewNodeParam(1, servers[i].Addr(), 1))
	}

	source := &mockNetmapSource{
		servers: servers,
		policy:  p,
	}

	key, err := keys.NewPrivateKey()
	require.NoError(t, err)
	poolInitParams.SetKey(key)
	poolInitParams.SetNetMapInfoSource(source)

	cli, err := NewPool(poolInitParams)
	require.NoError(t, err)

	return cli, servers, source
}

func sortServers(ctx context.Context, servers []*mockTreeServer, source *mockNetmapSource, cnr cid.ID) ([]*mockTreeServer, error) {
	res := make([]*mockTreeServer, len(servers))
	snapshot, err := source.NetMapSnapshot(ctx)
	if err != nil {
		return nil, err
	}

	policy, err := source.PlacementPolicy(ctx, cnr)
	if err != nil {
		return nil, err
	}

	cnrNodes, err := snapshot.ContainerNodes(policy, cnr[:])
	if err != nil {
		return nil, err
	}

	priorityNodes, err := snapshot.PlacementVectors(cnrNodes, cnr[:])
	if err != nil {
		return nil, err
	}

	// Find servers based on public key and store pointers in res.
	index := 0
	for i := range priorityNodes {
		for j := range priorityNodes[i] {
			key := priorityNodes[i][j].PublicKey()
			for k := range servers {
				if bytes.Equal(servers[k].key.PublicKey().Bytes(), key) {
					res[index] = servers[k]
					index++
					break
				}
			}
		}
	}

	return res, nil
}

func TestConnectionLeak(t *testing.T) {
	const (
		numberOfNodes   = 4
		placementPolicy = "REP 2"
	)

	// Initialize gRPC servers and create a pool with the netmap source.
	treePool, servers, source := preparePoolWithNetmapSource(t, numberOfNodes, placementPolicy)
	for i := range servers {
		defer servers[i].Stop()
	}

	cnr := cidtest.ID()
	ctx := context.Background()

	// Make the priority node for cnr unhealthy, so it is going to be redialed on every request.
	sortedServers, err := sortServers(ctx, servers, source, cnr)
	require.NoError(t, err)
	sortedServers[0].healthy = false

	// Make an RPC and check that the pool switched to a healthy server.
	_, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr})
	require.NoError(t, err)
	require.Equal(t, 0, sortedServers[0].addCounter) // unhealthy
	require.Equal(t, 1, sortedServers[1].addCounter) // healthy

	// Check that goroutines are not leaked during multiple requests.
	routinesBefore := runtime.NumGoroutine()
alexvanin commented:
I would appreciate any other way to check for the leak. I had no better idea than checking the number of goroutines directly. The gRPC server does not provide any useful data about connections, as far as I understand.

aarifullin commented:
There is https://github.com/uber-go/goleak, but I don't think we are going to introduce a new import. I'd use the same idea as you, with `runtime` :)
	for i := 0; i < 1000; i++ {
		_, err = treePool.AddNode(context.Background(), AddNodeParams{CID: cnr})
		require.NoError(t, err)
	}
	// Not more than 1 extra goroutine is created due to async operations.
	require.LessOrEqual(t, runtime.NumGoroutine()-routinesBefore, 1)
}
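Regarding the goleak alternative mentioned in the review thread above, here is a minimal sketch of how such a check could look, assuming the `go.uber.org/goleak` dependency were added (it was not adopted in this PR, and the test body is elided):

```go
package tree

import (
	"testing"

	"go.uber.org/goleak"
)

// Hypothetical variant of the leak check: instead of comparing
// runtime.NumGoroutine() before and after, goleak diffs the set of goroutines.
func TestConnectionLeakWithGoleak(t *testing.T) {
	// goleak.IgnoreCurrent() is evaluated when the defer statement is reached,
	// i.e. at the start of the test, so only goroutines started by the test
	// body itself are reported as leaks when VerifyNone runs at the end.
	defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

	// ... start the mock servers and issue the AddNode requests as in TestConnectionLeak ...
	// Any goroutine left behind by an un-closed tree client connection fails the test here.
}
```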