diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 0da3964dc..023af25d0 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/rpcclient" @@ -2034,10 +2035,24 @@ func TestWSClient_Wait(t *testing.T) { rcvr <- aer }() go func() { + // Wait until client is properly subscribed. The real node won't behave like this, + // but the real node has the subsequent blocks to be added that will trigger client's + // waitloops to finish anyway (and the test has only single block, thus, use it careful). require.Eventually(t, func() bool { rpcSrv.subsLock.Lock() defer rpcSrv.subsLock.Unlock() - return len(rpcSrv.subscribers) == 1 + if len(rpcSrv.subscribers) == 1 { // single client + for s := range rpcSrv.subscribers { + var count int + for _, f := range s.feeds { + if f.event != neorpc.InvalidEventID { + count++ + } + } + return count == 2 // subscription for blocks + AERs + } + } + return false }, time.Second, 100*time.Millisecond) require.NoError(t, chain.AddBlock(b)) }() @@ -2057,6 +2072,25 @@ func TestWSClient_Wait(t *testing.T) { t.Fatalf("transaction from block %d failed to be awaited: deadline exceeded", b.Index) } } + // Wait for server/client to properly unsubscribe. In real life subsequent awaiter + // requests may be run concurrently, and it's OK, but it's important for the test + // not to run subscription requests in parallel because block addition is bounded to + // the number of subscribers. + require.Eventually(t, func() bool { + rpcSrv.subsLock.Lock() + defer rpcSrv.subsLock.Unlock() + if len(rpcSrv.subscribers) != 1 { + return false + } + for s := range rpcSrv.subscribers { + for _, f := range s.feeds { + if f.event != neorpc.InvalidEventID { + return false + } + } + } + return true + }, time.Second, 100*time.Millisecond) } var faultedChecked bool @@ -2102,7 +2136,7 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) { tx := b1.Transactions[0] require.NoError(t, chain.AddBlock(b1)) - // After that, subscribe for AERs/blocks and wait. + // After that, subscribe for AERs/blocks. rcvr := make(chan *state.AppExecResult) go func() { aer, err := act.Wait(tx.Hash(), tx.ValidUntilBlock, nil) @@ -2110,8 +2144,28 @@ func TestWSClient_WaitWithLateSubscription(t *testing.T) { rcvr <- aer }() + // Wait until client is properly subscribed. The real node won't behave like this, + // but the real node has the subsequent blocks to be added that will trigger client's + // waitloops to finish anyway (and the test has only single block, thus, use it careful). + require.Eventually(t, func() bool { + rpcSrv.subsLock.Lock() + defer rpcSrv.subsLock.Unlock() + if len(rpcSrv.subscribers) == 1 { // single client + for s := range rpcSrv.subscribers { + var count int + for _, f := range s.feeds { + if f.event != neorpc.InvalidEventID { + count++ + } + } + return count == 2 // subscription for blocks + AERs + } + } + return false + }, time.Second, 100*time.Millisecond) + // Accept the next block to trigger event-based waiter loop exit and rollback to - // poll-based waiter. + // a poll-based waiter. require.NoError(t, chain.AddBlock(b2)) // Wait for the result.