diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 7d602aaaa..424c24f55 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -244,6 +244,7 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui }() } if wsWaitErr == nil { + trig := trigger.Application for _, h := range hashes { txsID, err := w.ws.ReceiveExecutions(&neorpc.ExecutionFilter{Container: &h}, aerRcvr) if err != nil { @@ -260,22 +261,28 @@ func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Ui } unsubErrs <- nil }() + // There is a potential race between subscription and acceptance, so + // do a polling check once _after_ the subscription. + appLog, err := w.ws.GetApplicationLog(h, &trig) + if err == nil { + res = &state.AppExecResult{ + Container: appLog.Container, + Execution: appLog.Executions[0], + } + break // We have the result, no need for other subscriptions. + } } } - if wsWaitErr == nil { + if wsWaitErr == nil && res == nil { select { - case b, ok := <-bRcvr: + case _, ok := <-bRcvr: if !ok { // We're toast, retry with non-ws client. wsWaitErr = ErrMissedEvent break } - // We can easily end up in a situation when subscription was performed too late and - // the desired transaction and VUB-th block have already got accepted before the - // subscription happened. Thus, always retry with non-ws client, it will perform - // AER requests and make sure. - wsWaitErr = fmt.Errorf("block #%d was received by EventWaiter", b.Index) + waitErr = ErrTxNotAccepted case aer, ok := <-aerRcvr: if !ok { // We're toast, retry with non-ws client. diff --git a/pkg/rpcclient/actor/waiter_test.go b/pkg/rpcclient/actor/waiter_test.go index a3fcac31f..f6b06fb00 100644 --- a/pkg/rpcclient/actor/waiter_test.go +++ b/pkg/rpcclient/actor/waiter_test.go @@ -133,13 +133,12 @@ func TestWSWaiter_Wait(t *testing.T) { require.ErrorIs(t, err, someErr) // AER is in chain immediately. + aer, err := w.Wait(h, bCount-1, nil) + require.NoError(t, err) + require.Equal(t, expected, aer) + + // Auxiliary things for asynchronous tests. doneCh := make(chan struct{}) - go func() { - aer, err := w.Wait(h, bCount-1, nil) - require.NoError(t, err) - require.Equal(t, expected, aer) - doneCh <- struct{}{} - }() check := func(t *testing.T, trigger func()) { timer := time.NewTimer(time.Second) var triggerFired bool @@ -159,6 +158,15 @@ func TestWSWaiter_Wait(t *testing.T) { } require.True(t, triggerFired) } + + // AER received after the subscription. + c.RPCClient.appLog = nil + go func() { + aer, err = w.Wait(h, bCount-1, nil) + require.NoError(t, err) + require.Equal(t, expected, aer) + doneCh <- struct{}{} + }() check(t, func() { c.chLock.RLock() defer c.chLock.RUnlock() @@ -166,7 +174,6 @@ func TestWSWaiter_Wait(t *testing.T) { }) // Missing AER after VUB. - c.RPCClient.appLog = nil go func() { _, err = w.Wait(h, bCount-2, nil) require.ErrorIs(t, err, ErrTxNotAccepted) diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 023af25d0..093b752b8 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/base64" "encoding/hex" + "encoding/json" "fmt" "math/big" "net/http" @@ -16,6 +17,7 @@ import ( "time" "github.com/google/uuid" + "github.com/gorilla/websocket" "github.com/nspcc-dev/neo-go/internal/testchain" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core" @@ -2132,11 +2134,42 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) { // Firstly, accept the block. blocks := getTestBlocks(t) b1 := blocks[0] - b2 := blocks[1] tx := b1.Transactions[0] require.NoError(t, chain.AddBlock(b1)) - // After that, subscribe for AERs/blocks. + // After that, wait and get the result immediately. + aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil) + require.NoError(t, err) + require.Equal(t, tx.Hash(), aer.Container) + require.Equal(t, trigger.Application, aer.Trigger) + require.Equal(t, vmstate.Halt, aer.VMState) +} + +func TestWSClient_WaitWithMissedEvent(t *testing.T) { + chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true) + defer chain.Close() + defer rpcSrv.Shutdown() + + url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" + c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{}) + require.NoError(t, err) + require.NoError(t, c.Init()) + acc, err := wallet.NewAccount() + require.NoError(t, err) + act, err := actor.New(c, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: acc.ScriptHash(), + }, + Account: acc, + }, + }) + require.NoError(t, err) + + blocks := getTestBlocks(t) + b1 := blocks[0] + tx := b1.Transactions[0] + rcvr := make(chan *state.AppExecResult) go func() { aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil) @@ -2150,23 +2183,33 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) { require.Eventually(t, func() bool { rpcSrv.subsLock.Lock() defer rpcSrv.subsLock.Unlock() - if len(rpcSrv.subscribers) == 1 { // single client - for s := range rpcSrv.subscribers { - var count int - for _, f := range s.feeds { - if f.event != neorpc.InvalidEventID { - count++ - } - } - return count == 2 // subscription for blocks + AERs - } - } - return false + return len(rpcSrv.subscribers) == 1 }, time.Second, 100*time.Millisecond) - // Accept the next block to trigger event-based waiter loop exit and rollback to - // a poll-based waiter. - require.NoError(t, chain.AddBlock(b2)) + rpcSrv.subsLock.Lock() + // Suppress normal event delivery. + for s := range rpcSrv.subscribers { + s.overflown.Store(true) + } + rpcSrv.subsLock.Unlock() + + // Accept the next block, but subscriber will get no events because it's overflown. + require.NoError(t, chain.AddBlock(b1)) + + overEvent, err := json.Marshal(neorpc.Notification{ + JSONRPC: neorpc.JSONRPCVersion, + Event: neorpc.MissedEventID, + Payload: make([]interface{}, 0), + }) + require.NoError(t, err) + overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, overEvent) + require.NoError(t, err) + rpcSrv.subsLock.Lock() + // Deliver overflow message -> triggers subscriber to retry with polling waiter. + for s := range rpcSrv.subscribers { + s.writer <- overflowMsg + } + rpcSrv.subsLock.Unlock() // Wait for the result. waitloop: