waiter: adopt headers subscription for WS-based tx awaiting

Try to subscribe for headers firstly, and then if RPC server doesn't
have this ability, fallback to block subscriptions to manage transaction
awaiting.

Close #3260.

Signed-off-by: Anna Shaleva <shaleva.ann@nspcc.ru>
This commit is contained in:
Anna Shaleva 2023-12-29 14:33:46 +03:00
parent 7c9b1d05d2
commit f5b1bd3978
4 changed files with 132 additions and 48 deletions

View file

@ -58,3 +58,14 @@ NeoGo retains certain deprecated error codes: `neorpc.ErrCompatGeneric`,
neo-project/proposals#156 (NeoGo pre-0.102.0 and all known C# versions). neo-project/proposals#156 (NeoGo pre-0.102.0 and all known C# versions).
Removal of the deprecated RPC error codes is planned once all nodes adopt the new error standard. Removal of the deprecated RPC error codes is planned once all nodes adopt the new error standard.
## Block based web-socket waiter transaction awaiting
Web-socket RPC based `waiter.EventWaiter` uses `header_of_added_block` notifications
subscription to manage transaction awaiting. To support old NeoGo RPC servers
(older than 0.105.0) that do not have block headers subscription ability,
event-based waiter fallbacks to the old way of block monitoring with
`block_added` notifications subscription.
Removal of stale RPC server compatibility code from `waiter.EventWaiter` is
scheduled for May-June 2024 (~0.107.0 release).

View file

