From d2a9e9120d7783c903aa0e0497e90363c2e54a5d Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 21 Oct 2022 11:35:10 +0300 Subject: [PATCH] rpc: extend Waiter interface to wait for several txs with context --- pkg/rpcclient/actor/waiter.go | 75 +++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 724f9b734..38685bc90 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -40,6 +40,13 @@ type ( // ValidUntilBlock value and an error. It returns transaction execution result // or an error if transaction wasn't accepted to the chain. Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) + // WaitAny waits until at least one of the specified transactions will be accepted + // to the chain until vub (including). It returns execution result of this + // transaction or an error if none of the transactions was accepted to the chain. + // It uses underlying RPCPollingWaiter or RPCEventWaiter context to interrupt + // awaiting process, but additional ctx can be passed as an argument for the same + // purpose. + WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) } // RPCPollingWaiter is an interface that enables transaction awaiting functionality // for Actor instance based on periodical BlockCount and ApplicationLog polls. @@ -110,6 +117,11 @@ func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecRes return nil, ErrAwaitingNotSupported } +// WaitAny implements Waiter interface. +func (NullWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) { + return nil, ErrAwaitingNotSupported +} + // NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting. func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) { v, err := waiter.GetVersion() @@ -127,6 +139,11 @@ func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppE if err != nil { return nil, err } + return w.WaitAny(context.TODO(), vub, h) +} + +// WaitAny implements Waiter interface. +func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) { var ( currentHeight uint32 failedAttempt int @@ -153,18 +170,22 @@ func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppE currentHeight = blockCount - 1 } t := trigger.Application - res, err := w.polling.GetApplicationLog(h, &t) - if err == nil { - return &state.AppExecResult{ - Container: h, - Execution: res.Executions[0], - }, nil + for _, h := range hashes { + res, err := w.polling.GetApplicationLog(h, &t) + if err == nil { + return &state.AppExecResult{ + Container: res.Container, + Execution: res.Executions[0], + }, nil + } } if currentHeight >= vub { return nil, ErrTxNotAccepted } case <-w.polling.Context().Done(): return nil, fmt.Errorf("%w: %v", ErrContextDone, w.polling.Context().Err()) + case <-ctx.Done(): + return nil, fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) } } } @@ -188,10 +209,15 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap if err != nil { return nil, err } + return w.WaitAny(context.TODO(), vub, h) +} + +// WaitAny implements Waiter interface. +func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) { var wsWaitErr error defer func() { if wsWaitErr != nil { - res, waitErr = w.polling.Wait(h, vub, nil) + res, waitErr = w.polling.WaitAny(ctx, vub, hashes...) if waitErr != nil { waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr) } @@ -229,23 +255,25 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap waitErr = fmt.Errorf(errFmt, errArgs...) } }() - txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) - if err != nil { - wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) - return - } - defer func() { - err = w.ws.Unsubscribe(txsID) + for _, h := range hashes { + txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) if err != nil { - errFmt := "failed to unsubscribe from transactions (id: %s): %v" - errArgs := []interface{}{txsID, err} - if waitErr != nil { - errFmt += "; wait error: %w" - errArgs = append(errArgs, waitErr) - } - waitErr = fmt.Errorf(errFmt, errArgs...) + wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) + return } - }() + defer func() { + err = w.ws.Unsubscribe(txsID) + if err != nil { + errFmt := "failed to unsubscribe from transactions (id: %s): %v" + errArgs := []interface{}{txsID, err} + if waitErr != nil { + errFmt += "; wait error: %w" + errArgs = append(errArgs, waitErr) + } + waitErr = fmt.Errorf(errFmt, errArgs...) + } + }() + } for { select { @@ -265,6 +293,9 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap case <-w.ws.Context().Done(): waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) return + case <-ctx.Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) + return } } }