rpc: refactor waiter-related actor code

This commit is contained in:
Anna Shaleva 2022-10-19 11:55:39 +03:00
parent 8e84bb51d5
commit 5b81cb065f
2 changed files with 119 additions and 32 deletions

View file

@ -25,7 +25,6 @@ import (
// create and send transactions. // create and send transactions.
type RPCActor interface { type RPCActor interface {
invoker.RPCInvoke invoker.RPCInvoke
RPCPollingWaiter
CalculateNetworkFee(tx *transaction.Transaction) (int64, error) CalculateNetworkFee(tx *transaction.Transaction) (int64, error)
GetBlockCount() (uint32, error) GetBlockCount() (uint32, error)
@ -54,8 +53,22 @@ type SignerAccount struct {
// action to be performed, "Make" prefix is used for methods that create // action to be performed, "Make" prefix is used for methods that create
// transactions in various ways, while "Send" prefix is used by methods that // transactions in various ways, while "Send" prefix is used by methods that
// directly transmit created transactions to the RPC server. // directly transmit created transactions to the RPC server.
//
// Actor also provides a Waiter interface to wait until transaction will be
// accepted to the chain. Depending on the underlying RPCActor functionality,
// transaction awaiting can be performed via web-socket using RPC notifications
// subsystem with EventWaiter, via regular RPC requests using a poll-based
// algorithm with PollingWaiter or can not be performed if RPCActor doesn't
// implement none of RPCEventWaiter and RPCPollingWaiter interfaces with
// NullWaiter. ErrAwaitingNotSupported will be returned on attempt to await the
// transaction in the latter case. Waiter uses context of the underlying RPCActor
// and interrupts transaction awaiting process if the context is done.
// ErrContextDone wrapped with the context's error will be returned in this case.
// Otherwise, transaction awaiting process is ended with ValidUntilBlock acceptance
// and ErrTxNotAccepted is returned if transaction wasn't accepted by this moment.
type Actor struct { type Actor struct {
invoker.Invoker invoker.Invoker
Waiter
client RPCActor client RPCActor
opts Options opts Options
@ -109,6 +122,7 @@ func New(ra RPCActor, signers []SignerAccount) (*Actor, error) {
} }
return &Actor{ return &Actor{
Invoker: *inv, Invoker: *inv,
Waiter: newWaiter(ra, version),
client: ra, client: ra,
opts: NewDefaultOptions(), opts: NewDefaultOptions(),
signers: signers, signers: signers,

View file

@ -27,20 +27,34 @@ var (
// ErrContextDone is returned when Waiter context has been done in the middle // ErrContextDone is returned when Waiter context has been done in the middle
// of transaction awaiting process and no result was received yet. // of transaction awaiting process and no result was received yet.
ErrContextDone = errors.New("waiter context done") ErrContextDone = errors.New("waiter context done")
// ErrAwaitingNotSupported is returned from Wait method if Waiter instance
// doesn't support transaction awaiting.
ErrAwaitingNotSupported = errors.New("awaiting not supported")
) )
type ( type (
// Waiter is an interface providing transaction awaiting functionality to Actor.
Waiter interface {
// Wait allows to wait until transaction will be accepted to the chain. It can be
// used as a wrapper for Send or SignAndSend and accepts transaction hash,
// ValidUntilBlock value and an error. It returns transaction execution result
// or an error if transaction wasn't accepted to the chain.
Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error)
}
// RPCPollingWaiter is an interface that enables transaction awaiting functionality // RPCPollingWaiter is an interface that enables transaction awaiting functionality
// for Actor instance based on periodical BlockCount and ApplicationLog polls. // for Actor instance based on periodical BlockCount and ApplicationLog polls.
RPCPollingWaiter interface { RPCPollingWaiter interface {
// Context should return the RPC client context to be able to gracefully // Context should return the RPC client context to be able to gracefully
// shut down all running processes (if so). // shut down all running processes (if so).
Context() context.Context Context() context.Context
GetVersion() (*result.Version, error)
GetBlockCount() (uint32, error) GetBlockCount() (uint32, error)
GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error)
} }
// RPCEventWaiter is an interface that enables improved transaction awaiting functionality // RPCEventWaiter is an interface that enables improved transaction awaiting functionality
// for Actor instance based on web-socket Block and ApplicationLog notifications. // for Actor instance based on web-socket Block and ApplicationLog notifications. RPCEventWaiter
// contains RPCPollingWaiter under the hood and falls back to polling when subscription-based
// awaiting fails.
RPCEventWaiter interface { RPCEventWaiter interface {
RPCPollingWaiter RPCPollingWaiter
@ -50,28 +64,73 @@ type (
} }
) )
// Wait allows to wait until transaction will be accepted to the chain. It can be // NullWaiter is a Waiter stub that doesn't support transaction awaiting functionality.
// used as a wrapper for Send or SignAndSend and accepts transaction hash, type NullWaiter struct{}
// ValidUntilBlock value and an error. It returns transaction execution result
// or an error if transaction wasn't accepted to the chain. // PollingWaiter is a polling-based Waiter.
func (a *Actor) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { type PollingWaiter struct {
polling RPCPollingWaiter
version *result.Version
}
// EventWaiter is a websocket-based Waiter.
type EventWaiter struct {
ws RPCEventWaiter
polling Waiter
}
// newWaiter creates Waiter instance. It can be either websocket-based or
// polling-base, otherwise Waiter stub is returned.
func newWaiter(ra RPCActor, v *result.Version) Waiter {
if eventW, ok := ra.(RPCEventWaiter); ok {
return &EventWaiter{
ws: eventW,
polling: &PollingWaiter{
polling: eventW,
version: v,
},
}
}
if pollW, ok := ra.(RPCPollingWaiter); ok {
return &PollingWaiter{
polling: pollW,
version: v,
}
}
return NewNullWaiter()
}
// NewNullWaiter creates an instance of Waiter stub.
func NewNullWaiter() NullWaiter {
return NullWaiter{}
}
// Wait implements Waiter interface.
func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
return nil, ErrAwaitingNotSupported
}
// NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting.
func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) {
v, err := waiter.GetVersion()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if wsW, ok := a.client.(RPCEventWaiter); ok { return &PollingWaiter{
return a.waitWithWSWaiter(wsW, h, vub) polling: waiter,
} version: v,
return a.waitWithSimpleWaiter(a.client, h, vub) }, nil
} }
// waitWithSimpleWaiter waits until transaction is accepted to the chain and // Wait implements Waiter interface.
// returns its execution result or an error if it's missing from chain after func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) {
// VUB block. if err != nil {
func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uint32) (*state.AppExecResult, error) { return nil, err
}
var ( var (
currentHeight uint32 currentHeight uint32
failedAttempt int failedAttempt int
pollTime = time.Millisecond * time.Duration(a.GetVersion().Protocol.MillisecondsPerBlock) / 2 pollTime = time.Millisecond * time.Duration(w.version.Protocol.MillisecondsPerBlock) / 2
) )
if pollTime == 0 { if pollTime == 0 {
pollTime = time.Second pollTime = time.Second
@ -81,7 +140,7 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin
for { for {
select { select {
case <-timer.C: case <-timer.C:
blockCount, err := c.GetBlockCount() blockCount, err := w.polling.GetBlockCount()
if err != nil { if err != nil {
failedAttempt++ failedAttempt++
if failedAttempt > PollingWaiterRetryCount { if failedAttempt > PollingWaiterRetryCount {
@ -94,7 +153,7 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin
currentHeight = blockCount - 1 currentHeight = blockCount - 1
} }
t := trigger.Application t := trigger.Application
res, err := c.GetApplicationLog(h, &t) res, err := w.polling.GetApplicationLog(h, &t)
if err == nil { if err == nil {
return &state.AppExecResult{ return &state.AppExecResult{
Container: h, Container: h,
@ -104,21 +163,35 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin
if currentHeight >= vub { if currentHeight >= vub {
return nil, ErrTxNotAccepted return nil, ErrTxNotAccepted
} }
case <-w.polling.Context().Done():
case <-c.Context().Done(): return nil, fmt.Errorf("%w: %v", ErrContextDone, w.polling.Context().Err())
return nil, fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err())
} }
} }
} }
// waitWithWSWaiter waits until transaction is accepted to the chain and returns // NewEventWaiter creates an instance of Waiter supporting websocket event-based transaction awaiting.
// its execution result or an error if it's missing from chain after VUB block. // EventWaiter contains PollingWaiter under the hood and falls back to polling when subscription-based
// It uses optimized web-socket waiter if possible. // awaiting fails.
func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (res *state.AppExecResult, waitErr error) { func NewEventWaiter(waiter RPCEventWaiter) (*EventWaiter, error) {
polling, err := NewPollingWaiter(waiter)
if err != nil {
return nil, err
}
return &EventWaiter{
ws: waiter,
polling: polling,
}, nil
}
// Wait implements Waiter interface.
func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) {
if err != nil {
return nil, err
}
var wsWaitErr error var wsWaitErr error
defer func() { defer func() {
if wsWaitErr != nil { if wsWaitErr != nil {
res, waitErr = a.waitWithSimpleWaiter(c, h, vub) res, waitErr = w.polling.Wait(h, vub, nil)
if waitErr != nil { if waitErr != nil {
waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr) waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr)
} }
@ -139,13 +212,13 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (
}() }()
// Execution event follows the block event, thus wait until the block next to the VUB to be sure. // Execution event follows the block event, thus wait until the block next to the VUB to be sure.
since := vub + 1 since := vub + 1
blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) blocksID, err := w.ws.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr)
if err != nil { if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
return return
} }
defer func() { defer func() {
err = c.Unsubscribe(blocksID) err = w.ws.Unsubscribe(blocksID)
if err != nil { if err != nil {
errFmt := "failed to unsubscribe from blocks (id: %s): %v" errFmt := "failed to unsubscribe from blocks (id: %s): %v"
errArgs := []interface{}{blocksID, err} errArgs := []interface{}{blocksID, err}
@ -156,13 +229,13 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (
waitErr = fmt.Errorf(errFmt, errArgs...) waitErr = fmt.Errorf(errFmt, errArgs...)
} }
}() }()
txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr)
if err != nil { if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return return
} }
defer func() { defer func() {
err = c.Unsubscribe(txsID) err = w.ws.Unsubscribe(txsID)
if err != nil { if err != nil {
errFmt := "failed to unsubscribe from transactions (id: %s): %v" errFmt := "failed to unsubscribe from transactions (id: %s): %v"
errArgs := []interface{}{txsID, err} errArgs := []interface{}{txsID, err}
@ -189,8 +262,8 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (
wsWaitErr = errors.New("some event was missed") wsWaitErr = errors.New("some event was missed")
return return
} }
case <-c.Context().Done(): case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
return return
} }
} }