rpcclient: refactor event-based waiting loop

Avoid receiver channels locks.
This commit is contained in:
Anna Shaleva 2022-11-16 23:01:01 +03:00
parent 95e23c8e46
commit 1399496dfb

View file

@ -219,25 +219,80 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap
func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) {
var (
wsWaitErr error
bRcvr = make(chan *block.Block)
aerRcvr = make(chan *state.AppExecResult)
unsubErrs = make(chan error)
waitersActive int
bRcvr = make(chan *block.Block, 2)
aerRcvr = make(chan *state.AppExecResult, len(hashes))
unsubErrs = make(chan error)
exit = make(chan struct{})
)
// Rollback to a poll-based waiter if needed.
defer func() {
if wsWaitErr != nil {
res, waitErr = w.polling.WaitAny(ctx, vub, hashes...)
if waitErr != nil {
// Wrap the poll-based error, it's more important.
waitErr = fmt.Errorf("event-based error: %v; poll-based waiter error: %w", wsWaitErr, waitErr)
}
// Execution event preceded the block event, thus wait until the VUB-th block to be sure.
since := vub
blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
} else {
waitersActive++
go func() {
<-exit
err = w.ws.Unsubscribe(blocksID)
if err != nil {
unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err)
return
}
unsubErrs <- nil
}()
}
if wsWaitErr == nil {
for _, h := range hashes {
txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
break
}
waitersActive++
go func() {
<-exit
err = w.ws.Unsubscribe(txsID)
if err != nil {
unsubErrs <- fmt.Errorf("failed to unsubscribe from transactions (id: %s): %w", txsID, err)
return
}
unsubErrs <- nil
}()
}
}
if wsWaitErr == nil {
select {
case b, ok := <-bRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
break
}
// We can easily end up in a situation when subscription was performed too late and
// the desired transaction and VUB-th block have already got accepted before the
// subscription happened. Thus, always retry with non-ws client, it will perform
// AER requests and make sure.
wsWaitErr = fmt.Errorf("block #%d was received by EventWaiter", b.Index)
case aer, ok := <-aerRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
break
}
res = aer
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
}
}
close(exit)
if waitersActive > 0 {
// Drain receivers to avoid other notification receivers blocking.
defer func() {
drainLoop:
for {
select {
@ -260,72 +315,20 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui
}
}
}
}
if wsWaitErr == nil || !errors.Is(wsWaitErr, ErrMissedEvent) {
close(bRcvr)
close(aerRcvr)
}
}()
close(unsubErrs)
// Execution event preceded the block event, thus wait until the VUB-th block to be sure.
since := vub
blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err)
return
// Rollback to a poll-based waiter if needed.
if wsWaitErr != nil && waitErr == nil {
res, waitErr = w.polling.WaitAny(ctx, vub, hashes...)
if waitErr != nil {
// Wrap the poll-based error, it's more important.
waitErr = fmt.Errorf("event-based error: %v; poll-based waiter error: %w", wsWaitErr, waitErr)
}
waitersActive++
defer func() {
go func() {
err = w.ws.Unsubscribe(blocksID)
if err != nil {
unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err)
return
}
unsubErrs <- nil
}()
}()
for _, h := range hashes {
txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr)
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err)
return
}
waitersActive++
defer func() {
go func() {
err = w.ws.Unsubscribe(txsID)
if err != nil {
unsubErrs <- fmt.Errorf("failed to unsubscribe from transactions (id: %s): %w", txsID, err)
return
}
unsubErrs <- nil
}()
}()
}
select {
case b, ok := <-bRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
return
}
// We can easily end up in a situation when subscription was performed too late and
// the desired transaction and VUB-th block have already got accepted before the
// subscription happened. Thus, always retry with non-ws client, it will perform
// AER requests and make sure.
wsWaitErr = fmt.Errorf("block #%d was received by EventWaiter", b.Index)
case aer, ok := <-aerRcvr:
if !ok {
// We're toast, retry with non-ws client.
wsWaitErr = ErrMissedEvent
return
}
res = aer
case <-w.ws.Context().Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err())
case <-ctx.Done():
waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err())
}
return
}