From 5b81cb065f0df6cc46f1cc7e3f3bc96ee8b6470d Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 19 Oct 2022 11:55:39 +0300 Subject: [PATCH] rpc: refactor waiter-related actor code --- pkg/rpcclient/actor/actor.go | 16 +++- pkg/rpcclient/actor/waiter.go | 135 ++++++++++++++++++++++++++-------- 2 files changed, 119 insertions(+), 32 deletions(-) diff --git a/pkg/rpcclient/actor/actor.go b/pkg/rpcclient/actor/actor.go index cab05e218..c6ee86f92 100644 --- a/pkg/rpcclient/actor/actor.go +++ b/pkg/rpcclient/actor/actor.go @@ -25,7 +25,6 @@ import ( // create and send transactions. type RPCActor interface { invoker.RPCInvoke - RPCPollingWaiter CalculateNetworkFee(tx *transaction.Transaction) (int64, error) GetBlockCount() (uint32, error) @@ -54,8 +53,22 @@ type SignerAccount struct { // action to be performed, "Make" prefix is used for methods that create // transactions in various ways, while "Send" prefix is used by methods that // directly transmit created transactions to the RPC server. +// +// Actor also provides a Waiter interface to wait until transaction will be +// accepted to the chain. Depending on the underlying RPCActor functionality, +// transaction awaiting can be performed via web-socket using RPC notifications +// subsystem with EventWaiter, via regular RPC requests using a poll-based +// algorithm with PollingWaiter or can not be performed if RPCActor doesn't +// implement none of RPCEventWaiter and RPCPollingWaiter interfaces with +// NullWaiter. ErrAwaitingNotSupported will be returned on attempt to await the +// transaction in the latter case. Waiter uses context of the underlying RPCActor +// and interrupts transaction awaiting process if the context is done. +// ErrContextDone wrapped with the context's error will be returned in this case. +// Otherwise, transaction awaiting process is ended with ValidUntilBlock acceptance +// and ErrTxNotAccepted is returned if transaction wasn't accepted by this moment. type Actor struct { invoker.Invoker + Waiter client RPCActor opts Options @@ -109,6 +122,7 @@ func New(ra RPCActor, signers []SignerAccount) (*Actor, error) { } return &Actor{ Invoker: *inv, + Waiter: newWaiter(ra, version), client: ra, opts: NewDefaultOptions(), signers: signers, diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 5c19ebccd..724f9b734 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -27,20 +27,34 @@ var ( // ErrContextDone is returned when Waiter context has been done in the middle // of transaction awaiting process and no result was received yet. ErrContextDone = errors.New("waiter context done") + // ErrAwaitingNotSupported is returned from Wait method if Waiter instance + // doesn't support transaction awaiting. + ErrAwaitingNotSupported = errors.New("awaiting not supported") ) type ( + // Waiter is an interface providing transaction awaiting functionality to Actor. + Waiter interface { + // Wait allows to wait until transaction will be accepted to the chain. It can be + // used as a wrapper for Send or SignAndSend and accepts transaction hash, + // ValidUntilBlock value and an error. It returns transaction execution result + // or an error if transaction wasn't accepted to the chain. + Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) + } // RPCPollingWaiter is an interface that enables transaction awaiting functionality // for Actor instance based on periodical BlockCount and ApplicationLog polls. RPCPollingWaiter interface { // Context should return the RPC client context to be able to gracefully // shut down all running processes (if so). Context() context.Context + GetVersion() (*result.Version, error) GetBlockCount() (uint32, error) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) } // RPCEventWaiter is an interface that enables improved transaction awaiting functionality - // for Actor instance based on web-socket Block and ApplicationLog notifications. + // for Actor instance based on web-socket Block and ApplicationLog notifications. RPCEventWaiter + // contains RPCPollingWaiter under the hood and falls back to polling when subscription-based + // awaiting fails. RPCEventWaiter interface { RPCPollingWaiter @@ -50,28 +64,73 @@ type ( } ) -// Wait allows to wait until transaction will be accepted to the chain. It can be -// used as a wrapper for Send or SignAndSend and accepts transaction hash, -// ValidUntilBlock value and an error. It returns transaction execution result -// or an error if transaction wasn't accepted to the chain. -func (a *Actor) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { +// NullWaiter is a Waiter stub that doesn't support transaction awaiting functionality. +type NullWaiter struct{} + +// PollingWaiter is a polling-based Waiter. +type PollingWaiter struct { + polling RPCPollingWaiter + version *result.Version +} + +// EventWaiter is a websocket-based Waiter. +type EventWaiter struct { + ws RPCEventWaiter + polling Waiter +} + +// newWaiter creates Waiter instance. It can be either websocket-based or +// polling-base, otherwise Waiter stub is returned. +func newWaiter(ra RPCActor, v *result.Version) Waiter { + if eventW, ok := ra.(RPCEventWaiter); ok { + return &EventWaiter{ + ws: eventW, + polling: &PollingWaiter{ + polling: eventW, + version: v, + }, + } + } + if pollW, ok := ra.(RPCPollingWaiter); ok { + return &PollingWaiter{ + polling: pollW, + version: v, + } + } + return NewNullWaiter() +} + +// NewNullWaiter creates an instance of Waiter stub. +func NewNullWaiter() NullWaiter { + return NullWaiter{} +} + +// Wait implements Waiter interface. +func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { + return nil, ErrAwaitingNotSupported +} + +// NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting. +func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) { + v, err := waiter.GetVersion() if err != nil { return nil, err } - if wsW, ok := a.client.(RPCEventWaiter); ok { - return a.waitWithWSWaiter(wsW, h, vub) - } - return a.waitWithSimpleWaiter(a.client, h, vub) + return &PollingWaiter{ + polling: waiter, + version: v, + }, nil } -// waitWithSimpleWaiter waits until transaction is accepted to the chain and -// returns its execution result or an error if it's missing from chain after -// VUB block. -func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uint32) (*state.AppExecResult, error) { +// Wait implements Waiter interface. +func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { + if err != nil { + return nil, err + } var ( currentHeight uint32 failedAttempt int - pollTime = time.Millisecond * time.Duration(a.GetVersion().Protocol.MillisecondsPerBlock) / 2 + pollTime = time.Millisecond * time.Duration(w.version.Protocol.MillisecondsPerBlock) / 2 ) if pollTime == 0 { pollTime = time.Second @@ -81,7 +140,7 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin for { select { case <-timer.C: - blockCount, err := c.GetBlockCount() + blockCount, err := w.polling.GetBlockCount() if err != nil { failedAttempt++ if failedAttempt > PollingWaiterRetryCount { @@ -94,7 +153,7 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin currentHeight = blockCount - 1 } t := trigger.Application - res, err := c.GetApplicationLog(h, &t) + res, err := w.polling.GetApplicationLog(h, &t) if err == nil { return &state.AppExecResult{ Container: h, @@ -104,21 +163,35 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin if currentHeight >= vub { return nil, ErrTxNotAccepted } - - case <-c.Context().Done(): - return nil, fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) + case <-w.polling.Context().Done(): + return nil, fmt.Errorf("%w: %v", ErrContextDone, w.polling.Context().Err()) } } } -// waitWithWSWaiter waits until transaction is accepted to the chain and returns -// its execution result or an error if it's missing from chain after VUB block. -// It uses optimized web-socket waiter if possible. -func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (res *state.AppExecResult, waitErr error) { +// NewEventWaiter creates an instance of Waiter supporting websocket event-based transaction awaiting. +// EventWaiter contains PollingWaiter under the hood and falls back to polling when subscription-based +// awaiting fails. +func NewEventWaiter(waiter RPCEventWaiter) (*EventWaiter, error) { + polling, err := NewPollingWaiter(waiter) + if err != nil { + return nil, err + } + return &EventWaiter{ + ws: waiter, + polling: polling, + }, nil +} + +// Wait implements Waiter interface. +func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) { + if err != nil { + return nil, err + } var wsWaitErr error defer func() { if wsWaitErr != nil { - res, waitErr = a.waitWithSimpleWaiter(c, h, vub) + res, waitErr = w.polling.Wait(h, vub, nil) if waitErr != nil { waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr) } @@ -139,13 +212,13 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( }() // Execution event follows the block event, thus wait until the block next to the VUB to be sure. since := vub + 1 - blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) + blocksID, err := w.ws.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return } defer func() { - err = c.Unsubscribe(blocksID) + err = w.ws.Unsubscribe(blocksID) if err != nil { errFmt := "failed to unsubscribe from blocks (id: %s): %v" errArgs := []interface{}{blocksID, err} @@ -156,13 +229,13 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( waitErr = fmt.Errorf(errFmt, errArgs...) } }() - txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) + txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) return } defer func() { - err = c.Unsubscribe(txsID) + err = w.ws.Unsubscribe(txsID) if err != nil { errFmt := "failed to unsubscribe from transactions (id: %s): %v" errArgs := []interface{}{txsID, err} @@ -189,8 +262,8 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( wsWaitErr = errors.New("some event was missed") return } - case <-c.Context().Done(): - waitErr = fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) + case <-w.ws.Context().Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) return } }