@ -73,6 +73,7 @@ type (
RPCEventBased interface { RPCEventBased interface {
RPCPollingBased RPCPollingBased
ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error)
ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error)
ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error)
Unsubscribe(id string) error Unsubscribe(id string) error
@ -234,6 +235,7 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin
var ( var (
wsWaitErr error wsWaitErr error
waitersActive int waitersActive int
hRcvr = make(chan *block.Header, 2)
bRcvr = make(chan *block.Block, 2) bRcvr = make(chan *block.Block, 2)
aerRcvr = make(chan *state.AppExecResult, len(hashes)) aerRcvr = make(chan *state.AppExecResult, len(hashes))
unsubErrs = make(chan error) unsubErrs = make(chan error)
@ -242,16 +244,22 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin
// Execution event preceded the block event, thus wait until the VUB-th block to be sure. // Execution event preceded the block event, thus wait until the VUB-th block to be sure.
since := vub since := vub
blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) blocksID, err := w.ws.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Since: &since}, hRcvr)
if err != nil { if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) // Falling back to block-based subscription.
if errors.Is(err, neorpc.ErrInvalidParams) {
blocksID, err = w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr)
}
}
if err != nil {
wsWaitErr = fmt.Errorf("failed to subscribe for new blocks/headers: %w", err)
} else { } else {
waitersActive++ waitersActive++
go func() { go func() {
<-exit <-exit
err = w.ws.Unsubscribe(blocksID) err = w.ws.Unsubscribe(blocksID)
if err != nil { if err != nil {
unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err) unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks/headers (id: %s): %w", blocksID, err)
return return
} }
unsubErrs <- nil unsubErrs <- nil
@ -290,9 +298,20 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin
if wsWaitErr == nil && res == nil { if wsWaitErr == nil && res == nil {
select { select {
case _, ok := <-hRcvr:
if !ok {
// We're toast, retry with non-ws client.
hRcvr = nil
bRcvr = nil
aerRcvr = nil
wsWaitErr = ErrMissedEvent
break
}
waitErr = ErrTxNotAccepted
case _, ok := <-bRcvr: case _, ok := <-bRcvr:
if !ok { if !ok {
// We're toast, retry with non-ws client. // We're toast, retry with non-ws client.
hRcvr = nil
bRcvr = nil bRcvr = nil
aerRcvr = nil aerRcvr = nil
wsWaitErr = ErrMissedEvent wsWaitErr = ErrMissedEvent
@ -302,6 +321,7 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin
case aer, ok := <-aerRcvr: case aer, ok := <-aerRcvr:
if !ok { if !ok {
// We're toast, retry with non-ws client. // We're toast, retry with non-ws client.
hRcvr = nil
bRcvr = nil bRcvr = nil
aerRcvr = nil aerRcvr = nil
wsWaitErr = ErrMissedEvent wsWaitErr = ErrMissedEvent
@ -321,13 +341,21 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin
drainLoop: drainLoop:
for { for {
select { select {
case _, ok := <-hRcvr:
if !ok { // Missed event means both channels are closed.
hRcvr = nil
bRcvr = nil
aerRcvr = nil
}
case _, ok := <-bRcvr: case _, ok := <-bRcvr:
if !ok { // Missed event means both channels are closed. if !ok { // Missed event means both channels are closed.
hRcvr = nil
bRcvr = nil bRcvr = nil
aerRcvr = nil aerRcvr = nil
} }
case _, ok := <-aerRcvr: case _, ok := <-aerRcvr:
if !ok { // Missed event means both channels are closed. if !ok { // Missed event means both channels are closed.
hRcvr = nil
bRcvr = nil bRcvr = nil
aerRcvr = nil aerRcvr = nil
} }
@ -349,6 +377,9 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin
} }
} }
} }
if hRcvr != nil {
close(hRcvr)
}
if bRcvr != nil { if bRcvr != nil {
close(bRcvr) close(bRcvr)
} }

View file

@ -35,6 +35,8 @@ type RPCClient struct {
context context.Context context context.Context
} }
var _ = waiter.RPCPollingBased(&RPCClient{})
func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) { func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) {
return r.invRes, r.err return r.invRes, r.err
} }
@ -80,11 +82,14 @@ func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*r
type AwaitableRPCClient struct { type AwaitableRPCClient struct {
RPCClient RPCClient
chLock sync.RWMutex chLock sync.RWMutex
subBlockCh chan<- *block.Block subHeaderCh chan<- *block.Header
subTxCh chan<- *state.AppExecResult subBlockCh chan<- *block.Block
subTxCh chan<- *state.AppExecResult
} }
var _ = waiter.RPCEventBased(&AwaitableRPCClient{})
func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) { func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
c.chLock.Lock() c.chLock.Lock()
defer c.chLock.Unlock() defer c.chLock.Unlock()
@ -97,6 +102,12 @@ func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr
c.subTxCh = rcvr c.subTxCh = rcvr
return "2", nil return "2", nil
} }
func (c *AwaitableRPCClient) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) {
c.chLock.Lock()
defer c.chLock.Unlock()
c.subHeaderCh = rcvr
return "3", nil
}
func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil } func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil }
func TestNewWaiter(t *testing.T) { func TestNewWaiter(t *testing.T) {
@ -244,7 +255,7 @@ func TestWSWaiter_Wait(t *testing.T) {
check(t, func() { check(t, func() {
c.chLock.RLock() c.chLock.RLock()
defer c.chLock.RUnlock() defer c.chLock.RUnlock()
c.subBlockCh <- &block.Block{} c.subHeaderCh <- &block.Header{}
}) })
} }

View file

@ -1789,53 +1789,84 @@ func TestClient_Wait(t *testing.T) {
defer chain.Close() defer chain.Close()
defer rpcSrv.Shutdown() defer rpcSrv.Shutdown()
c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{}) run := func(t *testing.T, ws bool) {
require.NoError(t, err) acc, err := wallet.NewAccount()
acc, err := wallet.NewAccount() require.NoError(t, err)
require.NoError(t, err)
act, err := actor.New(c, []actor.SignerAccount{
{
Signer: transaction.Signer{
Account: acc.ScriptHash(),
},
Account: acc,
},
})
require.NoError(t, err)
b, err := chain.GetBlock(chain.GetHeaderHash(1)) var act *actor.Actor
require.NoError(t, err) if ws {
require.True(t, len(b.Transactions) > 0) c, err := rpcclient.NewWS(context.Background(), "ws"+strings.TrimPrefix(httpSrv.URL, "http")+"/ws", rpcclient.WSOptions{})
require.NoError(t, err)
require.NoError(t, c.Init())
act, err = actor.New(c, []actor.SignerAccount{
{
Signer: transaction.Signer{
Account: acc.ScriptHash(),
},
Account: acc,
},
})
require.NoError(t, err)
} else {
c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{})
require.NoError(t, err)
require.NoError(t, c.Init())
act, err = actor.New(c, []actor.SignerAccount{
{
Signer: transaction.Signer{
Account: acc.ScriptHash(),
},
Account: acc,
},
})
require.NoError(t, err)
}
check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) { b, err := chain.GetBlock(chain.GetHeaderHash(1))
rcvr := make(chan struct{}) require.NoError(t, err)
go func() { require.True(t, len(b.Transactions) > 0)
aer, err := act.Wait(h, vub, nil)
if errExpected { check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) {
require.Error(t, err) rcvr := make(chan struct{})
} else { go func() {
require.NoError(t, err) aer, err := act.Wait(h, vub, nil)
require.Equal(t, h, aer.Container) if errExpected {
} require.Error(t, err)
rcvr <- struct{}{} } else {
}() require.NoError(t, err)
waitloop: require.Equal(t, h, aer.Container)
for { }
select { rcvr <- struct{}{}
case <-rcvr: }()
break waitloop waitloop:
case <-time.NewTimer(chain.GetConfig().TimePerBlock).C: for {
t.Fatal("transaction failed to be awaited") select {
case <-rcvr:
break waitloop
case <-time.NewTimer(chain.GetConfig().TimePerBlock).C:
t.Fatal("transaction failed to be awaited")
}
} }
} }
// Wait for transaction that has been persisted and VUB block has been persisted.
check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false)
// Wait for transaction that has been persisted and VUB block hasn't yet been persisted.
check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false)
if !ws {
// Wait for transaction that hasn't been persisted and VUB block has been persisted.
// WS client waits for the next block to be accepted to ensure that transaction wasn't
// persisted, and this test doesn't run chain, thus, don't run this test for WS client.
check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true)
}
} }
// Wait for transaction that has been persisted and VUB block has been persisted. t.Run("client", func(t *testing.T) {
check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false) run(t, false)
// Wait for transaction that has been persisted and VUB block hasn't yet been persisted. })
check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false) t.Run("ws client", func(t *testing.T) {
// Wait for transaction that hasn't been persisted and VUB block has been persisted. run(t, true)
check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true) })
} }
func mkSubsClient(t *testing.T, rpcSrv *Server, httpSrv *httptest.Server, local bool) *rpcclient.WSClient { func mkSubsClient(t *testing.T, rpcSrv *Server, httpSrv *httptest.Server, local bool) *rpcclient.WSClient {