2023-12-26 10:04:45 +00:00
package waiter
2022-10-12 12:23:32 +00:00
import (
"context"
"errors"
"fmt"
2022-11-22 14:12:24 +00:00
"strings"
2022-10-12 12:23:32 +00:00
"time"
2022-10-25 12:11:24 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/block"
2022-10-12 12:23:32 +00:00
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
)
2023-12-29 12:15:18 +00:00
// PollingBasedRetryCount is a threshold for a number of subsequent failed
// attempts to get block count from the RPC server for PollingBased. If it fails
// to retrieve block count PollingBasedRetryCount times in a raw then transaction
2022-10-12 12:23:32 +00:00
// awaiting attempt considered to be failed and an error is returned.
2023-12-29 12:15:18 +00:00
const PollingBasedRetryCount = 3
2022-10-12 12:23:32 +00:00
var (
// ErrTxNotAccepted is returned when transaction wasn't accepted to the chain
// even after ValidUntilBlock block persist.
ErrTxNotAccepted = errors . New ( "transaction was not accepted to chain" )
// ErrContextDone is returned when Waiter context has been done in the middle
// of transaction awaiting process and no result was received yet.
ErrContextDone = errors . New ( "waiter context done" )
2022-10-19 08:55:39 +00:00
// ErrAwaitingNotSupported is returned from Wait method if Waiter instance
// doesn't support transaction awaiting.
ErrAwaitingNotSupported = errors . New ( "awaiting not supported" )
2023-12-29 12:15:18 +00:00
// ErrMissedEvent is returned when RPCEventBased closes receiver channel
2022-10-25 12:11:24 +00:00
// which happens if missed event was received from the RPC server.
ErrMissedEvent = errors . New ( "some event was missed" )
2022-10-12 12:23:32 +00:00
)
type (
2023-12-29 12:15:18 +00:00
// Waiter is an interface providing transaction awaiting functionality.
2022-10-19 08:55:39 +00:00
Waiter interface {
// Wait allows to wait until transaction will be accepted to the chain. It can be
// used as a wrapper for Send or SignAndSend and accepts transaction hash,
// ValidUntilBlock value and an error. It returns transaction execution result
2022-11-22 14:12:24 +00:00
// or an error if transaction wasn't accepted to the chain. Notice that "already
// exists" err value is not treated as an error by this routine because it
// means that the transactions given might be already accepted or soon going
// to be accepted. Such transaction can be waited for in a usual way, potentially
// with positive result, so that's what will happen.
2022-10-19 08:55:39 +00:00
Wait ( h util . Uint256 , vub uint32 , err error ) ( * state . AppExecResult , error )
2022-10-21 08:35:10 +00:00
// WaitAny waits until at least one of the specified transactions will be accepted
// to the chain until vub (including). It returns execution result of this
// transaction or an error if none of the transactions was accepted to the chain.
2023-12-29 12:15:18 +00:00
// It uses underlying RPCPollingBased or RPCEventBased context to interrupt
2022-10-21 08:35:10 +00:00
// awaiting process, but additional ctx can be passed as an argument for the same
// purpose.
WaitAny ( ctx context . Context , vub uint32 , hashes ... util . Uint256 ) ( * state . AppExecResult , error )
2022-10-19 08:55:39 +00:00
}
2023-12-29 12:15:18 +00:00
// RPCPollingBased is an interface that enables transaction awaiting functionality
// based on periodical BlockCount and ApplicationLog polls.
RPCPollingBased interface {
2022-10-12 12:23:32 +00:00
// Context should return the RPC client context to be able to gracefully
// shut down all running processes (if so).
Context ( ) context . Context
2022-10-19 08:55:39 +00:00
GetVersion ( ) ( * result . Version , error )
2022-10-12 12:23:32 +00:00
GetBlockCount ( ) ( uint32 , error )
GetApplicationLog ( hash util . Uint256 , trig * trigger . Type ) ( * result . ApplicationLog , error )
}
2023-12-29 12:15:18 +00:00
// RPCEventBased is an interface that enables improved transaction awaiting functionality
// based on web-socket Block and ApplicationLog notifications. RPCEventBased
// contains RPCPollingBased under the hood and falls back to polling when subscription-based
2022-10-19 08:55:39 +00:00
// awaiting fails.
2023-12-29 12:15:18 +00:00
RPCEventBased interface {
RPCPollingBased
2022-10-12 12:23:32 +00:00
2023-12-29 11:33:46 +00:00
ReceiveHeadersOfAddedBlocks ( flt * neorpc . BlockFilter , rcvr chan <- * block . Header ) ( string , error )
2022-10-25 12:11:24 +00:00
ReceiveBlocks ( flt * neorpc . BlockFilter , rcvr chan <- * block . Block ) ( string , error )
ReceiveExecutions ( flt * neorpc . ExecutionFilter , rcvr chan <- * state . AppExecResult ) ( string , error )
2022-10-12 12:23:32 +00:00
Unsubscribe ( id string ) error
}
)
2023-12-29 12:15:18 +00:00
// Null is a Waiter stub that doesn't support transaction awaiting functionality.
type Null struct { }
2022-10-19 08:55:39 +00:00
2023-12-29 12:15:18 +00:00
// PollingBased is a polling-based Waiter.
type PollingBased struct {
polling RPCPollingBased
2022-10-19 08:55:39 +00:00
version * result . Version
}
2023-12-29 12:15:18 +00:00
// EventBased is a websocket-based Waiter.
type EventBased struct {
ws RPCEventBased
2022-10-19 08:55:39 +00:00
polling Waiter
}
2022-11-22 14:12:24 +00:00
// errIsAlreadyExists is a temporary helper until we have #2248 solved. Both C#
// and Go nodes return this string (possibly among other data).
func errIsAlreadyExists ( err error ) bool {
return strings . Contains ( strings . ToLower ( err . Error ( ) ) , "already exists" )
}
2023-12-26 10:04:45 +00:00
// New creates Waiter instance. It can be either websocket-based or
2023-12-18 19:36:53 +00:00
// polling-base, otherwise Waiter stub is returned. As a first argument
2023-12-29 12:15:18 +00:00
// it accepts RPCEventBased implementation, RPCPollingBased implementation
2023-12-18 19:36:53 +00:00
// or not an implementation of these two interfaces. It returns websocket-based
// waiter, polling-based waiter or a stub correspondingly.
2023-12-26 10:04:45 +00:00
func New ( base any , v * result . Version ) Waiter {
2023-12-29 12:15:18 +00:00
if eventW , ok := base . ( RPCEventBased ) ; ok {
return & EventBased {
2022-10-19 08:55:39 +00:00
ws : eventW ,
2023-12-29 12:15:18 +00:00
polling : & PollingBased {
2022-10-19 08:55:39 +00:00
polling : eventW ,
version : v ,
} ,
}
}
2023-12-29 12:15:18 +00:00
if pollW , ok := base . ( RPCPollingBased ) ; ok {
return & PollingBased {
2022-10-19 08:55:39 +00:00
polling : pollW ,
version : v ,
}
}
2023-12-29 12:15:18 +00:00
return NewNull ( )
2022-10-19 08:55:39 +00:00
}
2023-12-29 12:15:18 +00:00
// NewNull creates an instance of Waiter stub.
func NewNull ( ) Null {
return Null { }
2022-10-19 08:55:39 +00:00
}
// Wait implements Waiter interface.
2023-12-29 12:15:18 +00:00
func ( Null ) Wait ( h util . Uint256 , vub uint32 , err error ) ( * state . AppExecResult , error ) {
2022-10-19 08:55:39 +00:00
return nil , ErrAwaitingNotSupported
}
2022-10-21 08:35:10 +00:00
// WaitAny implements Waiter interface.
2023-12-29 12:15:18 +00:00
func ( Null ) WaitAny ( ctx context . Context , vub uint32 , hashes ... util . Uint256 ) ( * state . AppExecResult , error ) {
2022-10-21 08:35:10 +00:00
return nil , ErrAwaitingNotSupported
}
2023-12-29 12:15:18 +00:00
// NewPollingBased creates an instance of Waiter supporting poll-based transaction awaiting.
func NewPollingBased ( waiter RPCPollingBased ) ( * PollingBased , error ) {
2022-10-19 08:55:39 +00:00
v , err := waiter . GetVersion ( )
2022-10-12 12:23:32 +00:00
if err != nil {
return nil , err
}
2023-12-29 12:15:18 +00:00
return & PollingBased {
2022-10-19 08:55:39 +00:00
polling : waiter ,
version : v ,
} , nil
2022-10-12 12:23:32 +00:00
}
2022-10-19 08:55:39 +00:00
// Wait implements Waiter interface.
2023-12-29 12:15:18 +00:00
func ( w * PollingBased ) Wait ( h util . Uint256 , vub uint32 , err error ) ( * state . AppExecResult , error ) {
2022-11-22 14:12:24 +00:00
if err != nil && ! errIsAlreadyExists ( err ) {
2022-10-19 08:55:39 +00:00
return nil , err
}
2022-10-21 08:35:10 +00:00
return w . WaitAny ( context . TODO ( ) , vub , h )
}
// WaitAny implements Waiter interface.
2023-12-29 12:15:18 +00:00
func ( w * PollingBased ) WaitAny ( ctx context . Context , vub uint32 , hashes ... util . Uint256 ) ( * state . AppExecResult , error ) {
2022-10-12 12:23:32 +00:00
var (
currentHeight uint32
failedAttempt int
2022-10-19 08:55:39 +00:00
pollTime = time . Millisecond * time . Duration ( w . version . Protocol . MillisecondsPerBlock ) / 2
2022-10-12 12:23:32 +00:00
)
if pollTime == 0 {
pollTime = time . Second
}
timer := time . NewTicker ( pollTime )
defer timer . Stop ( )
for {
select {
case <- timer . C :
2022-10-19 08:55:39 +00:00
blockCount , err := w . polling . GetBlockCount ( )
2022-10-12 12:23:32 +00:00
if err != nil {
failedAttempt ++
2023-12-29 12:15:18 +00:00
if failedAttempt > PollingBasedRetryCount {
2022-10-12 12:23:32 +00:00
return nil , fmt . Errorf ( "failed to retrieve block count: %w" , err )
}
continue
}
failedAttempt = 0
if blockCount - 1 > currentHeight {
currentHeight = blockCount - 1
}
t := trigger . Application
2022-10-21 08:35:10 +00:00
for _ , h := range hashes {
res , err := w . polling . GetApplicationLog ( h , & t )
if err == nil {
return & state . AppExecResult {
Container : res . Container ,
Execution : res . Executions [ 0 ] ,
} , nil
}
2022-10-12 12:23:32 +00:00
}
if currentHeight >= vub {
return nil , ErrTxNotAccepted
}
2022-10-19 08:55:39 +00:00
case <- w . polling . Context ( ) . Done ( ) :
2023-03-15 12:47:38 +00:00
return nil , fmt . Errorf ( "%w: %v" , ErrContextDone , w . polling . Context ( ) . Err ( ) ) //nolint:errorlint // errorlint: non-wrapping format verb for fmt.Errorf. Use `%w` to format errors
2022-10-21 08:35:10 +00:00
case <- ctx . Done ( ) :
2023-03-15 12:47:38 +00:00
return nil , fmt . Errorf ( "%w: %v" , ErrContextDone , ctx . Err ( ) ) //nolint:errorlint // errorlint: non-wrapping format verb for fmt.Errorf. Use `%w` to format errors
2022-10-12 12:23:32 +00:00
}
}
}
2023-12-29 12:15:18 +00:00
// NewEventBased creates an instance of Waiter supporting websocket event-based transaction awaiting.
// EventBased contains PollingBased under the hood and falls back to polling when subscription-based
2022-10-19 08:55:39 +00:00
// awaiting fails.
2023-12-29 12:15:18 +00:00
func NewEventBased ( waiter RPCEventBased ) ( * EventBased , error ) {
polling , err := NewPollingBased ( waiter )
2022-10-19 08:55:39 +00:00
if err != nil {
return nil , err
}
2023-12-29 12:15:18 +00:00
return & EventBased {
2022-10-19 08:55:39 +00:00
ws : waiter ,
polling : polling ,
} , nil
}
// Wait implements Waiter interface.
2023-12-29 12:15:18 +00:00
func ( w * EventBased ) Wait ( h util . Uint256 , vub uint32 , err error ) ( res * state . AppExecResult , waitErr error ) {
2022-11-22 14:12:24 +00:00
if err != nil && ! errIsAlreadyExists ( err ) {
2022-10-19 08:55:39 +00:00
return nil , err
}
2022-10-21 08:35:10 +00:00
return w . WaitAny ( context . TODO ( ) , vub , h )
}
// WaitAny implements Waiter interface.
2023-12-29 12:15:18 +00:00
func ( w * EventBased ) WaitAny ( ctx context . Context , vub uint32 , hashes ... util . Uint256 ) ( res * state . AppExecResult , waitErr error ) {
2022-11-16 09:35:26 +00:00
var (
wsWaitErr error
waitersActive int
2023-12-29 11:33:46 +00:00
hRcvr = make ( chan * block . Header , 2 )
2022-11-16 20:01:01 +00:00
bRcvr = make ( chan * block . Block , 2 )
aerRcvr = make ( chan * state . AppExecResult , len ( hashes ) )
unsubErrs = make ( chan error )
exit = make ( chan struct { } )
2022-11-16 09:35:26 +00:00
)
2022-11-16 20:01:01 +00:00
// Execution event preceded the block event, thus wait until the VUB-th block to be sure.
since := vub
2023-12-29 11:33:46 +00:00
blocksID , err := w . ws . ReceiveHeadersOfAddedBlocks ( & neorpc . BlockFilter { Since : & since } , hRcvr )
2022-11-16 20:01:01 +00:00
if err != nil {
2023-12-29 11:33:46 +00:00
// Falling back to block-based subscription.
if errors . Is ( err , neorpc . ErrInvalidParams ) {
blocksID , err = w . ws . ReceiveBlocks ( & neorpc . BlockFilter { Since : & since } , bRcvr )
}
}
if err != nil {
wsWaitErr = fmt . Errorf ( "failed to subscribe for new blocks/headers: %w" , err )
2022-11-16 20:01:01 +00:00
} else {
waitersActive ++
go func ( ) {
<- exit
err = w . ws . Unsubscribe ( blocksID )
if err != nil {
2023-12-29 11:33:46 +00:00
unsubErrs <- fmt . Errorf ( "failed to unsubscribe from blocks/headers (id: %s): %w" , blocksID , err )
2022-11-16 20:01:01 +00:00
return
}
unsubErrs <- nil
} ( )
}
if wsWaitErr == nil {
2022-11-22 13:41:22 +00:00
trig := trigger . Application
2022-11-16 20:01:01 +00:00
for _ , h := range hashes {
txsID , err := w . ws . ReceiveExecutions ( & neorpc . ExecutionFilter { Container : & h } , aerRcvr )
if err != nil {
wsWaitErr = fmt . Errorf ( "failed to subscribe for execution results: %w" , err )
break
2022-10-12 12:23:32 +00:00
}
2022-11-16 20:01:01 +00:00
waitersActive ++
go func ( ) {
<- exit
err = w . ws . Unsubscribe ( txsID )
if err != nil {
unsubErrs <- fmt . Errorf ( "failed to unsubscribe from transactions (id: %s): %w" , txsID , err )
return
}
unsubErrs <- nil
} ( )
2022-11-22 13:41:22 +00:00
// There is a potential race between subscription and acceptance, so
// do a polling check once _after_ the subscription.
appLog , err := w . ws . GetApplicationLog ( h , & trig )
if err == nil {
res = & state . AppExecResult {
Container : appLog . Container ,
Execution : appLog . Executions [ 0 ] ,
}
break // We have the result, no need for other subscriptions.
}
2022-10-12 12:23:32 +00:00
}
2022-11-16 20:01:01 +00:00
}
2022-11-16 09:35:26 +00:00
2022-11-22 13:41:22 +00:00
if wsWaitErr == nil && res == nil {
2022-11-16 20:01:01 +00:00
select {
2023-12-29 11:33:46 +00:00
case _ , ok := <- hRcvr :
if ! ok {
// We're toast, retry with non-ws client.
hRcvr = nil
bRcvr = nil
aerRcvr = nil
wsWaitErr = ErrMissedEvent
break
}
waitErr = ErrTxNotAccepted
2022-11-22 13:41:22 +00:00
case _ , ok := <- bRcvr :
2022-11-16 20:01:01 +00:00
if ! ok {
// We're toast, retry with non-ws client.
2023-12-29 11:33:46 +00:00
hRcvr = nil
2023-03-17 06:57:41 +00:00
bRcvr = nil
aerRcvr = nil
2022-11-16 20:01:01 +00:00
wsWaitErr = ErrMissedEvent
break
}
2022-11-22 13:41:22 +00:00
waitErr = ErrTxNotAccepted
2022-11-16 20:01:01 +00:00
case aer , ok := <- aerRcvr :
if ! ok {
// We're toast, retry with non-ws client.
2023-12-29 11:33:46 +00:00
hRcvr = nil
2023-03-17 06:57:41 +00:00
bRcvr = nil
aerRcvr = nil
2022-11-16 20:01:01 +00:00
wsWaitErr = ErrMissedEvent
break
}
res = aer
case <- w . ws . Context ( ) . Done ( ) :
2023-03-15 12:47:38 +00:00
waitErr = fmt . Errorf ( "%w: %v" , ErrContextDone , w . ws . Context ( ) . Err ( ) ) //nolint:errorlint // errorlint: non-wrapping format verb for fmt.Errorf. Use `%w` to format errors
2022-11-16 20:01:01 +00:00
case <- ctx . Done ( ) :
2023-03-15 12:47:38 +00:00
waitErr = fmt . Errorf ( "%w: %v" , ErrContextDone , ctx . Err ( ) ) //nolint:errorlint // errorlint: non-wrapping format verb for fmt.Errorf. Use `%w` to format errors
2022-11-16 20:01:01 +00:00
}
}
close ( exit )
if waitersActive > 0 {
// Drain receivers to avoid other notification receivers blocking.
2022-10-12 12:23:32 +00:00
drainLoop :
for {
select {
2023-12-29 11:33:46 +00:00
case _ , ok := <- hRcvr :
if ! ok { // Missed event means both channels are closed.
hRcvr = nil
bRcvr = nil
aerRcvr = nil
}
2023-03-17 06:57:41 +00:00
case _ , ok := <- bRcvr :
if ! ok { // Missed event means both channels are closed.
2023-12-29 11:33:46 +00:00
hRcvr = nil
2023-03-17 06:57:41 +00:00
bRcvr = nil
aerRcvr = nil
}
case _ , ok := <- aerRcvr :
if ! ok { // Missed event means both channels are closed.
2023-12-29 11:33:46 +00:00
hRcvr = nil
2023-03-17 06:57:41 +00:00
bRcvr = nil
aerRcvr = nil
}
2022-11-16 09:35:26 +00:00
case unsubErr := <- unsubErrs :
if unsubErr != nil {
errFmt := "unsubscription error: %v"
2023-04-03 10:34:24 +00:00
errArgs := [ ] any { unsubErr }
2022-11-16 09:35:26 +00:00
if waitErr != nil {
errFmt = "%w; " + errFmt
2023-04-03 10:34:24 +00:00
errArgs = append ( [ ] any { waitErr } , errArgs ... )
2022-11-16 09:35:26 +00:00
}
waitErr = fmt . Errorf ( errFmt , errArgs ... )
}
waitersActive --
// Wait until all receiver channels finish their work.
if waitersActive == 0 {
break drainLoop
}
2022-10-12 12:23:32 +00:00
}
}
}
2023-12-29 11:33:46 +00:00
if hRcvr != nil {
close ( hRcvr )
}
2023-03-17 06:57:41 +00:00
if bRcvr != nil {
2022-11-16 20:01:01 +00:00
close ( bRcvr )
2023-03-17 06:57:41 +00:00
}
if aerRcvr != nil {
2022-11-16 20:01:01 +00:00
close ( aerRcvr )
2022-10-21 08:35:10 +00:00
}
2022-11-16 20:01:01 +00:00
close ( unsubErrs )
2022-10-12 12:23:32 +00:00
2022-11-16 20:01:01 +00:00
// Rollback to a poll-based waiter if needed.
if wsWaitErr != nil && waitErr == nil {
res , waitErr = w . polling . WaitAny ( ctx , vub , hashes ... )
if waitErr != nil {
// Wrap the poll-based error, it's more important.
2023-03-15 12:47:38 +00:00
waitErr = fmt . Errorf ( "event-based error: %v; poll-based waiter error: %w" , wsWaitErr , waitErr ) //nolint:errorlint // errorlint: non-wrapping format verb for fmt.Errorf. Use `%w` to format errors
2022-10-12 12:23:32 +00:00
}
}
2022-10-25 12:11:24 +00:00
return
2022-10-12 12:23:32 +00:00
}