[#147] Add context to waiter
Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
This commit is contained in:
parent
201db45bd7
commit
88232efe67
4 changed files with 56 additions and 26 deletions
|
@ -13,6 +13,16 @@ import (
|
||||||
|
|
||||||
const alreadyExistsError = "already exists"
|
const alreadyExistsError = "already exists"
|
||||||
|
|
||||||
|
// ContextWaiter is an interface for wrapper around [waiter.Waiter] that respects
|
||||||
|
// a context while waiting for a transaction.
|
||||||
|
type ContextWaiter interface {
|
||||||
|
// Deprecated: use WaitCtx instead to prevent waiter hang up when discrete time stops ticking.
|
||||||
|
Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error)
|
||||||
|
|
||||||
|
WaitCtx(ctx context.Context, h util.Uint256, vub uint32, err error) (*state.AppExecResult, error)
|
||||||
|
WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error)
|
||||||
|
}
|
||||||
|
|
||||||
type WaiterOptions struct {
|
type WaiterOptions struct {
|
||||||
// IgnoreAlreadyExistsError controls behavior for "already exists" error:
|
// IgnoreAlreadyExistsError controls behavior for "already exists" error:
|
||||||
// - If set to true, it indicates that "already exists" error is not a problem, we should
|
// - If set to true, it indicates that "already exists" error is not a problem, we should
|
||||||
|
@ -44,6 +54,7 @@ func NewWaiter(waiter waiter.Waiter, options WaiterOptions) *Waiter {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait allows to wait until transaction will be accepted to the chain.
|
// Wait allows to wait until transaction will be accepted to the chain.
|
||||||
|
// Deprecated: use WaitCtx instead to prevent waiter hang up when discrete time stops ticking.
|
||||||
func (w *Waiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
func (w *Waiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
||||||
if !w.options.IgnoreAlreadyExistsError && errIsAlreadyExists(err) {
|
if !w.options.IgnoreAlreadyExistsError && errIsAlreadyExists(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -52,6 +63,19 @@ func (w *Waiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResu
|
||||||
return w.examineExecResult(result, err)
|
return w.examineExecResult(result, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitCtx allows to wait until transaction is accepted to the chain.
|
||||||
|
// Waiting is limited by the specified context.
|
||||||
|
func (w *Waiter) WaitCtx(ctx context.Context, h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
||||||
|
if err != nil {
|
||||||
|
shouldIgnore := errIsAlreadyExists(err) && w.options.IgnoreAlreadyExistsError
|
||||||
|
if !shouldIgnore {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result, err := w.waiter.WaitAny(ctx, vub, h)
|
||||||
|
return w.examineExecResult(result, err)
|
||||||
|
}
|
||||||
|
|
||||||
// WaitAny waits until at least one of the specified transactions will be accepted
|
// WaitAny waits until at least one of the specified transactions will be accepted
|
||||||
// to the chain.
|
// to the chain.
|
||||||
func (w *Waiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
|
func (w *Waiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
|
||||||
|
|
|
@ -46,12 +46,8 @@ func (w *mockWaiter) failedResult(txHash util.Uint256, exception string) *state.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
func (m *mockWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
||||||
args := m.Called(h, vub, err)
|
// Our Waiter struct must use WaitAny, so we don't implement this method
|
||||||
result := args.Get(0)
|
panic("not implemented")
|
||||||
if result == nil {
|
|
||||||
return nil, args.Error(1)
|
|
||||||
}
|
|
||||||
return result.(*state.AppExecResult), args.Error(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
|
func (m *mockWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
|
||||||
|
@ -65,15 +61,17 @@ func (m *mockWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin
|
||||||
|
|
||||||
func TestWaiter(t *testing.T) {
|
func TestWaiter(t *testing.T) {
|
||||||
txHash := util.Uint256{}
|
txHash := util.Uint256{}
|
||||||
|
txHashes := []util.Uint256{txHash}
|
||||||
|
|
||||||
vub := uint32(100)
|
vub := uint32(100)
|
||||||
|
|
||||||
t.Run("ignore already exists error", func(t *testing.T) {
|
t.Run("ignore already exists error", func(t *testing.T) {
|
||||||
sendErr := fmt.Errorf("transaction already exists")
|
sendErr := fmt.Errorf("transaction already exists")
|
||||||
mw := &mockWaiter{}
|
mw := &mockWaiter{}
|
||||||
mw.On("Wait", txHash, vub, mock.Anything).Return(mw.successfulResult(txHash), nil)
|
mw.On("WaitAny", mock.Anything, vub, txHashes).Return(mw.successfulResult(txHash), nil)
|
||||||
|
|
||||||
waiter := NewWaiter(mw, WaiterOptions{IgnoreAlreadyExistsError: true})
|
waiter := NewWaiter(mw, WaiterOptions{IgnoreAlreadyExistsError: true})
|
||||||
_, err := waiter.Wait(txHash, vub, sendErr)
|
_, err := waiter.WaitCtx(context.Background(), txHash, vub, sendErr)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
})
|
})
|
||||||
|
@ -81,10 +79,10 @@ func TestWaiter(t *testing.T) {
|
||||||
t.Run("report already exists error", func(t *testing.T) {
|
t.Run("report already exists error", func(t *testing.T) {
|
||||||
sendErr := fmt.Errorf("transaction already exists")
|
sendErr := fmt.Errorf("transaction already exists")
|
||||||
mw := &mockWaiter{}
|
mw := &mockWaiter{}
|
||||||
mw.On("Wait", txHash, vub, mock.Anything).Return(mw.successfulResult(txHash), nil)
|
mw.On("WaitAny", mock.Anything, vub, txHashes).Return(mw.successfulResult(txHash), nil)
|
||||||
|
|
||||||
waiter := NewWaiter(mw, WaiterOptions{IgnoreAlreadyExistsError: false})
|
waiter := NewWaiter(mw, WaiterOptions{IgnoreAlreadyExistsError: false})
|
||||||
_, err := waiter.Wait(txHash, vub, sendErr)
|
_, err := waiter.WaitCtx(context.Background(), txHash, vub, sendErr)
|
||||||
|
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
|
@ -92,10 +90,10 @@ func TestWaiter(t *testing.T) {
|
||||||
t.Run("report wait error when transaction error is ignored", func(t *testing.T) {
|
t.Run("report wait error when transaction error is ignored", func(t *testing.T) {
|
||||||
waitErr := fmt.Errorf("mock error")
|
waitErr := fmt.Errorf("mock error")
|
||||||
mw := &mockWaiter{}
|
mw := &mockWaiter{}
|
||||||
mw.On("Wait", txHash, vub, nil).Return(nil, waitErr)
|
mw.On("WaitAny", mock.Anything, vub, txHashes).Return(nil, waitErr)
|
||||||
|
|
||||||
waiter := NewWaiter(mw, WaiterOptions{VerifyExecResults: false})
|
waiter := NewWaiter(mw, WaiterOptions{VerifyExecResults: false})
|
||||||
_, err := waiter.Wait(txHash, vub, nil)
|
_, err := waiter.WaitCtx(context.Background(), txHash, vub, nil)
|
||||||
|
|
||||||
require.ErrorIs(t, err, waitErr)
|
require.ErrorIs(t, err, waitErr)
|
||||||
})
|
})
|
||||||
|
@ -103,10 +101,10 @@ func TestWaiter(t *testing.T) {
|
||||||
t.Run("report wait error when transaction error is verified", func(t *testing.T) {
|
t.Run("report wait error when transaction error is verified", func(t *testing.T) {
|
||||||
waitErr := fmt.Errorf("mock error")
|
waitErr := fmt.Errorf("mock error")
|
||||||
mw := &mockWaiter{}
|
mw := &mockWaiter{}
|
||||||
mw.On("Wait", txHash, vub, nil).Return(nil, waitErr)
|
mw.On("WaitAny", mock.Anything, vub, txHashes).Return(nil, waitErr)
|
||||||
|
|
||||||
waiter := NewWaiter(mw, WaiterOptions{VerifyExecResults: true})
|
waiter := NewWaiter(mw, WaiterOptions{VerifyExecResults: true})
|
||||||
_, err := waiter.Wait(txHash, vub, nil)
|
_, err := waiter.WaitCtx(context.Background(), txHash, vub, nil)
|
||||||
|
|
||||||
require.ErrorIs(t, err, waitErr)
|
require.ErrorIs(t, err, waitErr)
|
||||||
})
|
})
|
||||||
|
@ -114,10 +112,10 @@ func TestWaiter(t *testing.T) {
|
||||||
t.Run("ignore error from transaction", func(t *testing.T) {
|
t.Run("ignore error from transaction", func(t *testing.T) {
|
||||||
txError := "mock error"
|
txError := "mock error"
|
||||||
mw := &mockWaiter{}
|
mw := &mockWaiter{}
|
||||||
mw.On("Wait", txHash, vub, nil).Return(mw.failedResult(txHash, txError), nil)
|
mw.On("WaitAny", mock.Anything, vub, txHashes).Return(mw.failedResult(txHash, txError), nil)
|
||||||
|
|
||||||
waiter := NewWaiter(mw, WaiterOptions{VerifyExecResults: false})
|
waiter := NewWaiter(mw, WaiterOptions{VerifyExecResults: false})
|
||||||
_, err := waiter.Wait(txHash, vub, nil)
|
_, err := waiter.WaitCtx(context.Background(), txHash, vub, nil)
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
})
|
})
|
||||||
|
@ -125,10 +123,10 @@ func TestWaiter(t *testing.T) {
|
||||||
t.Run("examine error from transaction", func(t *testing.T) {
|
t.Run("examine error from transaction", func(t *testing.T) {
|
||||||
txError := "mock error"
|
txError := "mock error"
|
||||||
mw := &mockWaiter{}
|
mw := &mockWaiter{}
|
||||||
mw.On("Wait", txHash, vub, nil).Return(mw.failedResult(txHash, txError), nil)
|
mw.On("WaitAny", mock.Anything, vub, txHashes).Return(mw.failedResult(txHash, txError), nil)
|
||||||
|
|
||||||
waiter := NewWaiter(mw, WaiterOptions{VerifyExecResults: true})
|
waiter := NewWaiter(mw, WaiterOptions{VerifyExecResults: true})
|
||||||
_, err := waiter.Wait(txHash, vub, nil)
|
_, err := waiter.WaitCtx(context.Background(), txHash, vub, nil)
|
||||||
|
|
||||||
require.ErrorContains(t, err, txError)
|
require.ErrorContains(t, err, txError)
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
|
"git.frostfs.info/TrueCloudLab/frostfs-contract/commonclient"
|
||||||
|
@ -11,7 +12,6 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/actor"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/notary"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/notary"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
"github.com/nspcc-dev/neo-go/pkg/rpcclient/unwrap"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/rpcclient/waiter"
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
|
||||||
|
@ -23,7 +23,7 @@ import (
|
||||||
type (
|
type (
|
||||||
Client struct {
|
Client struct {
|
||||||
act *actor.Actor
|
act *actor.Actor
|
||||||
waiter waiter.Waiter
|
waiter commonclient.ContextWaiter
|
||||||
contract util.Uint160
|
contract util.Uint160
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -614,12 +614,13 @@ func (c Client) ListNonEmptyNamespaces() ([]string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait waits until the specified transaction is accepted to the chain.
|
// Wait waits until the specified transaction is accepted to the chain.
|
||||||
|
// Deprecated: use [Waiter] method instead.
|
||||||
func (c Client) Wait(tx util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
func (c Client) Wait(tx util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
||||||
return c.Waiter().Wait(tx, vub, err)
|
return c.Waiter().WaitCtx(context.TODO(), tx, vub, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Waiter returns underlying waiter.Waiter.
|
// Waiter returns underlying waiter.
|
||||||
func (c Client) Waiter() waiter.Waiter {
|
func (c Client) Waiter() commonclient.ContextWaiter {
|
||||||
return c.waiter
|
return c.waiter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -191,6 +191,7 @@ func TestFrostFSID_Client_NamespaceManagement(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFrostFSID_Client_DefaultNamespace(t *testing.T) {
|
func TestFrostFSID_Client_DefaultNamespace(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
ffsid, cancel := initFrostfsIFClientTest(t)
|
ffsid, cancel := initFrostfsIFClientTest(t)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -198,7 +199,9 @@ func TestFrostFSID_Client_DefaultNamespace(t *testing.T) {
|
||||||
subjKey, subjAddr := newKey(t)
|
subjKey, subjAddr := newKey(t)
|
||||||
|
|
||||||
ffsid.a.await(ffsid.cli.CreateSubject(defaultNamespace, subjKey.PublicKey()))
|
ffsid.a.await(ffsid.cli.CreateSubject(defaultNamespace, subjKey.PublicKey()))
|
||||||
groupID, err := ffsid.cli.ParseGroupID(ffsid.cli.Wait(ffsid.cli.CreateGroup(defaultNamespace, group)))
|
|
||||||
|
txHash, vub, err := ffsid.cli.CreateGroup(defaultNamespace, group)
|
||||||
|
groupID, err := ffsid.cli.ParseGroupID(ffsid.cli.Waiter().WaitCtx(ctx, txHash, vub, err))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
ffsid.a.await(ffsid.cli.AddSubjectToGroup(subjAddr, groupID))
|
ffsid.a.await(ffsid.cli.AddSubjectToGroup(subjAddr, groupID))
|
||||||
|
|
||||||
|
@ -226,6 +229,7 @@ func TestFrostFSID_Client_DefaultNamespace(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFrostFSID_Client_GroupManagement(t *testing.T) {
|
func TestFrostFSID_Client_GroupManagement(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
ffsid, cancel := initFrostfsIFClientTest(t)
|
ffsid, cancel := initFrostfsIFClientTest(t)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -237,7 +241,8 @@ func TestFrostFSID_Client_GroupManagement(t *testing.T) {
|
||||||
|
|
||||||
groupName := "group"
|
groupName := "group"
|
||||||
groupID := int64(1)
|
groupID := int64(1)
|
||||||
actGroupID, err := ffsid.cli.ParseGroupID(ffsid.cli.Wait(ffsid.cli.CreateGroup(namespace, groupName)))
|
txHash, vub, err := ffsid.cli.CreateGroup(namespace, groupName)
|
||||||
|
actGroupID, err := ffsid.cli.ParseGroupID(ffsid.cli.Waiter().WaitCtx(ctx, txHash, vub, err))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, groupID, actGroupID)
|
require.Equal(t, groupID, actGroupID)
|
||||||
|
|
||||||
|
@ -538,11 +543,13 @@ func TestFrostFSID_Client_GetSubjectByName(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFrostFSID_Client_GetGroupByName(t *testing.T) {
|
func TestFrostFSID_Client_GetGroupByName(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
ffsid, cancel := initFrostfsIFClientTest(t)
|
ffsid, cancel := initFrostfsIFClientTest(t)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
groupName := "group"
|
groupName := "group"
|
||||||
actGroupID, err := ffsid.cli.ParseGroupID(ffsid.cli.Wait(ffsid.cli.CreateGroup(defaultNamespace, groupName)))
|
txHash, vub, err := ffsid.cli.CreateGroup(defaultNamespace, groupName)
|
||||||
|
actGroupID, err := ffsid.cli.ParseGroupID(ffsid.cli.Waiter().WaitCtx(ctx, txHash, vub, err))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
group, err := ffsid.cli.GetGroupByName(defaultNamespace, groupName)
|
group, err := ffsid.cli.GetGroupByName(defaultNamespace, groupName)
|
||||||
|
|
Loading…
Add table
Reference in a new issue