[#165] pool: change exported fields to setters

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-03-15 15:49:14 +03:00 committed by Alex Vanin
parent 9be9697856
commit 191d85e607
3 changed files with 193 additions and 130 deletions

View file

@ -9,14 +9,12 @@ Create pool instance with 3 nodes connection.
This InitParameters will make pool use 192.168.130.71 node while it is healthy. Otherwise, it will make the pool use
192.168.130.72 for 90% of requests and 192.168.130.73 for remaining 10%.
:
prm := pool.InitParameters{
Key: key,
NodeParams: []NodeParam{
{ Priority: 1, Address: "192.168.130.71", Weight: 1 },
{ Priority: 2, Address: "192.168.130.72", Weight: 9 },
{ Priority: 2, Address: "192.168.130.73", Weight: 1 },
// ...
}
var prm pool.InitParameters
prm.SetKey(key)
prm.AddNode(NewNodeParam(1, "192.168.130.71", 1))
prm.AddNode(NewNodeParam(2, "192.168.130.72", 9))
prm.AddNode(NewNodeParam(2, "192.168.130.73", 1))
// ...
p, err := pool.NewPool(prm)
// ...

View file

@ -53,18 +53,62 @@ type client interface {
// InitParameters contains values used to initialize connection Pool.
type InitParameters struct {
Key *ecdsa.PrivateKey
Logger *zap.Logger
NodeConnectionTimeout time.Duration
NodeRequestTimeout time.Duration
ClientRebalanceInterval time.Duration
SessionTokenThreshold time.Duration
SessionExpirationDuration uint64
NodeParams []NodeParam
key *ecdsa.PrivateKey
logger *zap.Logger
nodeDialTimeout time.Duration
healthcheckTimeout time.Duration
clientRebalanceInterval time.Duration
sessionTokenThreshold time.Duration
sessionExpirationDuration uint64
nodeParams []NodeParam
clientBuilder func(endpoint string) (client, error)
}
// SetKey specifies default key to be used for the protocol communication by default.
func (x *InitParameters) SetKey(key *ecdsa.PrivateKey) {
x.key = key
}
// SetLogger specifies logger.
func (x *InitParameters) SetLogger(logger *zap.Logger) {
x.logger = logger
}
// SetNodeDialTimeout specifies the timeout for connection to be established.
func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) {
x.nodeDialTimeout = timeout
}
// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive.
//
// See also Pool.Dial.
func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
x.healthcheckTimeout = timeout
}
// SetClientRebalanceInterval specifies the interval for updating nodes health status.
//
// See also Pool.Dial.
func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) {
x.clientRebalanceInterval = interval
}
// SetSessionThreshold specifies the max session token life time for PutObject operation.
func (x *InitParameters) SetSessionThreshold(threshold time.Duration) {
x.sessionTokenThreshold = threshold
}
// SetSessionExpirationDuration specifies the session token lifetime in epochs.
func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) {
x.sessionExpirationDuration = expirationDuration
}
// AddNode append information about the node to which you want to connect.
func (x *InitParameters) AddNode(nodeParam NodeParam) {
x.nodeParams = append(x.nodeParams, nodeParam)
}
type rebalanceParameters struct {
nodesParams []*nodesParam
nodeRequestTimeout time.Duration
@ -80,22 +124,58 @@ type nodesParam struct {
// NodeParam groups parameters of remote node.
type NodeParam struct {
Priority int
Address string
Weight float64
priority int
address string
weight float64
}
// NewNodeParam creates NodeParam using parameters.
func NewNodeParam(priority int, address string, weight float64) (prm NodeParam) {
prm.SetPriority(priority)
prm.SetAddress(address)
prm.SetWeight(weight)
return
}
// SetPriority specifies priority of the node.
// Negative value is allowed. In the result node groups
// with the same priority will be sorted by descent.
func (x *NodeParam) SetPriority(priority int) {
x.priority = priority
}
// SetAddress specifies address of the node.
func (x *NodeParam) SetAddress(address string) {
x.address = address
}
// SetWeight specifies weight of the node.
func (x *NodeParam) SetWeight(weight float64) {
x.weight = weight
}
// ContainerPollingParams contains parameters used in polling is a container created or not.
type ContainerPollingParams struct {
CreationTimeout time.Duration
PollInterval time.Duration
timeout time.Duration
pollInterval time.Duration
}
// SetTimeout specifies the time to wait for the operation to complete.
func (x *ContainerPollingParams) SetTimeout(timeout time.Duration) {
x.timeout = timeout
}
// SetPollInterval specifies the interval, once it will check the completion of the operation.
func (x *ContainerPollingParams) SetPollInterval(tick time.Duration) {
x.pollInterval = tick
}
// DefaultPollingParams creates ContainerPollingParams with default values.
func DefaultPollingParams() *ContainerPollingParams {
return &ContainerPollingParams{
CreationTimeout: 120 * time.Second,
PollInterval: 5 * time.Second,
timeout: 120 * time.Second,
pollInterval: 5 * time.Second,
}
}
@ -243,11 +323,11 @@ func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) {
type PrmContainerPut struct {
prmCommon
cnr *container.Container
cnr container.Container
}
// SetContainer specifies structured information about new NeoFS container.
func (x *PrmContainerPut) SetContainer(cnr *container.Container) {
func (x *PrmContainerPut) SetContainer(cnr container.Container) {
x.cnr = cnr
}
@ -255,11 +335,11 @@ func (x *PrmContainerPut) SetContainer(cnr *container.Container) {
type PrmContainerGet struct {
prmCommon
cnrID *cid.ID
cnrID cid.ID
}
// SetContainerID specifies identifier of the container to be read.
func (x *PrmContainerGet) SetContainerID(cnrID *cid.ID) {
func (x *PrmContainerGet) SetContainerID(cnrID cid.ID) {
x.cnrID = cnrID
}
@ -267,11 +347,11 @@ func (x *PrmContainerGet) SetContainerID(cnrID *cid.ID) {
type PrmContainerList struct {
prmCommon
ownerID *owner.ID
ownerID owner.ID
}
// SetOwnerID specifies identifier of the NeoFS account to list the containers.
func (x *PrmContainerList) SetOwnerID(ownerID *owner.ID) {
func (x *PrmContainerList) SetOwnerID(ownerID owner.ID) {
x.ownerID = ownerID
}
@ -279,11 +359,11 @@ func (x *PrmContainerList) SetOwnerID(ownerID *owner.ID) {
type PrmContainerDelete struct {
prmCommon
cnrID *cid.ID
cnrID cid.ID
}
// SetContainerID specifies identifier of the NeoFS container to be removed.
func (x *PrmContainerDelete) SetContainerID(cnrID *cid.ID) {
func (x *PrmContainerDelete) SetContainerID(cnrID cid.ID) {
x.cnrID = cnrID
}
@ -291,11 +371,11 @@ func (x *PrmContainerDelete) SetContainerID(cnrID *cid.ID) {
type PrmContainerEACL struct {
prmCommon
cnrID *cid.ID
cnrID cid.ID
}
// SetContainerID specifies identifier of the NeoFS container to read the eACL table.
func (x *PrmContainerEACL) SetContainerID(cnrID *cid.ID) {
func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
x.cnrID = cnrID
}
@ -303,11 +383,11 @@ func (x *PrmContainerEACL) SetContainerID(cnrID *cid.ID) {
type PrmContainerSetEACL struct {
prmCommon
table *eacl.Table
table eacl.Table
}
// SetTable specifies eACL table structure to be set for the container.
func (x *PrmContainerSetEACL) SetTable(table *eacl.Table) {
func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
x.table = table
}
@ -315,11 +395,11 @@ func (x *PrmContainerSetEACL) SetTable(table *eacl.Table) {
type PrmBalanceGet struct {
prmCommon
ownerID *owner.ID
ownerID owner.ID
}
// SetOwnerID specifies identifier of the NeoFS account for which the balance is requested.
func (x *PrmBalanceGet) SetOwnerID(ownerID *owner.ID) {
func (x *PrmBalanceGet) SetOwnerID(ownerID owner.ID) {
x.ownerID = ownerID
}
@ -372,13 +452,13 @@ const (
defaultRequestTimeout = 4 * time.Second
)
// NewPool create connection pool using parameters.
// NewPool creates connection pool using parameters.
func NewPool(options InitParameters) (*Pool, error) {
if options.Key == nil {
if options.key == nil {
return nil, fmt.Errorf("missed required parameter 'Key'")
}
nodesParams, err := adjustNodeParams(options.NodeParams)
nodesParams, err := adjustNodeParams(options.nodeParams)
if err != nil {
return nil, err
}
@ -391,17 +471,17 @@ func NewPool(options InitParameters) (*Pool, error) {
}
pool := &Pool{
key: options.Key,
owner: owner.NewIDFromPublicKey(&options.Key.PublicKey),
key: options.key,
owner: owner.NewIDFromPublicKey(&options.key.PublicKey),
cache: cache,
logger: options.Logger,
stokenDuration: options.SessionExpirationDuration,
stokenThreshold: options.SessionTokenThreshold,
logger: options.logger,
stokenDuration: options.sessionExpirationDuration,
stokenThreshold: options.sessionTokenThreshold,
rebalanceParams: rebalanceParameters{
nodesParams: nodesParams,
nodeRequestTimeout: options.NodeRequestTimeout,
clientRebalanceInterval: options.ClientRebalanceInterval,
sessionExpirationDuration: options.SessionExpirationDuration,
nodeRequestTimeout: options.healthcheckTimeout,
clientRebalanceInterval: options.clientRebalanceInterval,
sessionExpirationDuration: options.sessionExpirationDuration,
},
clientBuilder: options.clientBuilder,
}
@ -410,8 +490,13 @@ func NewPool(options InitParameters) (*Pool, error) {
}
// Dial establishes a connection to the servers from the NeoFS network.
// Returns an error describing failure reason. If failed, the Pool
// SHOULD NOT be used.
// It also starts a routine that checks the health of the nodes and
// updates the weights of the nodes for balancing.
// Returns an error describing failure reason.
//
// If failed, the Pool SHOULD NOT be used.
//
// See also InitParameters.SetClientRebalanceInterval.
func (p *Pool) Dial(ctx context.Context) error {
inner := make([]*innerPool, len(p.rebalanceParams.nodesParams))
var atLeastOneHealthy bool
@ -459,20 +544,20 @@ func (p *Pool) Dial(ctx context.Context) error {
}
func fillDefaultInitParams(params *InitParameters) {
if params.SessionExpirationDuration == 0 {
params.SessionExpirationDuration = defaultSessionTokenExpirationDuration
if params.sessionExpirationDuration == 0 {
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
}
if params.SessionTokenThreshold <= 0 {
params.SessionTokenThreshold = defaultSessionTokenThreshold
if params.sessionTokenThreshold <= 0 {
params.sessionTokenThreshold = defaultSessionTokenThreshold
}
if params.ClientRebalanceInterval <= 0 {
params.ClientRebalanceInterval = defaultRebalanceInterval
if params.clientRebalanceInterval <= 0 {
params.clientRebalanceInterval = defaultRebalanceInterval
}
if params.NodeRequestTimeout <= 0 {
params.NodeRequestTimeout = defaultRequestTimeout
if params.healthcheckTimeout <= 0 {
params.healthcheckTimeout = defaultRequestTimeout
}
if params.clientBuilder == nil {
@ -481,13 +566,13 @@ func fillDefaultInitParams(params *InitParameters) {
var prmInit sdkClient.PrmInit
prmInit.ResolveNeoFSFailures()
prmInit.SetDefaultPrivateKey(*params.Key)
prmInit.SetDefaultPrivateKey(*params.key)
c.Init(prmInit)
var prmDial sdkClient.PrmDial
prmDial.SetServerURI(addr)
prmDial.SetTimeout(params.NodeConnectionTimeout)
prmDial.SetTimeout(params.nodeDialTimeout)
return &c, c.Dial(prmDial)
}
@ -501,13 +586,13 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
nodesParamsMap := make(map[int]*nodesParam)
for _, param := range nodeParams {
nodes, ok := nodesParamsMap[param.Priority]
nodes, ok := nodesParamsMap[param.priority]
if !ok {
nodes = &nodesParam{priority: param.Priority}
nodes = &nodesParam{priority: param.priority}
}
nodes.addresses = append(nodes.addresses, param.Address)
nodes.weights = append(nodes.weights, param.Weight)
nodesParamsMap[param.Priority] = nodes
nodes.addresses = append(nodes.addresses, param.address)
nodes.weights = append(nodes.weights, param.weight)
nodesParamsMap[param.priority] = nodes
}
nodesParams := make([]*nodesParam, 0, len(nodesParamsMap))
@ -1320,10 +1405,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID,
}
var cliPrm sdkClient.PrmContainerPut
if prm.cnr != nil {
cliPrm.SetContainer(*prm.cnr)
}
cliPrm.SetContainer(prm.cnr)
res, err := cp.client.ContainerPut(ctx, cliPrm)
if err != nil { // here err already carries both status and client errors
@ -1341,10 +1423,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*containe
}
var cliPrm sdkClient.PrmContainerGet
if prm.cnrID != nil {
cliPrm.SetContainer(*prm.cnrID)
}
cliPrm.SetContainer(prm.cnrID)
res, err := cp.client.ContainerGet(ctx, cliPrm)
if err != nil { // here err already carries both status and client errors
@ -1362,10 +1441,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
}
var cliPrm sdkClient.PrmContainerList
if prm.ownerID != nil {
cliPrm.SetAccount(*prm.ownerID)
}
cliPrm.SetAccount(prm.ownerID)
res, err := cp.client.ContainerList(ctx, cliPrm)
if err != nil { // here err already carries both status and client errors
@ -1388,10 +1464,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
}
var cliPrm sdkClient.PrmContainerDelete
if prm.cnrID != nil {
cliPrm.SetContainer(*prm.cnrID)
}
cliPrm.SetContainer(prm.cnrID)
if prm.stoken != nil {
cliPrm.SetSessionToken(*prm.stoken)
@ -1412,10 +1485,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table,
}
var cliPrm sdkClient.PrmContainerEACL
if prm.cnrID != nil {
cliPrm.SetContainer(*prm.cnrID)
}
cliPrm.SetContainer(prm.cnrID)
res, err := cp.client.ContainerEACL(ctx, cliPrm)
if err != nil { // here err already carries both status and client errors
@ -1438,10 +1508,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
}
var cliPrm sdkClient.PrmContainerSetEACL
if prm.table != nil {
cliPrm.SetTable(*prm.table)
}
cliPrm.SetTable(prm.table)
_, err = cp.client.ContainerSetEACL(ctx, cliPrm)
@ -1458,10 +1525,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci
}
var cliPrm sdkClient.PrmBalanceGet
if prm.ownerID != nil {
cliPrm.SetAccount(*prm.ownerID)
}
cliPrm.SetAccount(prm.ownerID)
res, err := cp.client.BalanceGet(ctx, cliPrm)
if err != nil { // here err already carries both status and client errors
@ -1477,9 +1541,9 @@ func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa
if err != nil {
return err
}
wctx, cancel := context.WithTimeout(ctx, pollParams.CreationTimeout)
wctx, cancel := context.WithTimeout(ctx, pollParams.timeout)
defer cancel()
ticker := time.NewTimer(pollParams.PollInterval)
ticker := time.NewTimer(pollParams.pollInterval)
defer ticker.Stop()
wdone := wctx.Done()
done := ctx.Done()
@ -1501,11 +1565,12 @@ func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa
if err == nil {
return nil
}
ticker.Reset(pollParams.PollInterval)
ticker.Reset(pollParams.pollInterval)
}
}
}
// NetworkInfo requests information about the NeoFS network of which the remote server is a part.
func (p *Pool) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) {
cp, err := p.connection()
if err != nil {

View file

@ -29,8 +29,8 @@ func TestBuildPoolClientFailed(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{{1, "peer0", 1}},
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
clientBuilder: clientBuilder,
}
@ -55,8 +55,8 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{{1, "peer0", 1}},
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
clientBuilder: clientBuilder,
}
@ -113,11 +113,11 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
log, err := zap.NewProduction()
require.NoError(t, err)
opts := InitParameters{
Key: newPrivateKey(t),
key: newPrivateKey(t),
clientBuilder: clientBuilder,
ClientRebalanceInterval: 1000 * time.Millisecond,
Logger: log,
NodeParams: []NodeParam{
clientRebalanceInterval: 1000 * time.Millisecond,
logger: log,
nodeParams: []NodeParam{
{9, "peer0", 1},
{1, "peer1", 1},
},
@ -143,7 +143,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) {
func TestBuildPoolZeroNodes(t *testing.T) {
opts := InitParameters{
Key: newPrivateKey(t),
key: newPrivateKey(t),
}
_, err := NewPool(opts)
require.Error(t, err)
@ -168,8 +168,8 @@ func TestOneNode(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{{1, "peer0", 1}},
key: newPrivateKey(t),
nodeParams: []NodeParam{{1, "peer0", 1}},
clientBuilder: clientBuilder,
}
@ -207,8 +207,8 @@ func TestTwoNodes(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{1, "peer0", 1},
{1, "peer1", 1},
},
@ -266,12 +266,12 @@ func TestOneOfTwoFailed(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{1, "peer0", 1},
{9, "peer1", 1},
},
ClientRebalanceInterval: 200 * time.Millisecond,
clientRebalanceInterval: 200 * time.Millisecond,
clientBuilder: clientBuilder,
}
@ -307,12 +307,12 @@ func TestTwoFailed(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{1, "peer0", 1},
{1, "peer1", 1},
},
ClientRebalanceInterval: 200 * time.Millisecond,
clientRebalanceInterval: 200 * time.Millisecond,
clientBuilder: clientBuilder,
}
@ -354,11 +354,11 @@ func TestSessionCache(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{1, "peer0", 1},
},
ClientRebalanceInterval: 30 * time.Second,
clientRebalanceInterval: 30 * time.Second,
clientBuilder: clientBuilder,
}
@ -437,12 +437,12 @@ func TestPriority(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{1, "peer0", 1},
{2, "peer1", 100},
},
ClientRebalanceInterval: 1500 * time.Millisecond,
clientRebalanceInterval: 1500 * time.Millisecond,
clientBuilder: clientBuilder,
}
@ -496,11 +496,11 @@ func TestSessionCacheWithKey(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{1, "peer0", 1},
},
ClientRebalanceInterval: 30 * time.Second,
clientRebalanceInterval: 30 * time.Second,
clientBuilder: clientBuilder,
}
@ -547,8 +547,8 @@ func TestSessionTokenOwner(t *testing.T) {
}
opts := InitParameters{
Key: newPrivateKey(t),
NodeParams: []NodeParam{
key: newPrivateKey(t),
nodeParams: []NodeParam{
{1, "peer0", 1},
},
clientBuilder: clientBuilder,
@ -617,8 +617,8 @@ func TestWaitPresence(t *testing.T) {
t.Run("context deadline exceeded", func(t *testing.T) {
ctx := context.Background()
err := p.WaitForContainerPresence(ctx, nil, &ContainerPollingParams{
CreationTimeout: 500 * time.Millisecond,
PollInterval: 5 * time.Second,
timeout: 500 * time.Millisecond,
pollInterval: 5 * time.Second,
})
require.Error(t, err)
require.Contains(t, err.Error(), "context deadline exceeded")
@ -627,8 +627,8 @@ func TestWaitPresence(t *testing.T) {
t.Run("ok", func(t *testing.T) {
ctx := context.Background()
err := p.WaitForContainerPresence(ctx, nil, &ContainerPollingParams{
CreationTimeout: 10 * time.Second,
PollInterval: 500 * time.Millisecond,
timeout: 10 * time.Second,
pollInterval: 500 * time.Millisecond,
})
require.NoError(t, err)
})