mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2024-11-25 03:47:18 +00:00
Merge pull request #3283 from nspcc-dev/waiter-refactor
waiter: refactor naming and implement header-based WS transactions awaiting
This commit is contained in:
commit
d8320372b8
6 changed files with 197 additions and 113 deletions
11
ROADMAP.md
11
ROADMAP.md
|
@ -58,3 +58,14 @@ NeoGo retains certain deprecated error codes: `neorpc.ErrCompatGeneric`,
|
||||||
neo-project/proposals#156 (NeoGo pre-0.102.0 and all known C# versions).
|
neo-project/proposals#156 (NeoGo pre-0.102.0 and all known C# versions).
|
||||||
|
|
||||||
Removal of the deprecated RPC error codes is planned once all nodes adopt the new error standard.
|
Removal of the deprecated RPC error codes is planned once all nodes adopt the new error standard.
|
||||||
|
|
||||||
|
## Block based web-socket waiter transaction awaiting
|
||||||
|
|
||||||
|
Web-socket RPC based `waiter.EventWaiter` uses `header_of_added_block` notifications
|
||||||
|
subscription to manage transaction awaiting. To support old NeoGo RPC servers
|
||||||
|
(older than 0.105.0) that do not have block headers subscription ability,
|
||||||
|
event-based waiter fallbacks to the old way of block monitoring with
|
||||||
|
`block_added` notifications subscription.
|
||||||
|
|
||||||
|
Removal of stale RPC server compatibility code from `waiter.EventWaiter` is
|
||||||
|
scheduled for May-June 2024 (~0.107.0 release).
|
|
@ -59,18 +59,18 @@ type SignerAccount struct {
|
||||||
// transactions in various ways, while "Send" prefix is used by methods that
|
// transactions in various ways, while "Send" prefix is used by methods that
|
||||||
// directly transmit created transactions to the RPC server.
|
// directly transmit created transactions to the RPC server.
|
||||||
//
|
//
|
||||||
// Actor also provides a Waiter interface to wait until transaction will be
|
// Actor also provides a [waiter.Waiter] interface to wait until transaction will be
|
||||||
// accepted to the chain. Depending on the underlying RPCActor functionality,
|
// accepted to the chain. Depending on the underlying RPCActor functionality,
|
||||||
// transaction awaiting can be performed via web-socket using RPC notifications
|
// transaction awaiting can be performed via web-socket using RPC notifications
|
||||||
// subsystem with EventWaiter, via regular RPC requests using a poll-based
|
// subsystem with [waiter.EventBased], via regular RPC requests using a poll-based
|
||||||
// algorithm with PollingWaiter or can not be performed if RPCActor doesn't
|
// algorithm with [waiter.PollingBased] or can not be performed if RPCActor doesn't
|
||||||
// implement none of RPCEventWaiter and RPCPollingWaiter interfaces with
|
// implement none of [waiter.RPCEventBased] and [waiter.RPCPollingBased] interfaces with
|
||||||
// NullWaiter. ErrAwaitingNotSupported will be returned on attempt to await the
|
// [waiter.Null]. [waiter.ErrAwaitingNotSupported] will be returned on attempt to await the
|
||||||
// transaction in the latter case. Waiter uses context of the underlying RPCActor
|
// transaction in the latter case. [waiter.Waiter] uses context of the underlying RPCActor
|
||||||
// and interrupts transaction awaiting process if the context is done.
|
// and interrupts transaction awaiting process if the context is done.
|
||||||
// ErrContextDone wrapped with the context's error will be returned in this case.
|
// [waiter.ErrContextDone] wrapped with the context's error will be returned in this case.
|
||||||
// Otherwise, transaction awaiting process is ended with ValidUntilBlock acceptance
|
// Otherwise, transaction awaiting process is ended with ValidUntilBlock acceptance
|
||||||
// and ErrTxNotAccepted is returned if transaction wasn't accepted by this moment.
|
// and [waiter.ErrTxNotAccepted] is returned if transaction wasn't accepted by this moment.
|
||||||
type Actor struct {
|
type Actor struct {
|
||||||
invoker.Invoker
|
invoker.Invoker
|
||||||
waiter.Waiter
|
waiter.Waiter
|
||||||
|
|
|
@ -79,7 +79,7 @@ func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*r
|
||||||
return r.applog, nil
|
return r.applog, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ = waiter.RPCPollingWaiter(&RPCClient{})
|
var _ = waiter.RPCPollingBased(&RPCClient{})
|
||||||
|
|
||||||
func TestNewActor(t *testing.T) {
|
func TestNewActor(t *testing.T) {
|
||||||
rc := &RPCClient{
|
rc := &RPCClient{
|
||||||
|
|
|
@ -15,11 +15,11 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PollingWaiterRetryCount is a threshold for a number of subsequent failed
|
// PollingBasedRetryCount is a threshold for a number of subsequent failed
|
||||||
// attempts to get block count from the RPC server for PollingWaiter. If it fails
|
// attempts to get block count from the RPC server for PollingBased. If it fails
|
||||||
// to retrieve block count PollingWaiterRetryCount times in a raw then transaction
|
// to retrieve block count PollingBasedRetryCount times in a raw then transaction
|
||||||
// awaiting attempt considered to be failed and an error is returned.
|
// awaiting attempt considered to be failed and an error is returned.
|
||||||
const PollingWaiterRetryCount = 3
|
const PollingBasedRetryCount = 3
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// ErrTxNotAccepted is returned when transaction wasn't accepted to the chain
|
// ErrTxNotAccepted is returned when transaction wasn't accepted to the chain
|
||||||
|
@ -31,13 +31,13 @@ var (
|
||||||
// ErrAwaitingNotSupported is returned from Wait method if Waiter instance
|
// ErrAwaitingNotSupported is returned from Wait method if Waiter instance
|
||||||
// doesn't support transaction awaiting.
|
// doesn't support transaction awaiting.
|
||||||
ErrAwaitingNotSupported = errors.New("awaiting not supported")
|
ErrAwaitingNotSupported = errors.New("awaiting not supported")
|
||||||
// ErrMissedEvent is returned when RPCEventWaiter closes receiver channel
|
// ErrMissedEvent is returned when RPCEventBased closes receiver channel
|
||||||
// which happens if missed event was received from the RPC server.
|
// which happens if missed event was received from the RPC server.
|
||||||
ErrMissedEvent = errors.New("some event was missed")
|
ErrMissedEvent = errors.New("some event was missed")
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
// Waiter is an interface providing transaction awaiting functionality to Actor.
|
// Waiter is an interface providing transaction awaiting functionality.
|
||||||
Waiter interface {
|
Waiter interface {
|
||||||
// Wait allows to wait until transaction will be accepted to the chain. It can be
|
// Wait allows to wait until transaction will be accepted to the chain. It can be
|
||||||
// used as a wrapper for Send or SignAndSend and accepts transaction hash,
|
// used as a wrapper for Send or SignAndSend and accepts transaction hash,
|
||||||
|
@ -51,14 +51,14 @@ type (
|
||||||
// WaitAny waits until at least one of the specified transactions will be accepted
|
// WaitAny waits until at least one of the specified transactions will be accepted
|
||||||
// to the chain until vub (including). It returns execution result of this
|
// to the chain until vub (including). It returns execution result of this
|
||||||
// transaction or an error if none of the transactions was accepted to the chain.
|
// transaction or an error if none of the transactions was accepted to the chain.
|
||||||
// It uses underlying RPCPollingWaiter or RPCEventWaiter context to interrupt
|
// It uses underlying RPCPollingBased or RPCEventBased context to interrupt
|
||||||
// awaiting process, but additional ctx can be passed as an argument for the same
|
// awaiting process, but additional ctx can be passed as an argument for the same
|
||||||
// purpose.
|
// purpose.
|
||||||
WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error)
|
WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error)
|
||||||
}
|
}
|
||||||
// RPCPollingWaiter is an interface that enables transaction awaiting functionality
|
// RPCPollingBased is an interface that enables transaction awaiting functionality
|
||||||
// for Actor instance based on periodical BlockCount and ApplicationLog polls.
|
// based on periodical BlockCount and ApplicationLog polls.
|
||||||
RPCPollingWaiter interface {
|
RPCPollingBased interface {
|
||||||
// Context should return the RPC client context to be able to gracefully
|
// Context should return the RPC client context to be able to gracefully
|
||||||
// shut down all running processes (if so).
|
// shut down all running processes (if so).
|
||||||
Context() context.Context
|
Context() context.Context
|
||||||
|
@ -66,31 +66,32 @@ type (
|
||||||
GetBlockCount() (uint32, error)
|
GetBlockCount() (uint32, error)
|
||||||
GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error)
|
GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error)
|
||||||
}
|
}
|
||||||
// RPCEventWaiter is an interface that enables improved transaction awaiting functionality
|
// RPCEventBased is an interface that enables improved transaction awaiting functionality
|
||||||
// for Actor instance based on web-socket Block and ApplicationLog notifications. RPCEventWaiter
|
// based on web-socket Block and ApplicationLog notifications. RPCEventBased
|
||||||
// contains RPCPollingWaiter under the hood and falls back to polling when subscription-based
|
// contains RPCPollingBased under the hood and falls back to polling when subscription-based
|
||||||
// awaiting fails.
|
// awaiting fails.
|
||||||
RPCEventWaiter interface {
|
RPCEventBased interface {
|
||||||
RPCPollingWaiter
|
RPCPollingBased
|
||||||
|
|
||||||
|
ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error)
|
||||||
ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error)
|
ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error)
|
||||||
ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error)
|
ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error)
|
||||||
Unsubscribe(id string) error
|
Unsubscribe(id string) error
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// NullWaiter is a Waiter stub that doesn't support transaction awaiting functionality.
|
// Null is a Waiter stub that doesn't support transaction awaiting functionality.
|
||||||
type NullWaiter struct{}
|
type Null struct{}
|
||||||
|
|
||||||
// PollingWaiter is a polling-based Waiter.
|
// PollingBased is a polling-based Waiter.
|
||||||
type PollingWaiter struct {
|
type PollingBased struct {
|
||||||
polling RPCPollingWaiter
|
polling RPCPollingBased
|
||||||
version *result.Version
|
version *result.Version
|
||||||
}
|
}
|
||||||
|
|
||||||
// EventWaiter is a websocket-based Waiter.
|
// EventBased is a websocket-based Waiter.
|
||||||
type EventWaiter struct {
|
type EventBased struct {
|
||||||
ws RPCEventWaiter
|
ws RPCEventBased
|
||||||
polling Waiter
|
polling Waiter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,57 +103,57 @@ func errIsAlreadyExists(err error) bool {
|
||||||
|
|
||||||
// New creates Waiter instance. It can be either websocket-based or
|
// New creates Waiter instance. It can be either websocket-based or
|
||||||
// polling-base, otherwise Waiter stub is returned. As a first argument
|
// polling-base, otherwise Waiter stub is returned. As a first argument
|
||||||
// it accepts RPCEventWaiter implementation, RPCPollingWaiter implementation
|
// it accepts RPCEventBased implementation, RPCPollingBased implementation
|
||||||
// or not an implementation of these two interfaces. It returns websocket-based
|
// or not an implementation of these two interfaces. It returns websocket-based
|
||||||
// waiter, polling-based waiter or a stub correspondingly.
|
// waiter, polling-based waiter or a stub correspondingly.
|
||||||
func New(base any, v *result.Version) Waiter {
|
func New(base any, v *result.Version) Waiter {
|
||||||
if eventW, ok := base.(RPCEventWaiter); ok {
|
if eventW, ok := base.(RPCEventBased); ok {
|
||||||
return &EventWaiter{
|
return &EventBased{
|
||||||
ws: eventW,
|
ws: eventW,
|
||||||
polling: &PollingWaiter{
|
polling: &PollingBased{
|
||||||
polling: eventW,
|
polling: eventW,
|
||||||
version: v,
|
version: v,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if pollW, ok := base.(RPCPollingWaiter); ok {
|
if pollW, ok := base.(RPCPollingBased); ok {
|
||||||
return &PollingWaiter{
|
return &PollingBased{
|
||||||
polling: pollW,
|
polling: pollW,
|
||||||
version: v,
|
version: v,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return NewNullWaiter()
|
return NewNull()
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNullWaiter creates an instance of Waiter stub.
|
// NewNull creates an instance of Waiter stub.
|
||||||
func NewNullWaiter() NullWaiter {
|
func NewNull() Null {
|
||||||
return NullWaiter{}
|
return Null{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait implements Waiter interface.
|
// Wait implements Waiter interface.
|
||||||
func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
func (Null) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
||||||
return nil, ErrAwaitingNotSupported
|
return nil, ErrAwaitingNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitAny implements Waiter interface.
|
// WaitAny implements Waiter interface.
|
||||||
func (NullWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
|
func (Null) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
|
||||||
return nil, ErrAwaitingNotSupported
|
return nil, ErrAwaitingNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting.
|
// NewPollingBased creates an instance of Waiter supporting poll-based transaction awaiting.
|
||||||
func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) {
|
func NewPollingBased(waiter RPCPollingBased) (*PollingBased, error) {
|
||||||
v, err := waiter.GetVersion()
|
v, err := waiter.GetVersion()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &PollingWaiter{
|
return &PollingBased{
|
||||||
polling: waiter,
|
polling: waiter,
|
||||||
version: v,
|
version: v,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait implements Waiter interface.
|
// Wait implements Waiter interface.
|
||||||
func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
func (w *PollingBased) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
|
||||||
if err != nil && !errIsAlreadyExists(err) {
|
if err != nil && !errIsAlreadyExists(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -160,7 +161,7 @@ func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppE
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitAny implements Waiter interface.
|
// WaitAny implements Waiter interface.
|
||||||
func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
|
func (w *PollingBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
|
||||||
var (
|
var (
|
||||||
currentHeight uint32
|
currentHeight uint32
|
||||||
failedAttempt int
|
failedAttempt int
|
||||||
|
@ -177,7 +178,7 @@ func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.
|
||||||
blockCount, err := w.polling.GetBlockCount()
|
blockCount, err := w.polling.GetBlockCount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failedAttempt++
|
failedAttempt++
|
||||||
if failedAttempt > PollingWaiterRetryCount {
|
if failedAttempt > PollingBasedRetryCount {
|
||||||
return nil, fmt.Errorf("failed to retrieve block count: %w", err)
|
return nil, fmt.Errorf("failed to retrieve block count: %w", err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
@ -207,22 +208,22 @@ func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEventWaiter creates an instance of Waiter supporting websocket event-based transaction awaiting.
|
// NewEventBased creates an instance of Waiter supporting websocket event-based transaction awaiting.
|
||||||
// EventWaiter contains PollingWaiter under the hood and falls back to polling when subscription-based
|
// EventBased contains PollingBased under the hood and falls back to polling when subscription-based
|
||||||
// awaiting fails.
|
// awaiting fails.
|
||||||
func NewEventWaiter(waiter RPCEventWaiter) (*EventWaiter, error) {
|
func NewEventBased(waiter RPCEventBased) (*EventBased, error) {
|
||||||
polling, err := NewPollingWaiter(waiter)
|
polling, err := NewPollingBased(waiter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &EventWaiter{
|
return &EventBased{
|
||||||
ws: waiter,
|
ws: waiter,
|
||||||
polling: polling,
|
polling: polling,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait implements Waiter interface.
|
// Wait implements Waiter interface.
|
||||||
func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) {
|
func (w *EventBased) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) {
|
||||||
if err != nil && !errIsAlreadyExists(err) {
|
if err != nil && !errIsAlreadyExists(err) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -230,10 +231,11 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitAny implements Waiter interface.
|
// WaitAny implements Waiter interface.
|
||||||
func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) {
|
func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) {
|
||||||
var (
|
var (
|
||||||
wsWaitErr error
|
wsWaitErr error
|
||||||
waitersActive int
|
waitersActive int
|
||||||
|
hRcvr = make(chan *block.Header, 2)
|
||||||
bRcvr = make(chan *block.Block, 2)
|
bRcvr = make(chan *block.Block, 2)
|
||||||
aerRcvr = make(chan *state.AppExecResult, len(hashes))
|
aerRcvr = make(chan *state.AppExecResult, len(hashes))
|
||||||
unsubErrs = make(chan error)
|
unsubErrs = make(chan error)
|
||||||
|
@ -242,16 +244,22 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
|
||||||
|
|
||||||
// Execution event preceded the block event, thus wait until the VUB-th block to be sure.
|
// Execution event preceded the block event, thus wait until the VUB-th block to be sure.
|
||||||
since := vub
|
since := vub
|
||||||
blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr)
|
blocksID, err := w.ws.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Since: &since}, hRcvr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
|
// Falling back to block-based subscription.
|
||||||
|
if errors.Is(err, neorpc.ErrInvalidParams) {
|
||||||
|
blocksID, err = w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks/headers: %w", err)
|
||||||
} else {
|
} else {
|
||||||
waitersActive++
|
waitersActive++
|
||||||
go func() {
|
go func() {
|
||||||
<-exit
|
<-exit
|
||||||
err = w.ws.Unsubscribe(blocksID)
|
err = w.ws.Unsubscribe(blocksID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err)
|
unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks/headers (id: %s): %w", blocksID, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
unsubErrs <- nil
|
unsubErrs <- nil
|
||||||
|
@ -290,9 +298,20 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
|
||||||
|
|
||||||
if wsWaitErr == nil && res == nil {
|
if wsWaitErr == nil && res == nil {
|
||||||
select {
|
select {
|
||||||
|
case _, ok := <-hRcvr:
|
||||||
|
if !ok {
|
||||||
|
// We're toast, retry with non-ws client.
|
||||||
|
hRcvr = nil
|
||||||
|
bRcvr = nil
|
||||||
|
aerRcvr = nil
|
||||||
|
wsWaitErr = ErrMissedEvent
|
||||||
|
break
|
||||||
|
}
|
||||||
|
waitErr = ErrTxNotAccepted
|
||||||
case _, ok := <-bRcvr:
|
case _, ok := <-bRcvr:
|
||||||
if !ok {
|
if !ok {
|
||||||
// We're toast, retry with non-ws client.
|
// We're toast, retry with non-ws client.
|
||||||
|
hRcvr = nil
|
||||||
bRcvr = nil
|
bRcvr = nil
|
||||||
aerRcvr = nil
|
aerRcvr = nil
|
||||||
wsWaitErr = ErrMissedEvent
|
wsWaitErr = ErrMissedEvent
|
||||||
|
@ -302,6 +321,7 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
|
||||||
case aer, ok := <-aerRcvr:
|
case aer, ok := <-aerRcvr:
|
||||||
if !ok {
|
if !ok {
|
||||||
// We're toast, retry with non-ws client.
|
// We're toast, retry with non-ws client.
|
||||||
|
hRcvr = nil
|
||||||
bRcvr = nil
|
bRcvr = nil
|
||||||
aerRcvr = nil
|
aerRcvr = nil
|
||||||
wsWaitErr = ErrMissedEvent
|
wsWaitErr = ErrMissedEvent
|
||||||
|
@ -321,13 +341,21 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
|
||||||
drainLoop:
|
drainLoop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case _, ok := <-hRcvr:
|
||||||
|
if !ok { // Missed event means both channels are closed.
|
||||||
|
hRcvr = nil
|
||||||
|
bRcvr = nil
|
||||||
|
aerRcvr = nil
|
||||||
|
}
|
||||||
case _, ok := <-bRcvr:
|
case _, ok := <-bRcvr:
|
||||||
if !ok { // Missed event means both channels are closed.
|
if !ok { // Missed event means both channels are closed.
|
||||||
|
hRcvr = nil
|
||||||
bRcvr = nil
|
bRcvr = nil
|
||||||
aerRcvr = nil
|
aerRcvr = nil
|
||||||
}
|
}
|
||||||
case _, ok := <-aerRcvr:
|
case _, ok := <-aerRcvr:
|
||||||
if !ok { // Missed event means both channels are closed.
|
if !ok { // Missed event means both channels are closed.
|
||||||
|
hRcvr = nil
|
||||||
bRcvr = nil
|
bRcvr = nil
|
||||||
aerRcvr = nil
|
aerRcvr = nil
|
||||||
}
|
}
|
||||||
|
@ -349,6 +377,9 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if hRcvr != nil {
|
||||||
|
close(hRcvr)
|
||||||
|
}
|
||||||
if bRcvr != nil {
|
if bRcvr != nil {
|
||||||
close(bRcvr)
|
close(bRcvr)
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ type RPCClient struct {
|
||||||
context context.Context
|
context context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ = waiter.RPCPollingBased(&RPCClient{})
|
||||||
|
|
||||||
func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) {
|
func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) {
|
||||||
return r.invRes, r.err
|
return r.invRes, r.err
|
||||||
}
|
}
|
||||||
|
@ -80,11 +82,14 @@ func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*r
|
||||||
type AwaitableRPCClient struct {
|
type AwaitableRPCClient struct {
|
||||||
RPCClient
|
RPCClient
|
||||||
|
|
||||||
chLock sync.RWMutex
|
chLock sync.RWMutex
|
||||||
subBlockCh chan<- *block.Block
|
subHeaderCh chan<- *block.Header
|
||||||
subTxCh chan<- *state.AppExecResult
|
subBlockCh chan<- *block.Block
|
||||||
|
subTxCh chan<- *state.AppExecResult
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ = waiter.RPCEventBased(&AwaitableRPCClient{})
|
||||||
|
|
||||||
func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
|
func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
|
||||||
c.chLock.Lock()
|
c.chLock.Lock()
|
||||||
defer c.chLock.Unlock()
|
defer c.chLock.Unlock()
|
||||||
|
@ -97,19 +102,25 @@ func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr
|
||||||
c.subTxCh = rcvr
|
c.subTxCh = rcvr
|
||||||
return "2", nil
|
return "2", nil
|
||||||
}
|
}
|
||||||
|
func (c *AwaitableRPCClient) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) {
|
||||||
|
c.chLock.Lock()
|
||||||
|
defer c.chLock.Unlock()
|
||||||
|
c.subHeaderCh = rcvr
|
||||||
|
return "3", nil
|
||||||
|
}
|
||||||
func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil }
|
func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil }
|
||||||
|
|
||||||
func TestNewWaiter(t *testing.T) {
|
func TestNewWaiter(t *testing.T) {
|
||||||
w := waiter.New((actor.RPCActor)(nil), nil)
|
w := waiter.New((actor.RPCActor)(nil), nil)
|
||||||
_, ok := w.(waiter.NullWaiter)
|
_, ok := w.(waiter.Null)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
w = waiter.New(&RPCClient{}, &result.Version{})
|
w = waiter.New(&RPCClient{}, &result.Version{})
|
||||||
_, ok = w.(*waiter.PollingWaiter)
|
_, ok = w.(*waiter.PollingBased)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
w = waiter.New(&AwaitableRPCClient{RPCClient: RPCClient{}}, &result.Version{})
|
w = waiter.New(&AwaitableRPCClient{RPCClient: RPCClient{}}, &result.Version{})
|
||||||
_, ok = w.(*waiter.EventWaiter)
|
_, ok = w.(*waiter.EventBased)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +132,7 @@ func TestPollingWaiter_Wait(t *testing.T) {
|
||||||
c := &RPCClient{appLog: appLog}
|
c := &RPCClient{appLog: appLog}
|
||||||
c.bCount.Store(bCount)
|
c.bCount.Store(bCount)
|
||||||
w := waiter.New(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time.
|
w := waiter.New(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time.
|
||||||
_, ok := w.(*waiter.PollingWaiter)
|
_, ok := w.(*waiter.PollingBased)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
// Wait with error.
|
// Wait with error.
|
||||||
|
@ -186,7 +197,7 @@ func TestWSWaiter_Wait(t *testing.T) {
|
||||||
c := &AwaitableRPCClient{RPCClient: RPCClient{appLog: appLog}}
|
c := &AwaitableRPCClient{RPCClient: RPCClient{appLog: appLog}}
|
||||||
c.bCount.Store(bCount)
|
c.bCount.Store(bCount)
|
||||||
w := waiter.New(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time.
|
w := waiter.New(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time.
|
||||||
_, ok := w.(*waiter.EventWaiter)
|
_, ok := w.(*waiter.EventBased)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
// Wait with error.
|
// Wait with error.
|
||||||
|
@ -244,12 +255,12 @@ func TestWSWaiter_Wait(t *testing.T) {
|
||||||
check(t, func() {
|
check(t, func() {
|
||||||
c.chLock.RLock()
|
c.chLock.RLock()
|
||||||
defer c.chLock.RUnlock()
|
defer c.chLock.RUnlock()
|
||||||
c.subBlockCh <- &block.Block{}
|
c.subHeaderCh <- &block.Header{}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRPCWaiterRPCClientCompat(t *testing.T) {
|
func TestRPCWaiterRPCClientCompat(t *testing.T) {
|
||||||
_ = waiter.RPCPollingWaiter(&rpcclient.Client{})
|
_ = waiter.RPCPollingBased(&rpcclient.Client{})
|
||||||
_ = waiter.RPCPollingWaiter(&rpcclient.WSClient{})
|
_ = waiter.RPCPollingBased(&rpcclient.WSClient{})
|
||||||
_ = waiter.RPCEventWaiter(&rpcclient.WSClient{})
|
_ = waiter.RPCEventBased(&rpcclient.WSClient{})
|
||||||
}
|
}
|
||||||
|
|
|
@ -1789,53 +1789,84 @@ func TestClient_Wait(t *testing.T) {
|
||||||
defer chain.Close()
|
defer chain.Close()
|
||||||
defer rpcSrv.Shutdown()
|
defer rpcSrv.Shutdown()
|
||||||
|
|
||||||
c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{})
|
run := func(t *testing.T, ws bool) {
|
||||||
require.NoError(t, err)
|
acc, err := wallet.NewAccount()
|
||||||
acc, err := wallet.NewAccount()
|
require.NoError(t, err)
|
||||||
require.NoError(t, err)
|
|
||||||
act, err := actor.New(c, []actor.SignerAccount{
|
|
||||||
{
|
|
||||||
Signer: transaction.Signer{
|
|
||||||
Account: acc.ScriptHash(),
|
|
||||||
},
|
|
||||||
Account: acc,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
b, err := chain.GetBlock(chain.GetHeaderHash(1))
|
var act *actor.Actor
|
||||||
require.NoError(t, err)
|
if ws {
|
||||||
require.True(t, len(b.Transactions) > 0)
|
c, err := rpcclient.NewWS(context.Background(), "ws"+strings.TrimPrefix(httpSrv.URL, "http")+"/ws", rpcclient.WSOptions{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, c.Init())
|
||||||
|
act, err = actor.New(c, []actor.SignerAccount{
|
||||||
|
{
|
||||||
|
Signer: transaction.Signer{
|
||||||
|
Account: acc.ScriptHash(),
|
||||||
|
},
|
||||||
|
Account: acc,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, c.Init())
|
||||||
|
act, err = actor.New(c, []actor.SignerAccount{
|
||||||
|
{
|
||||||
|
Signer: transaction.Signer{
|
||||||
|
Account: acc.ScriptHash(),
|
||||||
|
},
|
||||||
|
Account: acc,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) {
|
b, err := chain.GetBlock(chain.GetHeaderHash(1))
|
||||||
rcvr := make(chan struct{})
|
require.NoError(t, err)
|
||||||
go func() {
|
require.True(t, len(b.Transactions) > 0)
|
||||||
aer, err := act.Wait(h, vub, nil)
|
|
||||||
if errExpected {
|
check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) {
|
||||||
require.Error(t, err)
|
rcvr := make(chan struct{})
|
||||||
} else {
|
go func() {
|
||||||
require.NoError(t, err)
|
aer, err := act.Wait(h, vub, nil)
|
||||||
require.Equal(t, h, aer.Container)
|
if errExpected {
|
||||||
}
|
require.Error(t, err)
|
||||||
rcvr <- struct{}{}
|
} else {
|
||||||
}()
|
require.NoError(t, err)
|
||||||
waitloop:
|
require.Equal(t, h, aer.Container)
|
||||||
for {
|
}
|
||||||
select {
|
rcvr <- struct{}{}
|
||||||
case <-rcvr:
|
}()
|
||||||
break waitloop
|
waitloop:
|
||||||
case <-time.NewTimer(chain.GetConfig().TimePerBlock).C:
|
for {
|
||||||
t.Fatal("transaction failed to be awaited")
|
select {
|
||||||
|
case <-rcvr:
|
||||||
|
break waitloop
|
||||||
|
case <-time.NewTimer(chain.GetConfig().TimePerBlock).C:
|
||||||
|
t.Fatal("transaction failed to be awaited")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for transaction that has been persisted and VUB block has been persisted.
|
||||||
|
check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false)
|
||||||
|
// Wait for transaction that has been persisted and VUB block hasn't yet been persisted.
|
||||||
|
check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false)
|
||||||
|
if !ws {
|
||||||
|
// Wait for transaction that hasn't been persisted and VUB block has been persisted.
|
||||||
|
// WS client waits for the next block to be accepted to ensure that transaction wasn't
|
||||||
|
// persisted, and this test doesn't run chain, thus, don't run this test for WS client.
|
||||||
|
check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for transaction that has been persisted and VUB block has been persisted.
|
t.Run("client", func(t *testing.T) {
|
||||||
check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false)
|
run(t, false)
|
||||||
// Wait for transaction that has been persisted and VUB block hasn't yet been persisted.
|
})
|
||||||
check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false)
|
t.Run("ws client", func(t *testing.T) {
|
||||||
// Wait for transaction that hasn't been persisted and VUB block has been persisted.
|
run(t, true)
|
||||||
check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true)
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func mkSubsClient(t *testing.T, rpcSrv *Server, httpSrv *httptest.Server, local bool) *rpcclient.WSClient {
|
func mkSubsClient(t *testing.T, rpcSrv *Server, httpSrv *httptest.Server, local bool) *rpcclient.WSClient {
|
||||||
|
|
Loading…
Reference in a new issue