Merge pull request #3556 from nspcc-dev/adjust-waiter

Make PollingBased waiter more flexible
This commit is contained in:
Roman Khimov 2024-08-20 12:32:51 +03:00 committed by GitHub
commit dfd4566a04
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 81 additions and 20 deletions

View file

@ -106,6 +106,10 @@ type Options struct {
// before it's signed (other methods that perform test invocations // before it's signed (other methods that perform test invocations
// use CheckerModifier). MakeUnsigned* methods do not run it. // use CheckerModifier). MakeUnsigned* methods do not run it.
Modifier TransactionModifier Modifier TransactionModifier
// WaiterConfig is used by [waiter.Waiter] constructor to customize
// awaiting behaviour. This option may be kept empty for default
// awaiting behaviour.
WaiterConfig waiter.Config
} }
// New creates an Actor instance using the specified RPC interface and the set of // New creates an Actor instance using the specified RPC interface and the set of
@ -183,6 +187,7 @@ func NewTuned(ra RPCActor, signers []SignerAccount, opts Options) (*Actor, error
if opts.Modifier != nil { if opts.Modifier != nil {
a.opts.Modifier = opts.Modifier a.opts.Modifier = opts.Modifier
} }
a.Waiter = waiter.NewCustom(ra, a.version, opts.WaiterConfig)
return a, err return a, err
} }

View file

