rpc: extend Waiter interface to wait for several txs with context

This commit is contained in:
Anna Shaleva 2022-10-21 11:35:10 +03:00
parent 6b216050f3
commit d2a9e9120d

View file

@ -40,6 +40,13 @@ type (
// ValidUntilBlock value and an error. It returns transaction execution result // ValidUntilBlock value and an error. It returns transaction execution result
// or an error if transaction wasn't accepted to the chain. // or an error if transaction wasn't accepted to the chain.
Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error)
// WaitAny waits until at least one of the specified transactions will be accepted
// to the chain until vub (including). It returns execution result of this
// transaction or an error if none of the transactions was accepted to the chain.
// It uses underlying RPCPollingWaiter or RPCEventWaiter context to interrupt
// awaiting process, but additional ctx can be passed as an argument for the same
// purpose.
WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error)
} }
// RPCPollingWaiter is an interface that enables transaction awaiting functionality // RPCPollingWaiter is an interface that enables transaction awaiting functionality
// for Actor instance based on periodical BlockCount and ApplicationLog polls. // for Actor instance based on periodical BlockCount and ApplicationLog polls.
@ -110,6 +117,11 @@ func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecRes
return nil, ErrAwaitingNotSupported return nil, ErrAwaitingNotSupported
} }
// WaitAny implements Waiter interface.
func (NullWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
return nil, ErrAwaitingNotSupported
}
// NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting. // NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting.
func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) { func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) {
v, err := waiter.GetVersion() v, err := waiter.GetVersion()
@ -127,6 +139,11 @@ func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppE
if err != nil { if err != nil {
return nil, err return nil, err
} }
return w.WaitAny(context.TODO(), vub, h)
}
// WaitAny implements Waiter interface.
func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) {
var ( var (
currentHeight uint32 currentHeight uint32
failedAttempt int failedAttempt int
@ -153,18 +170,22 @@ func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppE
currentHeight = blockCount - 1 currentHeight = blockCount - 1
} }
t := trigger.Application t := trigger.Application
res, err := w.polling.GetApplicationLog(h, &t) for _, h := range hashes {
if err == nil { res, err := w.polling.GetApplicationLog(h, &t)
return &state.AppExecResult{ if err == nil {
Container: h, return &state.AppExecResult{
Execution: res.Executions[0], Container: res.Container,
}, nil Execution: res.Executions[0],
}, nil
}
} }
if currentHeight >= vub { if currentHeight >= vub {
return nil, ErrTxNotAccepted return nil, ErrTxNotAccepted
} }
case <-w.polling.Context().Done(): case <-w.polling.Context().Done():
return nil, fmt.Errorf("%w: %v", ErrContextDone, w.polling.Context().Err()) return nil, fmt.Errorf("%w: %v", ErrContextDone, w.polling.Context().Err())
case <-ctx.Done():
return nil, fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
} }
} }
} }
@ -188,10 +209,15 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap
if err != nil { if err != nil {
return nil, err return nil, err
} }
return w.WaitAny(context.TODO(), vub, h)
}
// WaitAny implements Waiter interface.
func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) {
var wsWaitErr error var wsWaitErr error
defer func() { defer func() {
if wsWaitErr != nil { if wsWaitErr != nil {
res, waitErr = w.polling.Wait(h, vub, nil) res, waitErr = w.polling.WaitAny(ctx, vub, hashes...)
if waitErr != nil { if waitErr != nil {
waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr) waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr)
} }
@ -229,23 +255,25 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap
waitErr = fmt.Errorf(errFmt, errArgs...) waitErr = fmt.Errorf(errFmt, errArgs...)
} }
}() }()
txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) for _, h := range hashes {
if err != nil { txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr)
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return
}
defer func() {
err = w.ws.Unsubscribe(txsID)
if err != nil { if err != nil {
errFmt := "failed to unsubscribe from transactions (id: %s): %v" wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
errArgs := []interface{}{txsID, err} return
if waitErr != nil {
errFmt += "; wait error: %w"
errArgs = append(errArgs, waitErr)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
} }
}() defer func() {
err = w.ws.Unsubscribe(txsID)
if err != nil {
errFmt := "failed to unsubscribe from transactions (id: %s): %v"
errArgs := []interface{}{txsID, err}
if waitErr != nil {
errFmt += "; wait error: %w"
errArgs = append(errArgs, waitErr)
}
waitErr = fmt.Errorf(errFmt, errArgs...)
}
}()
}
for { for {
select { select {
@ -265,6 +293,9 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap
case <-w.ws.Context().Done(): case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
return return
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
return
} }
} }
} }