diff --git a/pool/pool.go b/pool/pool.go index 471c97c1..a6480934 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -50,8 +50,8 @@ type Client interface { SessionCreate(context.Context, client.PrmSessionCreate) (*client.ResSessionCreate, error) } -// BuilderOptions contains options used to build connection Pool. -type BuilderOptions struct { +// InitParameters contains options used to create connection Pool. +type InitParameters struct { Key *ecdsa.PrivateKey Logger *zap.Logger NodeConnectionTimeout time.Duration @@ -59,26 +59,28 @@ type BuilderOptions struct { ClientRebalanceInterval time.Duration SessionTokenThreshold time.Duration SessionExpirationDuration uint64 - nodesParams []*NodesParam - clientBuilder func(endpoint string) (Client, error) + NodeParams []NodeParam + + clientBuilder func(endpoint string) (Client, error) } -type NodesParam struct { +type rebalanceParameters struct { + nodesParams []*nodesParam + nodeRequestTimeout time.Duration + clientRebalanceInterval time.Duration + sessionExpirationDuration uint64 +} + +type nodesParam struct { priority int addresses []string weights []float64 } type NodeParam struct { - priority int - address string - weight float64 -} - -// Builder is an interim structure used to collect node addresses/weights and -// build connection Pool subsequently. -type Builder struct { - nodeParams []NodeParam + Priority int + Address string + Weight float64 } // ContainerPollingParams contains parameters used in polling is a container created or not. @@ -95,45 +97,6 @@ func DefaultPollingParams() *ContainerPollingParams { } } -// AddNode adds address/weight pair to node PoolBuilder list. -func (pb *Builder) AddNode(address string, priority int, weight float64) *Builder { - pb.nodeParams = append(pb.nodeParams, NodeParam{ - address: address, - priority: priority, - weight: weight, - }) - return pb -} - -// Build creates new Pool based on current PoolBuilder state and options. -func (pb *Builder) Build(ctx context.Context, options *BuilderOptions) (*Pool, error) { - if len(pb.nodeParams) == 0 { - return nil, errors.New("no NeoFS peers configured") - } - - nodesParams := make(map[int]*NodesParam) - for _, param := range pb.nodeParams { - nodes, ok := nodesParams[param.priority] - if !ok { - nodes = &NodesParam{priority: param.priority} - } - nodes.addresses = append(nodes.addresses, param.address) - nodes.weights = append(nodes.weights, param.weight) - nodesParams[param.priority] = nodes - } - - for _, nodes := range nodesParams { - nodes.weights = adjustWeights(nodes.weights) - options.nodesParams = append(options.nodesParams, nodes) - } - - sort.Slice(options.nodesParams, func(i, j int) bool { - return options.nodesParams[i].priority < options.nodesParams[j].priority - }) - - return newPool(ctx, options) -} - type clientPack struct { client Client healthy bool @@ -217,46 +180,34 @@ const ( defaultSessionTokenExpirationDuration = 100 // in blocks defaultSessionTokenThreshold = 5 * time.Second + defaultRebalanceInterval = 25 * time.Second + defaultRequestTimeout = 4 * time.Second ) -func newPool(ctx context.Context, options *BuilderOptions) (*Pool, error) { +// NewPool create connection pool using parameters. +func NewPool(ctx context.Context, options InitParameters) (*Pool, error) { + if options.Key == nil { + return nil, fmt.Errorf("missed required parameter 'Key'") + } + + nodesParams, err := adjustNodeParams(options.NodeParams) + if err != nil { + return nil, err + } + + fillDefaultInitParams(&options) + cache, err := NewCache() if err != nil { return nil, fmt.Errorf("couldn't create cache: %w", err) } - if options.SessionExpirationDuration == 0 { - options.SessionExpirationDuration = defaultSessionTokenExpirationDuration - } - - if options.SessionTokenThreshold <= 0 { - options.SessionTokenThreshold = defaultSessionTokenThreshold - } - ownerID := owner.NewIDFromPublicKey(&options.Key.PublicKey) - inner := make([]*innerPool, len(options.nodesParams)) + inner := make([]*innerPool, len(nodesParams)) var atLeastOneHealthy bool - if options.clientBuilder == nil { - options.clientBuilder = func(addr string) (Client, error) { - var c client.Client - - var prmInit client.PrmInit - prmInit.ResolveNeoFSFailures() - prmInit.SetDefaultPrivateKey(*options.Key) - - c.Init(prmInit) - - var prmDial client.PrmDial - prmDial.SetServerURI(addr) - prmDial.SetTimeout(options.NodeConnectionTimeout) - - return &c, c.Dial(prmDial) - } - } - - for i, params := range options.nodesParams { + for i, params := range nodesParams { clientPacks := make([]*clientPack, len(params.weights)) for j, addr := range params.addresses { c, err := options.clientBuilder(addr) @@ -267,7 +218,7 @@ func newPool(ctx context.Context, options *BuilderOptions) (*Pool, error) { cliRes, err := createSessionTokenForDuration(ctx, c, options.SessionExpirationDuration) if err != nil && options.Logger != nil { options.Logger.Warn("failed to create neofs session token for client", - zap.String("address", addr), + zap.String("Address", addr), zap.Error(err)) } else if err == nil { healthy, atLeastOneHealthy = true, true @@ -300,12 +251,85 @@ func newPool(ctx context.Context, options *BuilderOptions) (*Pool, error) { stokenDuration: options.SessionExpirationDuration, stokenThreshold: options.SessionTokenThreshold, } - go startRebalance(ctx, pool, options) + + rebalanceParams := rebalanceParameters{ + nodesParams: nodesParams, + nodeRequestTimeout: options.NodeRequestTimeout, + clientRebalanceInterval: options.ClientRebalanceInterval, + sessionExpirationDuration: options.SessionExpirationDuration, + } + + go startRebalance(ctx, pool, rebalanceParams) return pool, nil } -func startRebalance(ctx context.Context, p *Pool, options *BuilderOptions) { - ticker := time.NewTimer(options.ClientRebalanceInterval) +func fillDefaultInitParams(params *InitParameters) { + if params.SessionExpirationDuration == 0 { + params.SessionExpirationDuration = defaultSessionTokenExpirationDuration + } + + if params.SessionTokenThreshold <= 0 { + params.SessionTokenThreshold = defaultSessionTokenThreshold + } + + if params.ClientRebalanceInterval <= 0 { + params.ClientRebalanceInterval = defaultRebalanceInterval + } + + if params.NodeRequestTimeout <= 0 { + params.NodeRequestTimeout = defaultRequestTimeout + } + + if params.clientBuilder == nil { + params.clientBuilder = func(addr string) (Client, error) { + var c client.Client + + var prmInit client.PrmInit + prmInit.ResolveNeoFSFailures() + prmInit.SetDefaultPrivateKey(*params.Key) + + c.Init(prmInit) + + var prmDial client.PrmDial + prmDial.SetServerURI(addr) + prmDial.SetTimeout(params.NodeConnectionTimeout) + + return &c, c.Dial(prmDial) + } + } +} + +func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) { + if len(nodeParams) == 0 { + return nil, errors.New("no NeoFS peers configured") + } + + nodesParamsMap := make(map[int]*nodesParam) + for _, param := range nodeParams { + nodes, ok := nodesParamsMap[param.Priority] + if !ok { + nodes = &nodesParam{priority: param.Priority} + } + nodes.addresses = append(nodes.addresses, param.Address) + nodes.weights = append(nodes.weights, param.Weight) + nodesParamsMap[param.Priority] = nodes + } + + nodesParams := make([]*nodesParam, 0, len(nodesParamsMap)) + for _, nodes := range nodesParamsMap { + nodes.weights = adjustWeights(nodes.weights) + nodesParams = append(nodesParams, nodes) + } + + sort.Slice(nodesParams, func(i, j int) bool { + return nodesParams[i].priority < nodesParams[j].priority + }) + + return nodesParams, nil +} + +func startRebalance(ctx context.Context, p *Pool, options rebalanceParameters) { + ticker := time.NewTimer(options.clientRebalanceInterval) buffers := make([][]float64, len(options.nodesParams)) for i, params := range options.nodesParams { buffers[i] = make([]float64, len(params.weights)) @@ -318,12 +342,12 @@ func startRebalance(ctx context.Context, p *Pool, options *BuilderOptions) { return case <-ticker.C: updateNodesHealth(ctx, p, options, buffers) - ticker.Reset(options.ClientRebalanceInterval) + ticker.Reset(options.clientRebalanceInterval) } } } -func updateNodesHealth(ctx context.Context, p *Pool, options *BuilderOptions, buffers [][]float64) { +func updateNodesHealth(ctx context.Context, p *Pool, options rebalanceParameters, buffers [][]float64) { wg := sync.WaitGroup{} for i, inner := range p.innerPools { wg.Add(1) @@ -337,7 +361,7 @@ func updateNodesHealth(ctx context.Context, p *Pool, options *BuilderOptions, bu wg.Wait() } -func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options *BuilderOptions, bufferWeights []float64) { +func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options rebalanceParameters, bufferWeights []float64) { if i > len(pool.innerPools)-1 { return } @@ -353,7 +377,7 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options *Bui go func(j int, cli Client) { defer wg.Done() ok := true - tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) + tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout) defer c() if _, err := cli.EndpointInfo(tctx, prmEndpoint); err != nil { @@ -367,7 +391,7 @@ func updateInnerNodesHealth(ctx context.Context, pool *Pool, i int, options *Bui if ok { bufferWeights[j] = options.nodesParams[i].weights[j] if !cp.healthy { - cliRes, err := createSessionTokenForDuration(ctx, cli, options.SessionExpirationDuration) + cliRes, err := createSessionTokenForDuration(ctx, cli, options.sessionExpirationDuration) if err != nil { ok = false bufferWeights[j] = 0 diff --git a/pool/pool_test.go b/pool/pool_test.go index 888685f9..a7804749 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -28,15 +28,13 @@ func TestBuildPoolClientFailed(t *testing.T) { return nil, fmt.Errorf("error") } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - - opts := &BuilderOptions{ + opts := InitParameters{ Key: newPrivateKey(t), + NodeParams: []NodeParam{{1, "peer0", 1}}, clientBuilder: clientBuilder, } - _, err := pb.Build(context.TODO(), opts) + _, err := NewPool(context.TODO(), opts) require.Error(t, err) } @@ -54,15 +52,13 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) { return mockClient, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - - opts := &BuilderOptions{ + opts := InitParameters{ Key: newPrivateKey(t), + NodeParams: []NodeParam{{1, "peer0", 1}}, clientBuilder: clientBuilder, } - _, err := pb.Build(context.TODO(), opts) + _, err := NewPool(context.TODO(), opts) require.Error(t, err) } @@ -110,20 +106,20 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { return mockClient2, nil } - pb := new(Builder) - pb.AddNode("peer0", 9, 1) - pb.AddNode("peer1", 1, 1) - log, err := zap.NewProduction() require.NoError(t, err) - opts := &BuilderOptions{ + opts := InitParameters{ Key: newPrivateKey(t), clientBuilder: clientBuilder, ClientRebalanceInterval: 1000 * time.Millisecond, Logger: log, + NodeParams: []NodeParam{ + {9, "peer0", 1}, + {1, "peer1", 1}, + }, } - clientPool, err := pb.Build(context.TODO(), opts) + clientPool, err := NewPool(context.TODO(), opts) require.NoError(t, err) t.Cleanup(clientPool.Close) @@ -136,11 +132,10 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { } func TestBuildPoolZeroNodes(t *testing.T) { - pb := new(Builder) - opts := &BuilderOptions{ + opts := InitParameters{ Key: newPrivateKey(t), } - _, err := pb.Build(context.TODO(), opts) + _, err := NewPool(context.TODO(), opts) require.Error(t, err) } @@ -162,15 +157,13 @@ func TestOneNode(t *testing.T) { return mockClient, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - - opts := &BuilderOptions{ + opts := InitParameters{ Key: newPrivateKey(t), + NodeParams: []NodeParam{{1, "peer0", 1}}, clientBuilder: clientBuilder, } - pool, err := pb.Build(context.Background(), opts) + pool, err := NewPool(context.Background(), opts) require.NoError(t, err) t.Cleanup(pool.Close) @@ -200,16 +193,16 @@ func TestTwoNodes(t *testing.T) { return mockClient, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - pb.AddNode("peer1", 1, 1) - - opts := &BuilderOptions{ - Key: newPrivateKey(t), + opts := InitParameters{ + Key: newPrivateKey(t), + NodeParams: []NodeParam{ + {1, "peer0", 1}, + {1, "peer1", 1}, + }, clientBuilder: clientBuilder, } - pool, err := pb.Build(context.Background(), opts) + pool, err := NewPool(context.Background(), opts) require.NoError(t, err) t.Cleanup(pool.Close) @@ -256,17 +249,18 @@ func TestOneOfTwoFailed(t *testing.T) { return mockClient2, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - pb.AddNode("peer1", 9, 1) - - opts := &BuilderOptions{ - Key: newPrivateKey(t), - clientBuilder: clientBuilder, + opts := InitParameters{ + Key: newPrivateKey(t), + NodeParams: []NodeParam{ + {1, "peer0", 1}, + {9, "peer1", 1}, + }, ClientRebalanceInterval: 200 * time.Millisecond, + clientBuilder: clientBuilder, } - pool, err := pb.Build(context.Background(), opts) + pool, err := NewPool(context.Background(), opts) + require.NoError(t, err) t.Cleanup(pool.Close) @@ -292,17 +286,17 @@ func TestTwoFailed(t *testing.T) { return mockClient, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - pb.AddNode("peer1", 1, 1) - - opts := &BuilderOptions{ - Key: newPrivateKey(t), - clientBuilder: clientBuilder, + opts := InitParameters{ + Key: newPrivateKey(t), + NodeParams: []NodeParam{ + {1, "peer0", 1}, + {1, "peer1", 1}, + }, ClientRebalanceInterval: 200 * time.Millisecond, + clientBuilder: clientBuilder, } - pool, err := pb.Build(context.Background(), opts) + pool, err := NewPool(context.Background(), opts) require.NoError(t, err) t.Cleanup(pool.Close) @@ -336,19 +330,19 @@ func TestSessionCache(t *testing.T) { return mockClient, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - - opts := &BuilderOptions{ - Key: newPrivateKey(t), - clientBuilder: clientBuilder, + opts := InitParameters{ + Key: newPrivateKey(t), + NodeParams: []NodeParam{ + {1, "peer0", 1}, + }, ClientRebalanceInterval: 30 * time.Second, + clientBuilder: clientBuilder, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := pb.Build(ctx, opts) + pool, err := NewPool(ctx, opts) require.NoError(t, err) t.Cleanup(pool.Close) @@ -408,20 +402,20 @@ func TestPriority(t *testing.T) { return mockClient2, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - pb.AddNode("peer1", 2, 100) - - opts := &BuilderOptions{ - Key: newPrivateKey(t), - clientBuilder: clientBuilder, + opts := InitParameters{ + Key: newPrivateKey(t), + NodeParams: []NodeParam{ + {1, "peer0", 1}, + {2, "peer1", 100}, + }, ClientRebalanceInterval: 1500 * time.Millisecond, + clientBuilder: clientBuilder, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := pb.Build(ctx, opts) + pool, err := NewPool(ctx, opts) require.NoError(t, err) t.Cleanup(pool.Close) @@ -463,19 +457,19 @@ func TestSessionCacheWithKey(t *testing.T) { return mockClient, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - - opts := &BuilderOptions{ - Key: newPrivateKey(t), - clientBuilder: clientBuilder, + opts := InitParameters{ + Key: newPrivateKey(t), + NodeParams: []NodeParam{ + {1, "peer0", 1}, + }, ClientRebalanceInterval: 30 * time.Second, + clientBuilder: clientBuilder, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := pb.Build(ctx, opts) + pool, err := NewPool(ctx, opts) require.NoError(t, err) // cache must contain session token @@ -507,18 +501,18 @@ func TestSessionTokenOwner(t *testing.T) { return mockClient, nil } - pb := new(Builder) - pb.AddNode("peer0", 1, 1) - - opts := &BuilderOptions{ - Key: newPrivateKey(t), + opts := InitParameters{ + Key: newPrivateKey(t), + NodeParams: []NodeParam{ + {1, "peer0", 1}, + }, clientBuilder: clientBuilder, } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - p, err := pb.Build(ctx, opts) + p, err := NewPool(ctx, opts) require.NoError(t, err) t.Cleanup(p.Close) diff --git a/pool/sampler_test.go b/pool/sampler_test.go index aa274997..28b80b9a 100644 --- a/pool/sampler_test.go +++ b/pool/sampler_test.go @@ -68,7 +68,7 @@ func TestHealthyReweight(t *testing.T) { var ( weights = []float64{0.9, 0.1} names = []string{"node0", "node1"} - options = &BuilderOptions{nodesParams: []*NodesParam{{weights: weights}}} + options = rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}} buffer = make([]float64, len(weights)) ) @@ -118,7 +118,7 @@ func TestHealthyNoReweight(t *testing.T) { var ( weights = []float64{0.9, 0.1} names = []string{"node0", "node1"} - options = &BuilderOptions{nodesParams: []*NodesParam{{weights: weights}}} + options = rebalanceParameters{nodesParams: []*nodesParam{{weights: weights}}} buffer = make([]float64, len(weights)) )