[#180] pool: add WaitForEACLPresence

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-03-23 10:50:34 +03:00 committed by Alex Vanin
parent d568458fab
commit 47345a33da
2 changed files with 44 additions and 22 deletions

View file

@ -155,25 +155,25 @@ func (x *NodeParam) SetWeight(weight float64) {
x.weight = weight x.weight = weight
} }
// ContainerPollingParams contains parameters used in polling is a container created or not. // WaitParams contains parameters used in polling is a something applied on NeoFS network.
type ContainerPollingParams struct { type WaitParams struct {
timeout time.Duration timeout time.Duration
pollInterval time.Duration pollInterval time.Duration
} }
// SetTimeout specifies the time to wait for the operation to complete. // SetTimeout specifies the time to wait for the operation to complete.
func (x *ContainerPollingParams) SetTimeout(timeout time.Duration) { func (x *WaitParams) SetTimeout(timeout time.Duration) {
x.timeout = timeout x.timeout = timeout
} }
// SetPollInterval specifies the interval, once it will check the completion of the operation. // SetPollInterval specifies the interval, once it will check the completion of the operation.
func (x *ContainerPollingParams) SetPollInterval(tick time.Duration) { func (x *WaitParams) SetPollInterval(tick time.Duration) {
x.pollInterval = tick x.pollInterval = tick
} }
// DefaultPollingParams creates ContainerPollingParams with default values. // DefaultWaitParams creates WaitParams with default values.
func DefaultPollingParams() *ContainerPollingParams { func DefaultWaitParams() *WaitParams {
return &ContainerPollingParams{ return &WaitParams{
timeout: 120 * time.Second, timeout: 120 * time.Second,
pollInterval: 5 * time.Second, pollInterval: 5 * time.Second,
} }
@ -1512,19 +1512,42 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci
} }
// WaitForContainerPresence waits until the container is found on the NeoFS network. // WaitForContainerPresence waits until the container is found on the NeoFS network.
func WaitForContainerPresence(ctx context.Context, pool *Pool, cid *cid.ID, pollParams *ContainerPollingParams) error { func WaitForContainerPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, waitParams *WaitParams) error {
wctx, cancel := context.WithTimeout(ctx, pollParams.timeout) var prm PrmContainerGet
if cnrID != nil {
prm.SetContainerID(*cnrID)
}
return waitFor(ctx, waitParams, func(ctx context.Context) bool {
_, err := pool.GetContainer(ctx, prm)
return err == nil
})
}
// WaitForEACLPresence waits until the container eacl is applied on the NeoFS network.
func WaitForEACLPresence(ctx context.Context, pool *Pool, cnrID *cid.ID, table *eacl.Table, waitParams *WaitParams) error {
var prm PrmContainerEACL
if cnrID != nil {
prm.SetContainerID(*cnrID)
}
return waitFor(ctx, waitParams, func(ctx context.Context) bool {
eaclTable, err := pool.GetEACL(ctx, prm)
if err == nil {
return table.EqualTo(eaclTable)
}
return false
})
}
// waitFor await that given condition will be met in waitParams time.
func waitFor(ctx context.Context, params *WaitParams, condition func(context.Context) bool) error {
wctx, cancel := context.WithTimeout(ctx, params.timeout)
defer cancel() defer cancel()
ticker := time.NewTimer(pollParams.pollInterval) ticker := time.NewTimer(params.pollInterval)
defer ticker.Stop() defer ticker.Stop()
wdone := wctx.Done() wdone := wctx.Done()
done := ctx.Done() done := ctx.Done()
var prm PrmContainerGet
if cid != nil {
prm.SetContainerID(*cid)
}
for { for {
select { select {
case <-done: case <-done:
@ -1532,11 +1555,10 @@ func WaitForContainerPresence(ctx context.Context, pool *Pool, cid *cid.ID, poll
case <-wdone: case <-wdone:
return wctx.Err() return wctx.Err()
case <-ticker.C: case <-ticker.C:
_, err := pool.GetContainer(ctx, prm) if condition(ctx) {
if err == nil {
return nil return nil
} }
ticker.Reset(pollParams.pollInterval) ticker.Reset(params.pollInterval)
} }
} }
} }

View file

@ -616,14 +616,14 @@ func TestWaitPresence(t *testing.T) {
cancel() cancel()
}() }()
err := WaitForContainerPresence(ctx, p, nil, DefaultPollingParams()) err := WaitForContainerPresence(ctx, p, nil, DefaultWaitParams())
require.Error(t, err) require.Error(t, err)
require.Contains(t, err.Error(), "context canceled") require.Contains(t, err.Error(), "context canceled")
}) })
t.Run("context deadline exceeded", func(t *testing.T) { t.Run("context deadline exceeded", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
err := WaitForContainerPresence(ctx, p, nil, &ContainerPollingParams{ err := WaitForContainerPresence(ctx, p, nil, &WaitParams{
timeout: 500 * time.Millisecond, timeout: 500 * time.Millisecond,
pollInterval: 5 * time.Second, pollInterval: 5 * time.Second,
}) })
@ -633,7 +633,7 @@ func TestWaitPresence(t *testing.T) {
t.Run("ok", func(t *testing.T) { t.Run("ok", func(t *testing.T) {
ctx := context.Background() ctx := context.Background()
err := WaitForContainerPresence(ctx, p, nil, &ContainerPollingParams{ err := WaitForContainerPresence(ctx, p, nil, &WaitParams{
timeout: 10 * time.Second, timeout: 10 * time.Second,
pollInterval: 500 * time.Millisecond, pollInterval: 500 * time.Millisecond,
}) })