rpcclient: allow to tune PollingBased waiter

Some clients need more flexible awaiting options (e.g. for short-blocks
networks). The default behaviour is not changed, all exported APIs are
compatible. Ref. https://github.com/nspcc-dev/neofs-node/issues/2864.

Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
This commit is contained in:
Anna Shaleva 2024-08-13 18:10:15 +03:00
parent 3e3991cef8
commit 027d726b65

View file

@ -15,11 +15,11 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util"
)
// PollingBasedRetryCount is a threshold for a number of subsequent failed
// DefaultPollRetryCount is a threshold for a number of subsequent failed
// attempts to get block count from the RPC server for PollingBased. If it fails
// to retrieve block count PollingBasedRetryCount times in a raw then transaction
// to retrieve block count DefaultPollRetryCount times in a raw then transaction
// awaiting attempt considered to be failed and an error is returned.
const PollingBasedRetryCount = 3
const DefaultPollRetryCount = 3
var (
// ErrTxNotAccepted is returned when transaction wasn't accepted to the chain
@ -87,6 +87,17 @@ type Null struct{}
type PollingBased struct {
polling RPCPollingBased
version *result.Version
config PollConfig
}
// PollConfig is a configuration for PollingBased waiter.
type PollConfig struct {
// PollInterval is a time interval between subsequent polls. If not set, then
// default value is a half of configured block time (in milliseconds).
PollInterval time.Duration
// RetryCount is the number of retry attempts while fetching a subsequent block
// count before an error is returned from Wait or WaitAny.
RetryCount int
}
// EventBased is a websocket-based Waiter.
@ -109,18 +120,12 @@ func errIsAlreadyExists(err error) bool {
func New(base any, v *result.Version) Waiter {
if eventW, ok := base.(RPCEventBased); ok {
return &EventBased{
ws: eventW,
polling: &PollingBased{
polling: eventW,
version: v,
},
ws: eventW,
polling: newCustomPollingBased(eventW, v, PollConfig{}),
}
}
if pollW, ok := base.(RPCPollingBased); ok {
return &PollingBased{
polling: pollW,
version: v,
}
return newCustomPollingBased(pollW, v, PollConfig{})
}
return NewNull()
}
@ -142,14 +147,33 @@ func (Null) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*s
// NewPollingBased creates an instance of Waiter supporting poll-based transaction awaiting.
func NewPollingBased(waiter RPCPollingBased) (*PollingBased, error) {
return NewCustomPollingBased(waiter, PollConfig{})
}
// NewCustomPollingBased creates an instance of Waiter supporting poll-based transaction awaiting.
// Poll options may be specified via config parameter.
func NewCustomPollingBased(waiter RPCPollingBased, config PollConfig) (*PollingBased, error) {
v, err := waiter.GetVersion()
if err != nil {
return nil, err
}
return newCustomPollingBased(waiter, v, config), nil
}
// newCustomPollingBased is an internal constructor of PollingBased waiter that sets
// default configuration values if needed.
func newCustomPollingBased(waiter RPCPollingBased, v *result.Version, config PollConfig) *PollingBased {
if config.PollInterval <= 0 {
config.PollInterval = time.Millisecond * time.Duration(v.Protocol.MillisecondsPerBlock) / 2
}
if config.RetryCount <= 0 {
config.RetryCount = DefaultPollRetryCount
}
return &PollingBased{
polling: waiter,
version: v,
}, nil
config: config,
}
}
// Wait implements Waiter interface.
@ -165,9 +189,8 @@ func (w *PollingBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.U
var (
currentHeight uint32
failedAttempt int
pollTime = time.Millisecond * time.Duration(w.version.Protocol.MillisecondsPerBlock) / 2
)
timer := time.NewTicker(pollTime)
timer := time.NewTicker(w.config.PollInterval)
defer timer.Stop()
for {
select {
@ -175,7 +198,7 @@ func (w *PollingBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.U
blockCount, err := w.polling.GetBlockCount()
if err != nil {
failedAttempt++
if failedAttempt > PollingBasedRetryCount {
if failedAttempt > w.config.RetryCount {
return nil, fmt.Errorf("failed to retrieve block count: %w", err)
}
continue
@ -209,7 +232,15 @@ func (w *PollingBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.U
// EventBased contains PollingBased under the hood and falls back to polling when subscription-based
// awaiting fails.
func NewEventBased(waiter RPCEventBased) (*EventBased, error) {
polling, err := NewPollingBased(waiter)
return NewCustomEventBased(waiter, PollConfig{})
}
// NewCustomEventBased creates an instance of Waiter supporting websocket event-based transaction awaiting.
// EventBased contains PollingBased under the hood and falls back to polling when subscription-based
// awaiting fails. PollingBased configuration options may be specified via pollConfig parameter
// (defaults are used if not specified).
func NewCustomEventBased(waiter RPCEventBased, pollConfig PollConfig) (*EventBased, error) {
polling, err := NewCustomPollingBased(waiter, pollConfig)
if err != nil {
return nil, err
}