From 1399496dfb0af2530f85ac8ed892ba78b6eb4768 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 16 Nov 2022 23:01:01 +0300 Subject: [PATCH] rpcclient: refactor event-based waiting loop Avoid receiver channels locks. --- pkg/rpcclient/actor/waiter.go | 153 +++++++++++++++++----------------- 1 file changed, 78 insertions(+), 75 deletions(-) diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 003b7546e..7d602aaaa 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -219,25 +219,80 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) { var ( wsWaitErr error - bRcvr = make(chan *block.Block) - aerRcvr = make(chan *state.AppExecResult) - unsubErrs = make(chan error) waitersActive int + bRcvr = make(chan *block.Block, 2) + aerRcvr = make(chan *state.AppExecResult, len(hashes)) + unsubErrs = make(chan error) + exit = make(chan struct{}) ) - // Rollback to a poll-based waiter if needed. - defer func() { - if wsWaitErr != nil { - res, waitErr = w.polling.WaitAny(ctx, vub, hashes...) - if waitErr != nil { - // Wrap the poll-based error, it's more important. - waitErr = fmt.Errorf("event-based error: %v; poll-based waiter error: %w", wsWaitErr, waitErr) + // Execution event preceded the block event, thus wait until the VUB-th block to be sure. + since := vub + blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) + if err != nil { + wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) + } else { + waitersActive++ + go func() { + <-exit + err = w.ws.Unsubscribe(blocksID) + if err != nil { + unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err) + return } + unsubErrs <- nil + }() + } + if wsWaitErr == nil { + for _, h := range hashes { + txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr) + if err != nil { + wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) + break + } + waitersActive++ + go func() { + <-exit + err = w.ws.Unsubscribe(txsID) + if err != nil { + unsubErrs <- fmt.Errorf("failed to unsubscribe from transactions (id: %s): %w", txsID, err) + return + } + unsubErrs <- nil + }() } - }() + } - // Drain receivers to avoid other notification receivers blocking. - defer func() { + if wsWaitErr == nil { + select { + case b, ok := <-bRcvr: + if !ok { + // We're toast, retry with non-ws client. + wsWaitErr = ErrMissedEvent + break + } + // We can easily end up in a situation when subscription was performed too late and + // the desired transaction and VUB-th block have already got accepted before the + // subscription happened. Thus, always retry with non-ws client, it will perform + // AER requests and make sure. + wsWaitErr = fmt.Errorf("block #%d was received by EventWaiter", b.Index) + case aer, ok := <-aerRcvr: + if !ok { + // We're toast, retry with non-ws client. + wsWaitErr = ErrMissedEvent + break + } + res = aer + case <-w.ws.Context().Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) + case <-ctx.Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) + } + } + close(exit) + + if waitersActive > 0 { + // Drain receivers to avoid other notification receivers blocking. drainLoop: for { select { @@ -260,72 +315,20 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui } } } - if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) { - close(bRcvr) - close(aerRcvr) - } - }() - - // Execution event preceded the block event, thus wait until the VUB-th block to be sure. - since := vub - blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) - if err != nil { - wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) - return } - waitersActive++ - defer func() { - go func() { - err = w.ws.Unsubscribe(blocksID) - if err != nil { - unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err) - return - } - unsubErrs <- nil - }() - }() - for _, h := range hashes { - txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr) - if err != nil { - wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) - return - } - waitersActive++ - defer func() { - go func() { - err = w.ws.Unsubscribe(txsID) - if err != nil { - unsubErrs <- fmt.Errorf("failed to unsubscribe from transactions (id: %s): %w", txsID, err) - return - } - unsubErrs <- nil - }() - }() + if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) { + close(bRcvr) + close(aerRcvr) } + close(unsubErrs) - select { - case b, ok := <-bRcvr: - if !ok { - // We're toast, retry with non-ws client. - wsWaitErr = ErrMissedEvent - return + // Rollback to a poll-based waiter if needed. + if wsWaitErr != nil && waitErr == nil { + res, waitErr = w.polling.WaitAny(ctx, vub, hashes...) + if waitErr != nil { + // Wrap the poll-based error, it's more important. + waitErr = fmt.Errorf("event-based error: %v; poll-based waiter error: %w", wsWaitErr, waitErr) } - // We can easily end up in a situation when subscription was performed too late and - // the desired transaction and VUB-th block have already got accepted before the - // subscription happened. Thus, always retry with non-ws client, it will perform - // AER requests and make sure. - wsWaitErr = fmt.Errorf("block #%d was received by EventWaiter", b.Index) - case aer, ok := <-aerRcvr: - if !ok { - // We're toast, retry with non-ws client. - wsWaitErr = ErrMissedEvent - return - } - res = aer - case <-w.ws.Context().Done(): - waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) - case <-ctx.Done(): - waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) } return }