diff --git a/ROADMAP.md b/ROADMAP.md index 0db81766f..28e673755 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -58,3 +58,14 @@ NeoGo retains certain deprecated error codes: `neorpc.ErrCompatGeneric`, neo-project/proposals#156 (NeoGo pre-0.102.0 and all known C# versions). Removal of the deprecated RPC error codes is planned once all nodes adopt the new error standard. + +## Block based web-socket waiter transaction awaiting + +Web-socket RPC based `waiter.EventWaiter` uses `header_of_added_block` notifications +subscription to manage transaction awaiting. To support old NeoGo RPC servers +(older than 0.105.0) that do not have block headers subscription ability, +event-based waiter fallbacks to the old way of block monitoring with +`block_added` notifications subscription. + +Removal of stale RPC server compatibility code from `waiter.EventWaiter` is +scheduled for May-June 2024 (~0.107.0 release). \ No newline at end of file diff --git a/pkg/rpcclient/waiter/waiter.go b/pkg/rpcclient/waiter/waiter.go index 4fa9bf34b..716187fc6 100644 --- a/pkg/rpcclient/waiter/waiter.go +++ b/pkg/rpcclient/waiter/waiter.go @@ -73,6 +73,7 @@ type ( RPCEventBased interface { RPCPollingBased + ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) Unsubscribe(id string) error @@ -234,6 +235,7 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin var ( wsWaitErr error waitersActive int + hRcvr = make(chan *block.Header, 2) bRcvr = make(chan *block.Block, 2) aerRcvr = make(chan *state.AppExecResult, len(hashes)) unsubErrs = make(chan error) @@ -242,16 +244,22 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin // Execution event preceded the block event, thus wait until the VUB-th block to be sure. since := vub - blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) + blocksID, err := w.ws.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Since: &since}, hRcvr) if err != nil { - wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) + // Falling back to block-based subscription. + if errors.Is(err, neorpc.ErrInvalidParams) { + blocksID, err = w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) + } + } + if err != nil { + wsWaitErr = fmt.Errorf("failed to subscribe for new blocks/headers: %w", err) } else { waitersActive++ go func() { <-exit err = w.ws.Unsubscribe(blocksID) if err != nil { - unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err) + unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks/headers (id: %s): %w", blocksID, err) return } unsubErrs <- nil @@ -290,9 +298,20 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin if wsWaitErr == nil && res == nil { select { + case _, ok := <-hRcvr: + if !ok { + // We're toast, retry with non-ws client. + hRcvr = nil + bRcvr = nil + aerRcvr = nil + wsWaitErr = ErrMissedEvent + break + } + waitErr = ErrTxNotAccepted case _, ok := <-bRcvr: if !ok { // We're toast, retry with non-ws client. + hRcvr = nil bRcvr = nil aerRcvr = nil wsWaitErr = ErrMissedEvent @@ -302,6 +321,7 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin case aer, ok := <-aerRcvr: if !ok { // We're toast, retry with non-ws client. + hRcvr = nil bRcvr = nil aerRcvr = nil wsWaitErr = ErrMissedEvent @@ -321,13 +341,21 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin drainLoop: for { select { + case _, ok := <-hRcvr: + if !ok { // Missed event means both channels are closed. + hRcvr = nil + bRcvr = nil + aerRcvr = nil + } case _, ok := <-bRcvr: if !ok { // Missed event means both channels are closed. + hRcvr = nil bRcvr = nil aerRcvr = nil } case _, ok := <-aerRcvr: if !ok { // Missed event means both channels are closed. + hRcvr = nil bRcvr = nil aerRcvr = nil } @@ -349,6 +377,9 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin } } } + if hRcvr != nil { + close(hRcvr) + } if bRcvr != nil { close(bRcvr) } diff --git a/pkg/rpcclient/waiter/waiter_test.go b/pkg/rpcclient/waiter/waiter_test.go index cd8171add..62df46e07 100644 --- a/pkg/rpcclient/waiter/waiter_test.go +++ b/pkg/rpcclient/waiter/waiter_test.go @@ -35,6 +35,8 @@ type RPCClient struct { context context.Context } +var _ = waiter.RPCPollingBased(&RPCClient{}) + func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) { return r.invRes, r.err } @@ -80,11 +82,14 @@ func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*r type AwaitableRPCClient struct { RPCClient - chLock sync.RWMutex - subBlockCh chan<- *block.Block - subTxCh chan<- *state.AppExecResult + chLock sync.RWMutex + subHeaderCh chan<- *block.Header + subBlockCh chan<- *block.Block + subTxCh chan<- *state.AppExecResult } +var _ = waiter.RPCEventBased(&AwaitableRPCClient{}) + func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) { c.chLock.Lock() defer c.chLock.Unlock() @@ -97,6 +102,12 @@ func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr c.subTxCh = rcvr return "2", nil } +func (c *AwaitableRPCClient) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) { + c.chLock.Lock() + defer c.chLock.Unlock() + c.subHeaderCh = rcvr + return "3", nil +} func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil } func TestNewWaiter(t *testing.T) { @@ -244,7 +255,7 @@ func TestWSWaiter_Wait(t *testing.T) { check(t, func() { c.chLock.RLock() defer c.chLock.RUnlock() - c.subBlockCh <- &block.Block{} + c.subHeaderCh <- &block.Header{} }) } diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 8cdc51689..1795f8d3d 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -1789,53 +1789,84 @@ func TestClient_Wait(t *testing.T) { defer chain.Close() defer rpcSrv.Shutdown() - c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{}) - require.NoError(t, err) - acc, err := wallet.NewAccount() - require.NoError(t, err) - act, err := actor.New(c, []actor.SignerAccount{ - { - Signer: transaction.Signer{ - Account: acc.ScriptHash(), - }, - Account: acc, - }, - }) - require.NoError(t, err) + run := func(t *testing.T, ws bool) { + acc, err := wallet.NewAccount() + require.NoError(t, err) - b, err := chain.GetBlock(chain.GetHeaderHash(1)) - require.NoError(t, err) - require.True(t, len(b.Transactions) > 0) + var act *actor.Actor + if ws { + c, err := rpcclient.NewWS(context.Background(), "ws"+strings.TrimPrefix(httpSrv.URL, "http")+"/ws", rpcclient.WSOptions{}) + require.NoError(t, err) + require.NoError(t, c.Init()) + act, err = actor.New(c, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: acc.ScriptHash(), + }, + Account: acc, + }, + }) + require.NoError(t, err) + } else { + c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{}) + require.NoError(t, err) + require.NoError(t, c.Init()) + act, err = actor.New(c, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: acc.ScriptHash(), + }, + Account: acc, + }, + }) + require.NoError(t, err) + } - check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) { - rcvr := make(chan struct{}) - go func() { - aer, err := act.Wait(h, vub, nil) - if errExpected { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Equal(t, h, aer.Container) - } - rcvr <- struct{}{} - }() - waitloop: - for { - select { - case <-rcvr: - break waitloop - case <-time.NewTimer(chain.GetConfig().TimePerBlock).C: - t.Fatal("transaction failed to be awaited") + b, err := chain.GetBlock(chain.GetHeaderHash(1)) + require.NoError(t, err) + require.True(t, len(b.Transactions) > 0) + + check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) { + rcvr := make(chan struct{}) + go func() { + aer, err := act.Wait(h, vub, nil) + if errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, h, aer.Container) + } + rcvr <- struct{}{} + }() + waitloop: + for { + select { + case <-rcvr: + break waitloop + case <-time.NewTimer(chain.GetConfig().TimePerBlock).C: + t.Fatal("transaction failed to be awaited") + } } } + + // Wait for transaction that has been persisted and VUB block has been persisted. + check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false) + // Wait for transaction that has been persisted and VUB block hasn't yet been persisted. + check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false) + if !ws { + // Wait for transaction that hasn't been persisted and VUB block has been persisted. + // WS client waits for the next block to be accepted to ensure that transaction wasn't + // persisted, and this test doesn't run chain, thus, don't run this test for WS client. + check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true) + } } - // Wait for transaction that has been persisted and VUB block has been persisted. - check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false) - // Wait for transaction that has been persisted and VUB block hasn't yet been persisted. - check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false) - // Wait for transaction that hasn't been persisted and VUB block has been persisted. - check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true) + t.Run("client", func(t *testing.T) { + run(t, false) + }) + t.Run("ws client", func(t *testing.T) { + run(t, true) + }) } func mkSubsClient(t *testing.T, rpcSrv *Server, httpSrv *httptest.Server, local bool) *rpcclient.WSClient {