[#180] pool: make some operations synchronous
Operations: PutContainer, DeleteContainer, SetEACL Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
0dbea5452a
commit
a709cf5444
2 changed files with 92 additions and 26 deletions
105
pool/pool.go
105
pool/pool.go
|
@ -171,11 +171,15 @@ func (x *WaitParams) SetPollInterval(tick time.Duration) {
|
||||||
x.pollInterval = tick
|
x.pollInterval = tick
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultWaitParams creates WaitParams with default values.
|
func (x *WaitParams) setDefaults() {
|
||||||
func DefaultWaitParams() *WaitParams {
|
x.timeout = 120 * time.Second
|
||||||
return &WaitParams{
|
x.pollInterval = 5 * time.Second
|
||||||
timeout: 120 * time.Second,
|
}
|
||||||
pollInterval: 5 * time.Second,
|
|
||||||
|
// checkForPositive panics if any of the wait params isn't positive.
|
||||||
|
func (x *WaitParams) checkForPositive() {
|
||||||
|
if x.timeout <= 0 || x.pollInterval <= 0 {
|
||||||
|
panic("all wait params must be positive")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -322,6 +326,9 @@ func (x *PrmObjectSearch) SetFilters(filters object.SearchFilters) {
|
||||||
// PrmContainerPut groups parameters of PutContainer operation.
|
// PrmContainerPut groups parameters of PutContainer operation.
|
||||||
type PrmContainerPut struct {
|
type PrmContainerPut struct {
|
||||||
cnr container.Container
|
cnr container.Container
|
||||||
|
|
||||||
|
waitParams WaitParams
|
||||||
|
waitParamsSet bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetContainer specifies structured information about new NeoFS container.
|
// SetContainer specifies structured information about new NeoFS container.
|
||||||
|
@ -329,6 +336,15 @@ func (x *PrmContainerPut) SetContainer(cnr container.Container) {
|
||||||
x.cnr = cnr
|
x.cnr = cnr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetWaitParams specifies timeout params to complete operation.
|
||||||
|
// If not provided the default one will be used.
|
||||||
|
// Panics if any of the wait params isn't positive.
|
||||||
|
func (x *PrmContainerPut) SetWaitParams(waitParams WaitParams) {
|
||||||
|
waitParams.checkForPositive()
|
||||||
|
x.waitParams = waitParams
|
||||||
|
x.waitParamsSet = true
|
||||||
|
}
|
||||||
|
|
||||||
// PrmContainerGet groups parameters of GetContainer operation.
|
// PrmContainerGet groups parameters of GetContainer operation.
|
||||||
type PrmContainerGet struct {
|
type PrmContainerGet struct {
|
||||||
cnrID cid.ID
|
cnrID cid.ID
|
||||||
|
@ -353,6 +369,9 @@ func (x *PrmContainerList) SetOwnerID(ownerID owner.ID) {
|
||||||
type PrmContainerDelete struct {
|
type PrmContainerDelete struct {
|
||||||
stoken session.Token
|
stoken session.Token
|
||||||
cnrID cid.ID
|
cnrID cid.ID
|
||||||
|
|
||||||
|
waitParams WaitParams
|
||||||
|
waitParamsSet bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetContainerID specifies identifier of the NeoFS container to be removed.
|
// SetContainerID specifies identifier of the NeoFS container to be removed.
|
||||||
|
@ -365,6 +384,15 @@ func (x *PrmContainerDelete) SetSessionToken(token session.Token) {
|
||||||
x.stoken = token
|
x.stoken = token
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetWaitParams specifies timeout params to complete operation.
|
||||||
|
// If not provided the default one will be used.
|
||||||
|
// Panics if any of the wait params isn't positive.
|
||||||
|
func (x *PrmContainerDelete) SetWaitParams(waitParams WaitParams) {
|
||||||
|
waitParams.checkForPositive()
|
||||||
|
x.waitParams = waitParams
|
||||||
|
x.waitParamsSet = true
|
||||||
|
}
|
||||||
|
|
||||||
// PrmContainerEACL groups parameters of GetEACL operation.
|
// PrmContainerEACL groups parameters of GetEACL operation.
|
||||||
type PrmContainerEACL struct {
|
type PrmContainerEACL struct {
|
||||||
cnrID cid.ID
|
cnrID cid.ID
|
||||||
|
@ -378,6 +406,9 @@ func (x *PrmContainerEACL) SetContainerID(cnrID cid.ID) {
|
||||||
// PrmContainerSetEACL groups parameters of SetEACL operation.
|
// PrmContainerSetEACL groups parameters of SetEACL operation.
|
||||||
type PrmContainerSetEACL struct {
|
type PrmContainerSetEACL struct {
|
||||||
table eacl.Table
|
table eacl.Table
|
||||||
|
|
||||||
|
waitParams WaitParams
|
||||||
|
waitParamsSet bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetTable specifies eACL table structure to be set for the container.
|
// SetTable specifies eACL table structure to be set for the container.
|
||||||
|
@ -385,6 +416,15 @@ func (x *PrmContainerSetEACL) SetTable(table eacl.Table) {
|
||||||
x.table = table
|
x.table = table
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetWaitParams specifies timeout params to complete operation.
|
||||||
|
// If not provided the default one will be used.
|
||||||
|
// Panics if any of the wait params isn't positive.
|
||||||
|
func (x *PrmContainerSetEACL) SetWaitParams(waitParams WaitParams) {
|
||||||
|
waitParams.checkForPositive()
|
||||||
|
x.waitParams = waitParams
|
||||||
|
x.waitParamsSet = true
|
||||||
|
}
|
||||||
|
|
||||||
// PrmBalanceGet groups parameters of Balance operation.
|
// PrmBalanceGet groups parameters of Balance operation.
|
||||||
type PrmBalanceGet struct {
|
type PrmBalanceGet struct {
|
||||||
ownerID owner.ID
|
ownerID owner.ID
|
||||||
|
@ -1371,10 +1411,11 @@ func (p *Pool) SearchObjects(ctx context.Context, prm PrmObjectSearch) (*ResObje
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutContainer sends request to save container in NeoFS.
|
// PutContainer sends request to save container in NeoFS and waits for the operation to complete.
|
||||||
//
|
//
|
||||||
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
||||||
// The required time is also not predictable.
|
// polling interval: 5s
|
||||||
|
// waiting timeout: 120s
|
||||||
//
|
//
|
||||||
// Success can be verified by reading by identifier (see GetContainer).
|
// Success can be verified by reading by identifier (see GetContainer).
|
||||||
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
|
func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
|
||||||
|
@ -1391,7 +1432,11 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return res.ID(), nil
|
if !prm.waitParamsSet {
|
||||||
|
prm.waitParams.setDefaults()
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.ID(), waitForContainerPresence(ctx, p, res.ID(), &prm.waitParams)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetContainer reads NeoFS container by ID.
|
// GetContainer reads NeoFS container by ID.
|
||||||
|
@ -1430,10 +1475,11 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
|
||||||
return res.Containers(), nil
|
return res.Containers(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteContainer sends request to remove the NeoFS container.
|
// DeleteContainer sends request to remove the NeoFS container and waits for the operation to complete.
|
||||||
//
|
//
|
||||||
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
||||||
// The required time is also not predictable.
|
// polling interval: 5s
|
||||||
|
// waiting timeout: 120s
|
||||||
//
|
//
|
||||||
// Success can be verified by reading by identifier (see GetContainer).
|
// Success can be verified by reading by identifier (see GetContainer).
|
||||||
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
|
func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) error {
|
||||||
|
@ -1450,9 +1496,17 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
|
||||||
|
|
||||||
// here err already carries both status and client errors
|
// here err already carries both status and client errors
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !prm.waitParamsSet {
|
||||||
|
prm.waitParams.setDefaults()
|
||||||
|
}
|
||||||
|
|
||||||
|
return waitForContainerRemoved(ctx, p, &prm.cnrID, &prm.waitParams)
|
||||||
|
}
|
||||||
|
|
||||||
// GetEACL reads eACL table of the NeoFS container.
|
// GetEACL reads eACL table of the NeoFS container.
|
||||||
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) {
|
func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
|
@ -1471,10 +1525,11 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table,
|
||||||
return res.Table(), nil
|
return res.Table(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetEACL sends request to update eACL table of the NeoFS container.
|
// SetEACL sends request to update eACL table of the NeoFS container and waits for the operation to complete.
|
||||||
//
|
//
|
||||||
// Operation is asynchronous and no guaranteed even in the absence of errors.
|
// Waiting parameters can be specified using SetWaitParams. If not called, defaults are used:
|
||||||
// The required time is also not predictable.
|
// polling interval: 5s
|
||||||
|
// waiting timeout: 120s
|
||||||
//
|
//
|
||||||
// Success can be verified by reading by identifier (see GetEACL).
|
// Success can be verified by reading by identifier (see GetEACL).
|
||||||
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
||||||
|
@ -1490,9 +1545,17 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
|
||||||
|
|
||||||
// here err already carries both status and client errors
|
// here err already carries both status and client errors
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !prm.waitParamsSet {
|
||||||
|
prm.waitParams.setDefaults()
|
||||||
|
}
|
||||||
|
|
||||||
|
return waitForEACLPresence(ctx, p, prm.table.CID(), &prm.table, &prm.waitParams)
|
||||||
|
}
|
||||||
|
|
||||||
// Balance requests current balance of the NeoFS account.
|
// Balance requests current balance of the NeoFS account.
|
||||||
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) {
|
func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Decimal, error) {
|
||||||
cp, err := p.connection()
|
cp, err := p.connection()
|
||||||
|
@ -1511,8 +1574,8 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci
|
||||||
return res.Amount(), nil
|
return res.Amount(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForContainerPresence waits until the container is found on the NeoFS network.
|
// waitForContainerPresence waits until the container is found on the NeoFS network.
|
||||||
func WaitForContainerPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, waitParams *WaitParams) error {
|
func waitForContainerPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, waitParams *WaitParams) error {
|
||||||
var prm PrmContainerGet
|
var prm PrmContainerGet
|
||||||
if cnrID != nil {
|
if cnrID != nil {
|
||||||
prm.SetContainerID(*cnrID)
|
prm.SetContainerID(*cnrID)
|
||||||
|
@ -1524,8 +1587,8 @@ func WaitForContainerPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, wa
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForEACLPresence waits until the container eacl is applied on the NeoFS network.
|
// waitForEACLPresence waits until the container eacl is applied on the NeoFS network.
|
||||||
func WaitForEACLPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, table *eacl.Table, waitParams *WaitParams) error {
|
func waitForEACLPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, table *eacl.Table, waitParams *WaitParams) error {
|
||||||
var prm PrmContainerEACL
|
var prm PrmContainerEACL
|
||||||
if cnrID != nil {
|
if cnrID != nil {
|
||||||
prm.SetContainerID(*cnrID)
|
prm.SetContainerID(*cnrID)
|
||||||
|
@ -1540,8 +1603,8 @@ func WaitForEACLPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, table *
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForContainerRemoved waits until the container is removed from the NeoFS network.
|
// waitForContainerRemoved waits until the container is removed from the NeoFS network.
|
||||||
func WaitForContainerRemoved(ctx context.Context, pool *Pool, cnrID *cid.ID, waitParams *WaitParams) error {
|
func waitForContainerRemoved(ctx context.Context, pool *Pool, cnrID *cid.ID, waitParams *WaitParams) error {
|
||||||
var prm PrmContainerGet
|
var prm PrmContainerGet
|
||||||
if cnrID != nil {
|
if cnrID != nil {
|
||||||
prm.SetContainerID(*cnrID)
|
prm.SetContainerID(*cnrID)
|
||||||
|
|
|
@ -616,14 +616,17 @@ func TestWaitPresence(t *testing.T) {
|
||||||
cancel()
|
cancel()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err := WaitForContainerPresence(ctx, p, nil, DefaultWaitParams())
|
err = waitForContainerPresence(ctx, p, nil, &WaitParams{
|
||||||
|
timeout: 120 * time.Second,
|
||||||
|
pollInterval: 5 * time.Second,
|
||||||
|
})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Contains(t, err.Error(), "context canceled")
|
require.Contains(t, err.Error(), "context canceled")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("context deadline exceeded", func(t *testing.T) {
|
t.Run("context deadline exceeded", func(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
err := WaitForContainerPresence(ctx, p, nil, &WaitParams{
|
err := waitForContainerPresence(ctx, p, nil, &WaitParams{
|
||||||
timeout: 500 * time.Millisecond,
|
timeout: 500 * time.Millisecond,
|
||||||
pollInterval: 5 * time.Second,
|
pollInterval: 5 * time.Second,
|
||||||
})
|
})
|
||||||
|
@ -633,7 +636,7 @@ func TestWaitPresence(t *testing.T) {
|
||||||
|
|
||||||
t.Run("ok", func(t *testing.T) {
|
t.Run("ok", func(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
err := WaitForContainerPresence(ctx, p, nil, &WaitParams{
|
err := waitForContainerPresence(ctx, p, nil, &WaitParams{
|
||||||
timeout: 10 * time.Second,
|
timeout: 10 * time.Second,
|
||||||
pollInterval: 500 * time.Millisecond,
|
pollInterval: 500 * time.Millisecond,
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue