diff --git a/pool/doc.go b/pool/doc.go index 792c89a..f209377 100644 --- a/pool/doc.go +++ b/pool/doc.go @@ -9,14 +9,12 @@ Create pool instance with 3 nodes connection. This InitParameters will make pool use 192.168.130.71 node while it is healthy. Otherwise, it will make the pool use 192.168.130.72 for 90% of requests and 192.168.130.73 for remaining 10%. : - prm := pool.InitParameters{ - Key: key, - NodeParams: []NodeParam{ - { Priority: 1, Address: "192.168.130.71", Weight: 1 }, - { Priority: 2, Address: "192.168.130.72", Weight: 9 }, - { Priority: 2, Address: "192.168.130.73", Weight: 1 }, - // ... - } + var prm pool.InitParameters + prm.SetKey(key) + prm.AddNode(NewNodeParam(1, "192.168.130.71", 1)) + prm.AddNode(NewNodeParam(2, "192.168.130.72", 9)) + prm.AddNode(NewNodeParam(2, "192.168.130.73", 1)) + // ... p, err := pool.NewPool(prm) // ... diff --git a/pool/pool.go b/pool/pool.go index 0d3f31d..21b9279 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -53,18 +53,62 @@ type client interface { // InitParameters contains values used to initialize connection Pool. type InitParameters struct { - Key *ecdsa.PrivateKey - Logger *zap.Logger - NodeConnectionTimeout time.Duration - NodeRequestTimeout time.Duration - ClientRebalanceInterval time.Duration - SessionTokenThreshold time.Duration - SessionExpirationDuration uint64 - NodeParams []NodeParam + key *ecdsa.PrivateKey + logger *zap.Logger + nodeDialTimeout time.Duration + healthcheckTimeout time.Duration + clientRebalanceInterval time.Duration + sessionTokenThreshold time.Duration + sessionExpirationDuration uint64 + nodeParams []NodeParam clientBuilder func(endpoint string) (client, error) } +// SetKey specifies default key to be used for the protocol communication by default. +func (x *InitParameters) SetKey(key *ecdsa.PrivateKey) { + x.key = key +} + +// SetLogger specifies logger. +func (x *InitParameters) SetLogger(logger *zap.Logger) { + x.logger = logger +} + +// SetNodeDialTimeout specifies the timeout for connection to be established. +func (x *InitParameters) SetNodeDialTimeout(timeout time.Duration) { + x.nodeDialTimeout = timeout +} + +// SetHealthcheckTimeout specifies the timeout for request to node to decide if it is alive. +// +// See also Pool.Dial. +func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) { + x.healthcheckTimeout = timeout +} + +// SetClientRebalanceInterval specifies the interval for updating nodes health status. +// +// See also Pool.Dial. +func (x *InitParameters) SetClientRebalanceInterval(interval time.Duration) { + x.clientRebalanceInterval = interval +} + +// SetSessionThreshold specifies the max session token life time for PutObject operation. +func (x *InitParameters) SetSessionThreshold(threshold time.Duration) { + x.sessionTokenThreshold = threshold +} + +// SetSessionExpirationDuration specifies the session token lifetime in epochs. +func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64) { + x.sessionExpirationDuration = expirationDuration +} + +// AddNode append information about the node to which you want to connect. +func (x *InitParameters) AddNode(nodeParam NodeParam) { + x.nodeParams = append(x.nodeParams, nodeParam) +} + type rebalanceParameters struct { nodesParams []*nodesParam nodeRequestTimeout time.Duration @@ -80,22 +124,58 @@ type nodesParam struct { // NodeParam groups parameters of remote node. type NodeParam struct { - Priority int - Address string - Weight float64 + priority int + address string + weight float64 +} + +// NewNodeParam creates NodeParam using parameters. +func NewNodeParam(priority int, address string, weight float64) (prm NodeParam) { + prm.SetPriority(priority) + prm.SetAddress(address) + prm.SetWeight(weight) + + return +} + +// SetPriority specifies priority of the node. +// Negative value is allowed. In the result node groups +// with the same priority will be sorted by descent. +func (x *NodeParam) SetPriority(priority int) { + x.priority = priority +} + +// SetAddress specifies address of the node. +func (x *NodeParam) SetAddress(address string) { + x.address = address +} + +// SetWeight specifies weight of the node. +func (x *NodeParam) SetWeight(weight float64) { + x.weight = weight } // ContainerPollingParams contains parameters used in polling is a container created or not. type ContainerPollingParams struct { - CreationTimeout time.Duration - PollInterval time.Duration + timeout time.Duration + pollInterval time.Duration +} + +// SetTimeout specifies the time to wait for the operation to complete. +func (x *ContainerPollingParams) SetTimeout(timeout time.Duration) { + x.timeout = timeout +} + +// SetPollInterval specifies the interval, once it will check the completion of the operation. +func (x *ContainerPollingParams) SetPollInterval(tick time.Duration) { + x.pollInterval = tick } // DefaultPollingParams creates ContainerPollingParams with default values. func DefaultPollingParams() *ContainerPollingParams { return &ContainerPollingParams{ - CreationTimeout: 120 * time.Second, - PollInterval: 5 * time.Second, + timeout: 120 * time.Second, + pollInterval: 5 * time.Second, } } @@ -243,11 +323,11 @@ func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) { type PrmContainerPut struct { prmCommon - cnr *container.Container + cnr container.Container } // SetContainer specifies structured information about new NeoFS container. -func (x *PrmContainerPut) SetContainer(cnr *container.Container) { +func (x *PrmContainerPut) SetContainer(cnr container.Container) { x.cnr = cnr } @@ -255,11 +335,11 @@ func (x *PrmContainerPut) SetContainer(cnr *container.Container) { type PrmContainerGet struct { prmCommon - cnrID *cid.ID + cnrID cid.ID } // SetContainerID specifies identifier of the container to be read. -func (x *PrmContainerGet) SetContainerID(cnrID *cid.ID) { +func (x *PrmContainerGet) SetContainerID(cnrID cid.ID) { x.cnrID = cnrID } @@ -267,11 +347,11 @@ func (x *PrmContainerGet) SetContainerID(cnrID *cid.ID) { type PrmContainerList struct { prmCommon - ownerID *owner.ID + ownerID owner.ID } // SetOwnerID specifies identifier of the NeoFS account to list the containers. -func (x *PrmContainerList) SetOwnerID(ownerID *owner.ID) { +func (x *PrmContainerList) SetOwnerID(ownerID owner.ID) { x.ownerID = ownerID } @@ -279,11 +359,11 @@ func (x *PrmContainerList) SetOwnerID(ownerID *owner.ID) { type PrmContainerDelete struct { prmCommon - cnrID *cid.ID + cnrID cid.ID } // SetContainerID specifies identifier of the NeoFS container to be removed. -func (x *PrmContainerDelete) SetContainerID(cnrID *cid.ID) { +func (x *PrmContainerDelete) SetContainerID(cnrID cid.ID) { x.cnrID = cnrID } @@ -291,11 +371,11 @@ func (x *PrmContainerDelete) SetContainerID(cnrID *cid.ID) { type PrmContainerEACL struct { prmCommon - cnrID *cid.ID + cnrID cid.ID } // SetContainerID specifies identifier of the NeoFS container to read the eACL table. -func (x *PrmContainerEACL) SetContainerID(cnrID *cid.ID) { +func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) { x.cnrID = cnrID } @@ -303,11 +383,11 @@ func (x *PrmContainerEACL) SetContainerID(cnrID *cid.ID) { type PrmContainerSetEACL struct { prmCommon - table *eacl.Table + table eacl.Table } // SetTable specifies eACL table structure to be set for the container. -func (x *PrmContainerSetEACL) SetTable(table *eacl.Table) { +func (x *PrmContainerSetEACL) SetTable(table eacl.Table) { x.table = table } @@ -315,11 +395,11 @@ func (x *PrmContainerSetEACL) SetTable(table *eacl.Table) { type PrmBalanceGet struct { prmCommon - ownerID *owner.ID + ownerID owner.ID } // SetOwnerID specifies identifier of the NeoFS account for which the balance is requested. -func (x *PrmBalanceGet) SetOwnerID(ownerID *owner.ID) { +func (x *PrmBalanceGet) SetOwnerID(ownerID owner.ID) { x.ownerID = ownerID } @@ -372,13 +452,13 @@ const ( defaultRequestTimeout = 4 * time.Second ) -// NewPool create connection pool using parameters. +// NewPool creates connection pool using parameters. func NewPool(options InitParameters) (*Pool, error) { - if options.Key == nil { + if options.key == nil { return nil, fmt.Errorf("missed required parameter 'Key'") } - nodesParams, err := adjustNodeParams(options.NodeParams) + nodesParams, err := adjustNodeParams(options.nodeParams) if err != nil { return nil, err } @@ -391,17 +471,17 @@ func NewPool(options InitParameters) (*Pool, error) { } pool := &Pool{ - key: options.Key, - owner: owner.NewIDFromPublicKey(&options.Key.PublicKey), + key: options.key, + owner: owner.NewIDFromPublicKey(&options.key.PublicKey), cache: cache, - logger: options.Logger, - stokenDuration: options.SessionExpirationDuration, - stokenThreshold: options.SessionTokenThreshold, + logger: options.logger, + stokenDuration: options.sessionExpirationDuration, + stokenThreshold: options.sessionTokenThreshold, rebalanceParams: rebalanceParameters{ nodesParams: nodesParams, - nodeRequestTimeout: options.NodeRequestTimeout, - clientRebalanceInterval: options.ClientRebalanceInterval, - sessionExpirationDuration: options.SessionExpirationDuration, + nodeRequestTimeout: options.healthcheckTimeout, + clientRebalanceInterval: options.clientRebalanceInterval, + sessionExpirationDuration: options.sessionExpirationDuration, }, clientBuilder: options.clientBuilder, } @@ -410,8 +490,13 @@ func NewPool(options InitParameters) (*Pool, error) { } // Dial establishes a connection to the servers from the NeoFS network. -// Returns an error describing failure reason. If failed, the Pool -// SHOULD NOT be used. +// It also starts a routine that checks the health of the nodes and +// updates the weights of the nodes for balancing. +// Returns an error describing failure reason. +// +// If failed, the Pool SHOULD NOT be used. +// +// See also InitParameters.SetClientRebalanceInterval. func (p *Pool) Dial(ctx context.Context) error { inner := make([]*innerPool, len(p.rebalanceParams.nodesParams)) var atLeastOneHealthy bool @@ -459,20 +544,20 @@ func (p *Pool) Dial(ctx context.Context) error { } func fillDefaultInitParams(params *InitParameters) { - if params.SessionExpirationDuration == 0 { - params.SessionExpirationDuration = defaultSessionTokenExpirationDuration + if params.sessionExpirationDuration == 0 { + params.sessionExpirationDuration = defaultSessionTokenExpirationDuration } - if params.SessionTokenThreshold <= 0 { - params.SessionTokenThreshold = defaultSessionTokenThreshold + if params.sessionTokenThreshold <= 0 { + params.sessionTokenThreshold = defaultSessionTokenThreshold } - if params.ClientRebalanceInterval <= 0 { - params.ClientRebalanceInterval = defaultRebalanceInterval + if params.clientRebalanceInterval <= 0 { + params.clientRebalanceInterval = defaultRebalanceInterval } - if params.NodeRequestTimeout <= 0 { - params.NodeRequestTimeout = defaultRequestTimeout + if params.healthcheckTimeout <= 0 { + params.healthcheckTimeout = defaultRequestTimeout } if params.clientBuilder == nil { @@ -481,13 +566,13 @@ func fillDefaultInitParams(params *InitParameters) { var prmInit sdkClient.PrmInit prmInit.ResolveNeoFSFailures() - prmInit.SetDefaultPrivateKey(*params.Key) + prmInit.SetDefaultPrivateKey(*params.key) c.Init(prmInit) var prmDial sdkClient.PrmDial prmDial.SetServerURI(addr) - prmDial.SetTimeout(params.NodeConnectionTimeout) + prmDial.SetTimeout(params.nodeDialTimeout) return &c, c.Dial(prmDial) } @@ -501,13 +586,13 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) { nodesParamsMap := make(map[int]*nodesParam) for _, param := range nodeParams { - nodes, ok := nodesParamsMap[param.Priority] + nodes, ok := nodesParamsMap[param.priority] if !ok { - nodes = &nodesParam{priority: param.Priority} + nodes = &nodesParam{priority: param.priority} } - nodes.addresses = append(nodes.addresses, param.Address) - nodes.weights = append(nodes.weights, param.Weight) - nodesParamsMap[param.Priority] = nodes + nodes.addresses = append(nodes.addresses, param.address) + nodes.weights = append(nodes.weights, param.weight) + nodesParamsMap[param.priority] = nodes } nodesParams := make([]*nodesParam, 0, len(nodesParamsMap)) @@ -1320,10 +1405,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, } var cliPrm sdkClient.PrmContainerPut - - if prm.cnr != nil { - cliPrm.SetContainer(*prm.cnr) - } + cliPrm.SetContainer(prm.cnr) res, err := cp.client.ContainerPut(ctx, cliPrm) if err != nil { // here err already carries both status and client errors @@ -1341,10 +1423,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*containe } var cliPrm sdkClient.PrmContainerGet - - if prm.cnrID != nil { - cliPrm.SetContainer(*prm.cnrID) - } + cliPrm.SetContainer(prm.cnrID) res, err := cp.client.ContainerGet(ctx, cliPrm) if err != nil { // here err already carries both status and client errors @@ -1362,10 +1441,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid. } var cliPrm sdkClient.PrmContainerList - - if prm.ownerID != nil { - cliPrm.SetAccount(*prm.ownerID) - } + cliPrm.SetAccount(prm.ownerID) res, err := cp.client.ContainerList(ctx, cliPrm) if err != nil { // here err already carries both status and client errors @@ -1388,10 +1464,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro } var cliPrm sdkClient.PrmContainerDelete - - if prm.cnrID != nil { - cliPrm.SetContainer(*prm.cnrID) - } + cliPrm.SetContainer(prm.cnrID) if prm.stoken != nil { cliPrm.SetSessionToken(*prm.stoken) @@ -1412,10 +1485,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, } var cliPrm sdkClient.PrmContainerEACL - - if prm.cnrID != nil { - cliPrm.SetContainer(*prm.cnrID) - } + cliPrm.SetContainer(prm.cnrID) res, err := cp.client.ContainerEACL(ctx, cliPrm) if err != nil { // here err already carries both status and client errors @@ -1438,10 +1508,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error { } var cliPrm sdkClient.PrmContainerSetEACL - - if prm.table != nil { - cliPrm.SetTable(*prm.table) - } + cliPrm.SetTable(prm.table) _, err = cp.client.ContainerSetEACL(ctx, cliPrm) @@ -1458,10 +1525,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci } var cliPrm sdkClient.PrmBalanceGet - - if prm.ownerID != nil { - cliPrm.SetAccount(*prm.ownerID) - } + cliPrm.SetAccount(prm.ownerID) res, err := cp.client.BalanceGet(ctx, cliPrm) if err != nil { // here err already carries both status and client errors @@ -1477,9 +1541,9 @@ func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa if err != nil { return err } - wctx, cancel := context.WithTimeout(ctx, pollParams.CreationTimeout) + wctx, cancel := context.WithTimeout(ctx, pollParams.timeout) defer cancel() - ticker := time.NewTimer(pollParams.PollInterval) + ticker := time.NewTimer(pollParams.pollInterval) defer ticker.Stop() wdone := wctx.Done() done := ctx.Done() @@ -1501,11 +1565,12 @@ func (p *Pool) WaitForContainerPresence(ctx context.Context, cid *cid.ID, pollPa if err == nil { return nil } - ticker.Reset(pollParams.PollInterval) + ticker.Reset(pollParams.pollInterval) } } } +// NetworkInfo requests information about the NeoFS network of which the remote server is a part. func (p *Pool) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) { cp, err := p.connection() if err != nil { diff --git a/pool/pool_test.go b/pool/pool_test.go index 8e90ec1..9326a27 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -29,8 +29,8 @@ func TestBuildPoolClientFailed(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{{1, "peer0", 1}}, + key: newPrivateKey(t), + nodeParams: []NodeParam{{1, "peer0", 1}}, clientBuilder: clientBuilder, } @@ -55,8 +55,8 @@ func TestBuildPoolCreateSessionFailed(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{{1, "peer0", 1}}, + key: newPrivateKey(t), + nodeParams: []NodeParam{{1, "peer0", 1}}, clientBuilder: clientBuilder, } @@ -113,11 +113,11 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { log, err := zap.NewProduction() require.NoError(t, err) opts := InitParameters{ - Key: newPrivateKey(t), + key: newPrivateKey(t), clientBuilder: clientBuilder, - ClientRebalanceInterval: 1000 * time.Millisecond, - Logger: log, - NodeParams: []NodeParam{ + clientRebalanceInterval: 1000 * time.Millisecond, + logger: log, + nodeParams: []NodeParam{ {9, "peer0", 1}, {1, "peer1", 1}, }, @@ -143,7 +143,7 @@ func TestBuildPoolOneNodeFailed(t *testing.T) { func TestBuildPoolZeroNodes(t *testing.T) { opts := InitParameters{ - Key: newPrivateKey(t), + key: newPrivateKey(t), } _, err := NewPool(opts) require.Error(t, err) @@ -168,8 +168,8 @@ func TestOneNode(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{{1, "peer0", 1}}, + key: newPrivateKey(t), + nodeParams: []NodeParam{{1, "peer0", 1}}, clientBuilder: clientBuilder, } @@ -207,8 +207,8 @@ func TestTwoNodes(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{ + key: newPrivateKey(t), + nodeParams: []NodeParam{ {1, "peer0", 1}, {1, "peer1", 1}, }, @@ -266,12 +266,12 @@ func TestOneOfTwoFailed(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{ + key: newPrivateKey(t), + nodeParams: []NodeParam{ {1, "peer0", 1}, {9, "peer1", 1}, }, - ClientRebalanceInterval: 200 * time.Millisecond, + clientRebalanceInterval: 200 * time.Millisecond, clientBuilder: clientBuilder, } @@ -307,12 +307,12 @@ func TestTwoFailed(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{ + key: newPrivateKey(t), + nodeParams: []NodeParam{ {1, "peer0", 1}, {1, "peer1", 1}, }, - ClientRebalanceInterval: 200 * time.Millisecond, + clientRebalanceInterval: 200 * time.Millisecond, clientBuilder: clientBuilder, } @@ -354,11 +354,11 @@ func TestSessionCache(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{ + key: newPrivateKey(t), + nodeParams: []NodeParam{ {1, "peer0", 1}, }, - ClientRebalanceInterval: 30 * time.Second, + clientRebalanceInterval: 30 * time.Second, clientBuilder: clientBuilder, } @@ -437,12 +437,12 @@ func TestPriority(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{ + key: newPrivateKey(t), + nodeParams: []NodeParam{ {1, "peer0", 1}, {2, "peer1", 100}, }, - ClientRebalanceInterval: 1500 * time.Millisecond, + clientRebalanceInterval: 1500 * time.Millisecond, clientBuilder: clientBuilder, } @@ -496,11 +496,11 @@ func TestSessionCacheWithKey(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{ + key: newPrivateKey(t), + nodeParams: []NodeParam{ {1, "peer0", 1}, }, - ClientRebalanceInterval: 30 * time.Second, + clientRebalanceInterval: 30 * time.Second, clientBuilder: clientBuilder, } @@ -547,8 +547,8 @@ func TestSessionTokenOwner(t *testing.T) { } opts := InitParameters{ - Key: newPrivateKey(t), - NodeParams: []NodeParam{ + key: newPrivateKey(t), + nodeParams: []NodeParam{ {1, "peer0", 1}, }, clientBuilder: clientBuilder, @@ -617,8 +617,8 @@ func TestWaitPresence(t *testing.T) { t.Run("context deadline exceeded", func(t *testing.T) { ctx := context.Background() err := p.WaitForContainerPresence(ctx, nil, &ContainerPollingParams{ - CreationTimeout: 500 * time.Millisecond, - PollInterval: 5 * time.Second, + timeout: 500 * time.Millisecond, + pollInterval: 5 * time.Second, }) require.Error(t, err) require.Contains(t, err.Error(), "context deadline exceeded") @@ -627,8 +627,8 @@ func TestWaitPresence(t *testing.T) { t.Run("ok", func(t *testing.T) { ctx := context.Background() err := p.WaitForContainerPresence(ctx, nil, &ContainerPollingParams{ - CreationTimeout: 10 * time.Second, - PollInterval: 500 * time.Millisecond, + timeout: 10 * time.Second, + pollInterval: 500 * time.Millisecond, }) require.NoError(t, err) })