@ -15,11 +15,11 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
) )
// PollingBasedRetryCount is a threshold for a number of subsequent failed // DefaultPollRetryCount is a threshold for a number of subsequent failed
// attempts to get block count from the RPC server for PollingBased. If it fails // attempts to get block count from the RPC server for PollingBased. If it fails
// to retrieve block count PollingBasedRetryCount times in a raw then transaction // to retrieve block count DefaultPollRetryCount times in a raw then transaction
// awaiting attempt considered to be failed and an error is returned. // awaiting attempt considered to be failed and an error is returned.
const PollingBasedRetryCount = 3 const DefaultPollRetryCount = 3
var ( var (
// ErrTxNotAccepted is returned when transaction wasn't accepted to the chain // ErrTxNotAccepted is returned when transaction wasn't accepted to the chain
@ -87,6 +87,23 @@ type Null struct{}
type PollingBased struct { type PollingBased struct {
polling RPCPollingBased polling RPCPollingBased
version *result.Version version *result.Version
config PollConfig
}
// Config is a unified configuration for [Waiter] implementations that allows to
// customize awaiting behaviour.
type Config struct {
PollConfig
}
// PollConfig is a configuration for PollingBased waiter.
type PollConfig struct {
// PollInterval is a time interval between subsequent polls. If not set, then
// default value is a half of configured block time (in milliseconds).
PollInterval time.Duration
// RetryCount is the number of retry attempts while fetching a subsequent block
// count before an error is returned from Wait or WaitAny.
RetryCount int
} }
// EventBased is a websocket-based Waiter. // EventBased is a websocket-based Waiter.
@ -107,20 +124,26 @@ func errIsAlreadyExists(err error) bool {
// or not an implementation of these two interfaces. It returns websocket-based // or not an implementation of these two interfaces. It returns websocket-based
// waiter, polling-based waiter or a stub correspondingly. // waiter, polling-based waiter or a stub correspondingly.
func New(base any, v *result.Version) Waiter { func New(base any, v *result.Version) Waiter {
return NewCustom(base, v, Config{})
}
// NewCustom creates Waiter instance. It can be either websocket-based or
// polling-base, otherwise Waiter stub is returned. As a first argument
// it accepts RPCEventBased implementation, RPCPollingBased implementation
// or not an implementation of these two interfaces. It returns websocket-based
// waiter, polling-based waiter or a stub correspondingly. As the second
// argument it accepts the RPC node version necessary for awaiting behaviour
// customisation. As a third argument it accepts the configuration of
// [Waiter].
func NewCustom(base any, v *result.Version, config Config) Waiter {
if eventW, ok := base.(RPCEventBased); ok { if eventW, ok := base.(RPCEventBased); ok {
return &EventBased{ return &EventBased{
ws: eventW, ws: eventW,
polling: &PollingBased{ polling: newCustomPollingBased(eventW, v, config.PollConfig),
polling: eventW,
version: v,
},
} }
} }
if pollW, ok := base.(RPCPollingBased); ok { if pollW, ok := base.(RPCPollingBased); ok {
return &PollingBased{ return newCustomPollingBased(pollW, v, config.PollConfig)
polling: pollW,
version: v,
}
} }
return NewNull() return NewNull()
} }
@ -142,14 +165,33 @@ func (Null) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*s
// NewPollingBased creates an instance of Waiter supporting poll-based transaction awaiting. // NewPollingBased creates an instance of Waiter supporting poll-based transaction awaiting.
func NewPollingBased(waiter RPCPollingBased) (*PollingBased, error) { func NewPollingBased(waiter RPCPollingBased) (*PollingBased, error) {
return NewCustomPollingBased(waiter, PollConfig{})
}
// NewCustomPollingBased creates an instance of Waiter supporting poll-based transaction awaiting.
// Poll options may be specified via config parameter.
func NewCustomPollingBased(waiter RPCPollingBased, config PollConfig) (*PollingBased, error) {
v, err := waiter.GetVersion() v, err := waiter.GetVersion()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newCustomPollingBased(waiter, v, config), nil
}
// newCustomPollingBased is an internal constructor of PollingBased waiter that sets
// default configuration values if needed.
func newCustomPollingBased(waiter RPCPollingBased, v *result.Version, config PollConfig) *PollingBased {
if config.PollInterval <= 0 {
config.PollInterval = time.Millisecond * time.Duration(v.Protocol.MillisecondsPerBlock) / 2
}
if config.RetryCount <= 0 {
config.RetryCount = DefaultPollRetryCount
}
return &PollingBased{ return &PollingBased{
polling: waiter, polling: waiter,
version: v, version: v,
}, nil config: config,
}
} }
// Wait implements Waiter interface. // Wait implements Waiter interface.
@ -165,12 +207,8 @@ func (w *PollingBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.U
var ( var (
currentHeight uint32 currentHeight uint32
failedAttempt int failedAttempt int
pollTime = time.Millisecond * time.Duration(w.version.Protocol.MillisecondsPerBlock) / 2
) )
if pollTime == 0 { timer := time.NewTicker(w.config.PollInterval)
pollTime = time.Second
}
timer := time.NewTicker(pollTime)
defer timer.Stop() defer timer.Stop()
for { for {
select { select {
@ -178,7 +216,7 @@ func (w *PollingBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.U
blockCount, err := w.polling.GetBlockCount() blockCount, err := w.polling.GetBlockCount()
if err != nil { if err != nil {
failedAttempt++ failedAttempt++
if failedAttempt > PollingBasedRetryCount { if failedAttempt > w.config.RetryCount {
return nil, fmt.Errorf("failed to retrieve block count: %w", err) return nil, fmt.Errorf("failed to retrieve block count: %w", err)
} }
continue continue
@ -212,7 +250,15 @@ func (w *PollingBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.U
// EventBased contains PollingBased under the hood and falls back to polling when subscription-based // EventBased contains PollingBased under the hood and falls back to polling when subscription-based
// awaiting fails. // awaiting fails.
func NewEventBased(waiter RPCEventBased) (*EventBased, error) { func NewEventBased(waiter RPCEventBased) (*EventBased, error) {
polling, err := NewPollingBased(waiter) return NewCustomEventBased(waiter, Config{})
}
// NewCustomEventBased creates an instance of Waiter supporting websocket event-based transaction awaiting.
// EventBased contains PollingBased under the hood and falls back to polling when subscription-based
// awaiting fails. Waiter configuration options may be specified via config parameter
// (defaults are used if not specified).
func NewCustomEventBased(waiter RPCEventBased, config Config) (*EventBased, error) {
polling, err := NewCustomPollingBased(waiter, config.PollConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -51,6 +51,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/rpcclient/oracle" "github.com/nspcc-dev/neo-go/pkg/rpcclient/oracle"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/policy" "github.com/nspcc-dev/neo-go/pkg/rpcclient/policy"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt" "github.com/nspcc-dev/neo-go/pkg/rpcclient/rolemgmt"
"github.com/nspcc-dev/neo-go/pkg/rpcclient/waiter"
"github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag" "github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
@ -1809,6 +1810,15 @@ func TestClient_Wait(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.True(t, len(b.Transactions) > 0) require.True(t, len(b.Transactions) > 0)
// Ensure Waiter constructor works properly.
if ws {
_, ok := act.Waiter.(*waiter.EventBased)
require.True(t, ok)
} else {
_, ok := act.Waiter.(*waiter.PollingBased)
require.True(t, ok)
}
check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) { check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) {
rcvr := make(chan struct{}) rcvr := make(chan struct{})
go func() { go func() {