[#165] pool: drop builder

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-03-11 14:45:36 +03:00 committed by Alex Vanin
parent aeb4ac638a
commit f5cabe26cb
3 changed files with 186 additions and 168 deletions

View file

@ -50,8 +50,8 @@ type Client interface {
SessionCreate(context.Context, client.PrmSessionCreate) (*client.ResSessionCreate, error) SessionCreate(context.Context, client.PrmSessionCreate) (*client.ResSessionCreate, error)
} }
// BuilderOptions contains options used to build connection Pool. // InitParameters contains options used to create connection Pool.
type BuilderOptions struct { type InitParameters struct {
Key *ecdsa.PrivateKey Key *ecdsa.PrivateKey
Logger *zap.Logger Logger *zap.Logger
NodeConnectionTimeout time.Duration NodeConnectionTimeout time.Duration
@ -59,26 +59,28 @@ type BuilderOptions struct {
ClientRebalanceInterval time.Duration ClientRebalanceInterval time.Duration
SessionTokenThreshold time.Duration SessionTokenThreshold time.Duration
SessionExpirationDuration uint64 SessionExpirationDuration uint64
nodesParams []*NodesParam NodeParams []NodeParam
clientBuilder func(endpoint string) (Client, error) clientBuilder func(endpoint string) (Client, error)
} }
type NodesParam struct { type rebalanceParameters struct {
nodesParams []*nodesParam
nodeRequestTimeout time.Duration
clientRebalanceInterval time.Duration
sessionExpirationDuration uint64
}
type nodesParam struct {
priority int priority int
addresses []string addresses []string
weights []float64 weights []float64
} }
type NodeParam struct { type NodeParam struct {
priority int Priority int
address string Address string
weight float64 Weight float64
}
// Builder is an interim structure used to collect node addresses/weights and
// build connection Pool subsequently.
type Builder struct {
nodeParams []NodeParam
} }
// ContainerPollingParams contains parameters used in polling is a container created or not. // ContainerPollingParams contains parameters used in polling is a container created or not.
@ -95,45 +97,6 @@ func DefaultPollingParams() *ContainerPollingParams {
} }
} }
// AddNode adds address/weight pair to node PoolBuilder list.
func (pb *Builder) AddNode(address string, priority int, weight float64) *Builder {
pb.nodeParams = append(pb.nodeParams, NodeParam{
address: address,
priority: priority,
weight: weight,
})
return pb
}
// Build creates new Pool based on current PoolBuilder state and options.
func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (*Pool, error) {
if len(pb.nodeParams) == 0 {
return nil, errors.New("no NeoFS peers configured")
}
nodesParams := make(map[int]*NodesParam)
for _, param := range pb.nodeParams {
nodes, ok := nodesParams[param.priority]
if !ok {
nodes = &NodesParam{priority: param.priority}
}
nodes.addresses = append(nodes.addresses, param.address)
nodes.weights = append(nodes.weights, param.weight)
nodesParams[param.priority] = nodes
}
for _, nodes := range nodesParams {
nodes.weights = adjustWeights(nodes.weights)
options.nodesParams = append(options.nodesParams, nodes)
}
sort.Slice(options.nodesParams, func(i, j int) bool {
return options.nodesParams[i].priority < options.nodesParams[j].priority
})
return newPool(ctx, options)
}
type clientPack struct { type clientPack struct {
client Client client Client
healthy bool healthy bool
@ -217,46 +180,34 @@ const (
defaultSessionTokenExpirationDuration = 100 // in blocks defaultSessionTokenExpirationDuration = 100 // in blocks
defaultSessionTokenThreshold = 5 * time.Second defaultSessionTokenThreshold = 5 * time.Second
defaultRebalanceInterval = 25 * time.Second
defaultRequestTimeout = 4 * time.Second
) )
func newPool(ctx context.Context, options *BuilderOptions) (*Pool, error) { // NewPool create connection pool using parameters.
func NewPool(ctx context.Context, options InitParameters) (*Pool, error) {
if options.Key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
nodesParams, err := adjustNodeParams(options.NodeParams)
if err != nil {
return nil, err
}
fillDefaultInitParams(&options)
cache, err := NewCache() cache, err := NewCache()
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't create cache: %w", err) return nil, fmt.Errorf("couldn't create cache: %w", err)
} }
if options.SessionExpirationDuration == 0 {
options.SessionExpirationDuration = defaultSessionTokenExpirationDuration
}
if options.SessionTokenThreshold <= 0 {
options.SessionTokenThreshold = defaultSessionTokenThreshold
}
ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey) ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey)
inner := make([]*innerPool, len(options.nodesParams)) inner := make([]*innerPool, len(nodesParams))
var atLeastOneHealthy bool var atLeastOneHealthy bool
if options.clientBuilder == nil { for i, params := range nodesParams {
options.clientBuilder = func(addr string) (Client, error) {
var c client.Client
var prmInit client.PrmInit
prmInit.ResolveNeoFSFailures()
prmInit.SetDefaultPrivateKey(*options.Key)
c.Init(prmInit)
var prmDial client.PrmDial
prmDial.SetServerURI(addr)
prmDial.SetTimeout(options.NodeConnectionTimeout)
return &c, c.Dial(prmDial)
}
}
for i, params := range options.nodesParams {
clientPacks := make([]*clientPack, len(params.weights)) clientPacks := make([]*clientPack, len(params.weights))
for j, addr := range params.addresses { for j, addr := range params.addresses {
c, err := options.clientBuilder(addr) c, err := options.clientBuilder(addr)
@ -267,7 +218,7 @@ func newPool(ctx context.Context, options *BuilderOptions) (*Pool, error) {
cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration) cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration)
if err != nil && options.Logger != nil { if err != nil && options.Logger != nil {
options.Logger.Warn("failed to create neofs session token for client", options.Logger.Warn("failed to create neofs session token for client",
zap.String("address", addr), zap.String("Address", addr),
zap.Error(err)) zap.Error(err))
} else if err == nil { } else if err == nil {
healthy, atLeastOneHealthy = true, true healthy, atLeastOneHealthy = true, true
@ -300,12 +251,85 @@ func newPool(ctx context.Context, options *BuilderOptions) (*Pool, error) {
stokenDuration: options.SessionExpirationDuration, stokenDuration: options.SessionExpirationDuration,
stokenThreshold: options.SessionTokenThreshold, stokenThreshold: options.SessionTokenThreshold,
} }
go startRebalance(ctx, pool, options)
rebalanceParams := rebalanceParameters{
nodesParams: nodesParams,
nodeRequestTimeout: options.NodeRequestTimeout,
clientRebalanceInterval: options.ClientRebalanceInterval,
sessionExpirationDuration: options.SessionExpirationDuration,
}
go startRebalance(ctx, pool, rebalanceParams)
return pool, nil return pool, nil
} }
func startRebalance(ctx context.Context, p *Pool, options *BuilderOptions) { func fillDefaultInitParams(params *InitParameters) {
ticker := time.NewTimer(options.ClientRebalanceInterval) if params.SessionExpirationDuration == 0 {
params.SessionExpirationDuration = defaultSessionTokenExpirationDuration
}
if params.SessionTokenThreshold <= 0 {
params.SessionTokenThreshold = defaultSessionTokenThreshold
}
if params.ClientRebalanceInterval <= 0 {
params.ClientRebalanceInterval = defaultRebalanceInterval
}
if params.NodeRequestTimeout <= 0 {
params.NodeRequestTimeout = defaultRequestTimeout
}
if params.clientBuilder == nil {
params.clientBuilder = func(addr string) (Client, error) {
var c client.Client
var prmInit client.PrmInit
prmInit.ResolveNeoFSFailures()
prmInit.SetDefaultPrivateKey(*params.Key)
c.Init(prmInit)
var prmDial client.PrmDial
prmDial.SetServerURI(addr)
prmDial.SetTimeout(params.NodeConnectionTimeout)
return &c, c.Dial(prmDial)
}
}
}
func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
if len(nodeParams) == 0 {
return nil, errors.New("no NeoFS peers configured")
}
nodesParamsMap := make(map[int]*nodesParam)
for _, param := range nodeParams {
nodes, ok := nodesParamsMap[param.Priority]
if !ok {
nodes = &nodesParam{priority: param.Priority}
}
nodes.addresses = append(nodes.addresses, param.Address)
nodes.weights = append(nodes.weights, param.Weight)
nodesParamsMap[param.Priority] = nodes
}
nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
for _, nodes := range nodesParamsMap {
nodes.weights = adjustWeights(nodes.weights)
nodesParams = append(nodesParams, nodes)
}
sort.Slice(nodesParams, func(i, j int) bool {
return nodesParams[i].priority < nodesParams[j].priority
})
return nodesParams, nil
}
func startRebalance(ctx context.Context, p *Pool, options rebalanceParameters) {
ticker := time.NewTimer(options.clientRebalanceInterval)
buffers := make([][]float64, len(options.nodesParams)) buffers := make([][]float64, len(options.nodesParams))
for i, params := range options.nodesParams { for i, params := range options.nodesParams {
buffers[i] = make([]float64, len(params.weights)) buffers[i] = make([]float64, len(params.weights))
@ -318,12 +342,12 @@ func startRebalance(ctx context.Context, p *Pool, options *BuilderOptions) {
return return
case <-ticker.C: case <-ticker.C:
updateNodesHealth(ctx, p, options, buffers) updateNodesHealth(ctx, p, options, buffers)
ticker.Reset(options.ClientRebalanceInterval) ticker.Reset(options.clientRebalanceInterval)
} }
} }
} }
func updateNodesHealth(ctx context.Context, p *Pool, options *BuilderOptions, buffers [][]float64) { func updateNodesHealth(ctx context.Context, p *Pool, options rebalanceParameters, buffers [][]float64) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for i, inner := range p.innerPools { for i, inner := range p.innerPools {
wg.Add(1) wg.Add(1)
@ -337,7 +361,7 @@ func updateNodesHealth(ctx context.Context, p *Pool, options *BuilderOptions, bu
wg.Wait() wg.Wait()
} }
func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options *BuilderOptions, bufferWeights []float64) { func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options rebalanceParameters, bufferWeights []float64) {
if i > len(pool.innerPools)-1 { if i > len(pool.innerPools)-1 {
return return
} }
@ -353,7 +377,7 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options *Bui
go func(j int, cli Client) { go func(j int, cli Client) {
defer wg.Done() defer wg.Done()
ok := true ok := true
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
defer c() defer c()
if _, err := cli.EndpointInfo(tctx, prmEndpoint); err != nil { if _, err := cli.EndpointInfo(tctx, prmEndpoint); err != nil {
@ -367,7 +391,7 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options *Bui
if ok { if ok {
bufferWeights[j] = options.nodesParams[i].weights[j] bufferWeights[j] = options.nodesParams[i].weights[j]
if !cp.healthy { if !cp.healthy {
cliRes, err := createSessionTokenForDuration(ctx, cli, options.SessionExpirationDuration) cliRes, err := createSessionTokenForDuration(ctx, cli, options.sessionExpirationDuration)
if err != nil { if err != nil {
ok = false ok = false
bufferWeights[j] = 0 bufferWeights[j] = 0

View file

@ -28,15 +28,13 @@ func TestBuildPoolClientFailed(t *testing.T) {
return nil, fmt.Errorf("error") return nil, fmt.Errorf("error")
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
NodeParams: []NodeParam{{1, "peer0", 1}},
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
_, err := pb.Build(context.TODO(), opts) _, err := NewPool(context.TODO(), opts)
require.Error(t, err) require.Error(t, err)
} }
@ -54,15 +52,13 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) {
return mockClient, nil return mockClient, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
NodeParams: []NodeParam{{1, "peer0", 1}},
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
_, err := pb.Build(context.TODO(), opts) _, err := NewPool(context.TODO(), opts)
require.Error(t, err) require.Error(t, err)
} }
@ -110,20 +106,20 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
return mockClient2, nil return mockClient2, nil
} }
pb := new(Builder)
pb.AddNode("peer0", 9, 1)
pb.AddNode("peer1", 1, 1)
log, err := zap.NewProduction() log, err := zap.NewProduction()
require.NoError(t, err) require.NoError(t, err)
opts := &BuilderOptions{ opts := InitParameters{
Key: newPrivateKey(t), Key: newPrivateKey(t),
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
ClientRebalanceInterval: 1000 * time.Millisecond, ClientRebalanceInterval: 1000 * time.Millisecond,
Logger: log, Logger: log,
NodeParams: []NodeParam{
{9, "peer0", 1},
{1, "peer1", 1},
},
} }
clientPool, err := pb.Build(context.TODO(), opts) clientPool, err := NewPool(context.TODO(), opts)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(clientPool.Close) t.Cleanup(clientPool.Close)
@ -136,11 +132,10 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
} }
func TestBuildPoolZeroNodes(t *testing.T) { func TestBuildPoolZeroNodes(t *testing.T) {
pb := new(Builder) opts := InitParameters{
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
} }
_, err := pb.Build(context.TODO(), opts) _, err := NewPool(context.TODO(), opts)
require.Error(t, err) require.Error(t, err)
} }
@ -162,15 +157,13 @@ func TestOneNode(t *testing.T) {
return mockClient, nil return mockClient, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
NodeParams: []NodeParam{{1, "peer0", 1}},
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
pool, err := pb.Build(context.Background(), opts) pool, err := NewPool(context.Background(), opts)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -200,16 +193,16 @@ func TestTwoNodes(t *testing.T) {
return mockClient, nil return mockClient, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
pb.AddNode("peer1", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
NodeParams: []NodeParam{
{1, "peer0", 1},
{1, "peer1", 1},
},
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
pool, err := pb.Build(context.Background(), opts) pool, err := NewPool(context.Background(), opts)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -256,17 +249,18 @@ func TestOneOfTwoFailed(t *testing.T) {
return mockClient2, nil return mockClient2, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
pb.AddNode("peer1", 9, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
clientBuilder: clientBuilder, NodeParams: []NodeParam{
{1, "peer0", 1},
{9, "peer1", 1},
},
ClientRebalanceInterval: 200 * time.Millisecond, ClientRebalanceInterval: 200 * time.Millisecond,
clientBuilder: clientBuilder,
} }
pool, err := pb.Build(context.Background(), opts) pool, err := NewPool(context.Background(), opts)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -292,17 +286,17 @@ func TestTwoFailed(t *testing.T) {
return mockClient, nil return mockClient, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
pb.AddNode("peer1", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
clientBuilder: clientBuilder, NodeParams: []NodeParam{
{1, "peer0", 1},
{1, "peer1", 1},
},
ClientRebalanceInterval: 200 * time.Millisecond, ClientRebalanceInterval: 200 * time.Millisecond,
clientBuilder: clientBuilder,
} }
pool, err := pb.Build(context.Background(), opts) pool, err := NewPool(context.Background(), opts)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -336,19 +330,19 @@ func TestSessionCache(t *testing.T) {
return mockClient, nil return mockClient, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
clientBuilder: clientBuilder, NodeParams: []NodeParam{
{1, "peer0", 1},
},
ClientRebalanceInterval: 30 * time.Second, ClientRebalanceInterval: 30 * time.Second,
clientBuilder: clientBuilder,
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
pool, err := pb.Build(ctx, opts) pool, err := NewPool(ctx, opts)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -408,20 +402,20 @@ func TestPriority(t *testing.T) {
return mockClient2, nil return mockClient2, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
pb.AddNode("peer1", 2, 100)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
clientBuilder: clientBuilder, NodeParams: []NodeParam{
{1, "peer0", 1},
{2, "peer1", 100},
},
ClientRebalanceInterval: 1500 * time.Millisecond, ClientRebalanceInterval: 1500 * time.Millisecond,
clientBuilder: clientBuilder,
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
pool, err := pb.Build(ctx, opts) pool, err := NewPool(ctx, opts)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(pool.Close) t.Cleanup(pool.Close)
@ -463,19 +457,19 @@ func TestSessionCacheWithKey(t *testing.T) {
return mockClient, nil return mockClient, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
clientBuilder: clientBuilder, NodeParams: []NodeParam{
{1, "peer0", 1},
},
ClientRebalanceInterval: 30 * time.Second, ClientRebalanceInterval: 30 * time.Second,
clientBuilder: clientBuilder,
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
pool, err := pb.Build(ctx, opts) pool, err := NewPool(ctx, opts)
require.NoError(t, err) require.NoError(t, err)
// cache must contain session token // cache must contain session token
@ -507,18 +501,18 @@ func TestSessionTokenOwner(t *testing.T) {
return mockClient, nil return mockClient, nil
} }
pb := new(Builder) opts := InitParameters{
pb.AddNode("peer0", 1, 1)
opts := &BuilderOptions{
Key: newPrivateKey(t), Key: newPrivateKey(t),
NodeParams: []NodeParam{
{1, "peer0", 1},
},
clientBuilder: clientBuilder, clientBuilder: clientBuilder,
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
p, err := pb.Build(ctx, opts) p, err := NewPool(ctx, opts)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(p.Close) t.Cleanup(p.Close)

View file

@ -68,7 +68,7 @@ func TestHealthyReweight(t *testing.T) {
var ( var (
weights = []float64{0.9, 0.1} weights = []float64{0.9, 0.1}
names = []string{"node0", "node1"} names = []string{"node0", "node1"}
options = &BuilderOptions{nodesParams: []*NodesParam{{weights: weights}}} options = rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}}
buffer = make([]float64, len(weights)) buffer = make([]float64, len(weights))
) )
@ -118,7 +118,7 @@ func TestHealthyNoReweight(t *testing.T) {
var ( var (
weights = []float64{0.9, 0.1} weights = []float64{0.9, 0.1}
names = []string{"node0", "node1"} names = []string{"node0", "node1"}
options = &BuilderOptions{nodesParams: []*NodesParam{{weights: weights}}} options = rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}}
buffer = make([]float64, len(weights)) buffer = make([]float64, len(weights))
) )