From 7c9b1d05d222b08808348fa4ec027b326ba15bd2 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 29 Dec 2023 15:15:18 +0300 Subject: [PATCH 1/2] rpcclient: refactor waiter package naming Adjust names of all used structures, no need to duplicate `Waiter` everywhere, we already in the `waiter` package. Also, adjust comments to Actor so that links to Waiter are properly described in docs. Ref. #3265. Signed-off-by: Anna Shaleva --- pkg/rpcclient/actor/actor.go | 16 ++--- pkg/rpcclient/notary/actor_test.go | 2 +- pkg/rpcclient/waiter/waiter.go | 96 ++++++++++++++--------------- pkg/rpcclient/waiter/waiter_test.go | 16 ++--- 4 files changed, 65 insertions(+), 65 deletions(-) diff --git a/pkg/rpcclient/actor/actor.go b/pkg/rpcclient/actor/actor.go index e81a581da..2884c3cd2 100644 --- a/pkg/rpcclient/actor/actor.go +++ b/pkg/rpcclient/actor/actor.go @@ -59,18 +59,18 @@ type SignerAccount struct { // transactions in various ways, while "Send" prefix is used by methods that // directly transmit created transactions to the RPC server. // -// Actor also provides a Waiter interface to wait until transaction will be +// Actor also provides a [waiter.Waiter] interface to wait until transaction will be // accepted to the chain. Depending on the underlying RPCActor functionality, // transaction awaiting can be performed via web-socket using RPC notifications -// subsystem with EventWaiter, via regular RPC requests using a poll-based -// algorithm with PollingWaiter or can not be performed if RPCActor doesn't -// implement none of RPCEventWaiter and RPCPollingWaiter interfaces with -// NullWaiter. ErrAwaitingNotSupported will be returned on attempt to await the -// transaction in the latter case. Waiter uses context of the underlying RPCActor +// subsystem with [waiter.EventBased], via regular RPC requests using a poll-based +// algorithm with [waiter.PollingBased] or can not be performed if RPCActor doesn't +// implement none of [waiter.RPCEventBased] and [waiter.RPCPollingBased] interfaces with +// [waiter.Null]. [waiter.ErrAwaitingNotSupported] will be returned on attempt to await the +// transaction in the latter case. [waiter.Waiter] uses context of the underlying RPCActor // and interrupts transaction awaiting process if the context is done. -// ErrContextDone wrapped with the context's error will be returned in this case. +// [waiter.ErrContextDone] wrapped with the context's error will be returned in this case. // Otherwise, transaction awaiting process is ended with ValidUntilBlock acceptance -// and ErrTxNotAccepted is returned if transaction wasn't accepted by this moment. +// and [waiter.ErrTxNotAccepted] is returned if transaction wasn't accepted by this moment. type Actor struct { invoker.Invoker waiter.Waiter diff --git a/pkg/rpcclient/notary/actor_test.go b/pkg/rpcclient/notary/actor_test.go index ccda87c5b..57932e7e0 100644 --- a/pkg/rpcclient/notary/actor_test.go +++ b/pkg/rpcclient/notary/actor_test.go @@ -79,7 +79,7 @@ func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*r return r.applog, nil } -var _ = waiter.RPCPollingWaiter(&RPCClient{}) +var _ = waiter.RPCPollingBased(&RPCClient{}) func TestNewActor(t *testing.T) { rc := &RPCClient{ diff --git a/pkg/rpcclient/waiter/waiter.go b/pkg/rpcclient/waiter/waiter.go index dd17e2a19..4fa9bf34b 100644 --- a/pkg/rpcclient/waiter/waiter.go +++ b/pkg/rpcclient/waiter/waiter.go @@ -15,11 +15,11 @@ import ( "github.com/nspcc-dev/neo-go/pkg/util" ) -// PollingWaiterRetryCount is a threshold for a number of subsequent failed -// attempts to get block count from the RPC server for PollingWaiter. If it fails -// to retrieve block count PollingWaiterRetryCount times in a raw then transaction +// PollingBasedRetryCount is a threshold for a number of subsequent failed +// attempts to get block count from the RPC server for PollingBased. If it fails +// to retrieve block count PollingBasedRetryCount times in a raw then transaction // awaiting attempt considered to be failed and an error is returned. -const PollingWaiterRetryCount = 3 +const PollingBasedRetryCount = 3 var ( // ErrTxNotAccepted is returned when transaction wasn't accepted to the chain @@ -31,13 +31,13 @@ var ( // ErrAwaitingNotSupported is returned from Wait method if Waiter instance // doesn't support transaction awaiting. ErrAwaitingNotSupported = errors.New("awaiting not supported") - // ErrMissedEvent is returned when RPCEventWaiter closes receiver channel + // ErrMissedEvent is returned when RPCEventBased closes receiver channel // which happens if missed event was received from the RPC server. ErrMissedEvent = errors.New("some event was missed") ) type ( - // Waiter is an interface providing transaction awaiting functionality to Actor. + // Waiter is an interface providing transaction awaiting functionality. Waiter interface { // Wait allows to wait until transaction will be accepted to the chain. It can be // used as a wrapper for Send or SignAndSend and accepts transaction hash, @@ -51,14 +51,14 @@ type ( // WaitAny waits until at least one of the specified transactions will be accepted // to the chain until vub (including). It returns execution result of this // transaction or an error if none of the transactions was accepted to the chain. - // It uses underlying RPCPollingWaiter or RPCEventWaiter context to interrupt + // It uses underlying RPCPollingBased or RPCEventBased context to interrupt // awaiting process, but additional ctx can be passed as an argument for the same // purpose. WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) } - // RPCPollingWaiter is an interface that enables transaction awaiting functionality - // for Actor instance based on periodical BlockCount and ApplicationLog polls. - RPCPollingWaiter interface { + // RPCPollingBased is an interface that enables transaction awaiting functionality + // based on periodical BlockCount and ApplicationLog polls. + RPCPollingBased interface { // Context should return the RPC client context to be able to gracefully // shut down all running processes (if so). Context() context.Context @@ -66,12 +66,12 @@ type ( GetBlockCount() (uint32, error) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) } - // RPCEventWaiter is an interface that enables improved transaction awaiting functionality - // for Actor instance based on web-socket Block and ApplicationLog notifications. RPCEventWaiter - // contains RPCPollingWaiter under the hood and falls back to polling when subscription-based + // RPCEventBased is an interface that enables improved transaction awaiting functionality + // based on web-socket Block and ApplicationLog notifications. RPCEventBased + // contains RPCPollingBased under the hood and falls back to polling when subscription-based // awaiting fails. - RPCEventWaiter interface { - RPCPollingWaiter + RPCEventBased interface { + RPCPollingBased ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) @@ -79,18 +79,18 @@ type ( } ) -// NullWaiter is a Waiter stub that doesn't support transaction awaiting functionality. -type NullWaiter struct{} +// Null is a Waiter stub that doesn't support transaction awaiting functionality. +type Null struct{} -// PollingWaiter is a polling-based Waiter. -type PollingWaiter struct { - polling RPCPollingWaiter +// PollingBased is a polling-based Waiter. +type PollingBased struct { + polling RPCPollingBased version *result.Version } -// EventWaiter is a websocket-based Waiter. -type EventWaiter struct { - ws RPCEventWaiter +// EventBased is a websocket-based Waiter. +type EventBased struct { + ws RPCEventBased polling Waiter } @@ -102,57 +102,57 @@ func errIsAlreadyExists(err error) bool { // New creates Waiter instance. It can be either websocket-based or // polling-base, otherwise Waiter stub is returned. As a first argument -// it accepts RPCEventWaiter implementation, RPCPollingWaiter implementation +// it accepts RPCEventBased implementation, RPCPollingBased implementation // or not an implementation of these two interfaces. It returns websocket-based // waiter, polling-based waiter or a stub correspondingly. func New(base any, v *result.Version) Waiter { - if eventW, ok := base.(RPCEventWaiter); ok { - return &EventWaiter{ + if eventW, ok := base.(RPCEventBased); ok { + return &EventBased{ ws: eventW, - polling: &PollingWaiter{ + polling: &PollingBased{ polling: eventW, version: v, }, } } - if pollW, ok := base.(RPCPollingWaiter); ok { - return &PollingWaiter{ + if pollW, ok := base.(RPCPollingBased); ok { + return &PollingBased{ polling: pollW, version: v, } } - return NewNullWaiter() + return NewNull() } -// NewNullWaiter creates an instance of Waiter stub. -func NewNullWaiter() NullWaiter { - return NullWaiter{} +// NewNull creates an instance of Waiter stub. +func NewNull() Null { + return Null{} } // Wait implements Waiter interface. -func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { +func (Null) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { return nil, ErrAwaitingNotSupported } // WaitAny implements Waiter interface. -func (NullWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) { +func (Null) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) { return nil, ErrAwaitingNotSupported } -// NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting. -func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) { +// NewPollingBased creates an instance of Waiter supporting poll-based transaction awaiting. +func NewPollingBased(waiter RPCPollingBased) (*PollingBased, error) { v, err := waiter.GetVersion() if err != nil { return nil, err } - return &PollingWaiter{ + return &PollingBased{ polling: waiter, version: v, }, nil } // Wait implements Waiter interface. -func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { +func (w *PollingBased) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { if err != nil && !errIsAlreadyExists(err) { return nil, err } @@ -160,7 +160,7 @@ func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppE } // WaitAny implements Waiter interface. -func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) { +func (w *PollingBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) { var ( currentHeight uint32 failedAttempt int @@ -177,7 +177,7 @@ func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util. blockCount, err := w.polling.GetBlockCount() if err != nil { failedAttempt++ - if failedAttempt > PollingWaiterRetryCount { + if failedAttempt > PollingBasedRetryCount { return nil, fmt.Errorf("failed to retrieve block count: %w", err) } continue @@ -207,22 +207,22 @@ func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util. } } -// NewEventWaiter creates an instance of Waiter supporting websocket event-based transaction awaiting. -// EventWaiter contains PollingWaiter under the hood and falls back to polling when subscription-based +// NewEventBased creates an instance of Waiter supporting websocket event-based transaction awaiting. +// EventBased contains PollingBased under the hood and falls back to polling when subscription-based // awaiting fails. -func NewEventWaiter(waiter RPCEventWaiter) (*EventWaiter, error) { - polling, err := NewPollingWaiter(waiter) +func NewEventBased(waiter RPCEventBased) (*EventBased, error) { + polling, err := NewPollingBased(waiter) if err != nil { return nil, err } - return &EventWaiter{ + return &EventBased{ ws: waiter, polling: polling, }, nil } // Wait implements Waiter interface. -func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) { +func (w *EventBased) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) { if err != nil && !errIsAlreadyExists(err) { return nil, err } @@ -230,7 +230,7 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap } // WaitAny implements Waiter interface. -func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) { +func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) { var ( wsWaitErr error waitersActive int diff --git a/pkg/rpcclient/waiter/waiter_test.go b/pkg/rpcclient/waiter/waiter_test.go index 1eaff101a..cd8171add 100644 --- a/pkg/rpcclient/waiter/waiter_test.go +++ b/pkg/rpcclient/waiter/waiter_test.go @@ -101,15 +101,15 @@ func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil } func TestNewWaiter(t *testing.T) { w := waiter.New((actor.RPCActor)(nil), nil) - _, ok := w.(waiter.NullWaiter) + _, ok := w.(waiter.Null) require.True(t, ok) w = waiter.New(&RPCClient{}, &result.Version{}) - _, ok = w.(*waiter.PollingWaiter) + _, ok = w.(*waiter.PollingBased) require.True(t, ok) w = waiter.New(&AwaitableRPCClient{RPCClient: RPCClient{}}, &result.Version{}) - _, ok = w.(*waiter.EventWaiter) + _, ok = w.(*waiter.EventBased) require.True(t, ok) } @@ -121,7 +121,7 @@ func TestPollingWaiter_Wait(t *testing.T) { c := &RPCClient{appLog: appLog} c.bCount.Store(bCount) w := waiter.New(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time. - _, ok := w.(*waiter.PollingWaiter) + _, ok := w.(*waiter.PollingBased) require.True(t, ok) // Wait with error. @@ -186,7 +186,7 @@ func TestWSWaiter_Wait(t *testing.T) { c := &AwaitableRPCClient{RPCClient: RPCClient{appLog: appLog}} c.bCount.Store(bCount) w := waiter.New(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time. - _, ok := w.(*waiter.EventWaiter) + _, ok := w.(*waiter.EventBased) require.True(t, ok) // Wait with error. @@ -249,7 +249,7 @@ func TestWSWaiter_Wait(t *testing.T) { } func TestRPCWaiterRPCClientCompat(t *testing.T) { - _ = waiter.RPCPollingWaiter(&rpcclient.Client{}) - _ = waiter.RPCPollingWaiter(&rpcclient.WSClient{}) - _ = waiter.RPCEventWaiter(&rpcclient.WSClient{}) + _ = waiter.RPCPollingBased(&rpcclient.Client{}) + _ = waiter.RPCPollingBased(&rpcclient.WSClient{}) + _ = waiter.RPCEventBased(&rpcclient.WSClient{}) } From f5b1bd3978df8a66b1eefdad4815ff7f0e6958b8 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 29 Dec 2023 14:33:46 +0300 Subject: [PATCH 2/2] waiter: adopt headers subscription for WS-based tx awaiting Try to subscribe for headers firstly, and then if RPC server doesn't have this ability, fallback to block subscriptions to manage transaction awaiting. Close #3260. Signed-off-by: Anna Shaleva --- ROADMAP.md | 11 +++ pkg/rpcclient/waiter/waiter.go | 37 ++++++++- pkg/rpcclient/waiter/waiter_test.go | 19 ++++- pkg/services/rpcsrv/client_test.go | 113 ++++++++++++++++++---------- 4 files changed, 132 insertions(+), 48 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 0db81766f..28e673755 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -58,3 +58,14 @@ NeoGo retains certain deprecated error codes: `neorpc.ErrCompatGeneric`, neo-project/proposals#156 (NeoGo pre-0.102.0 and all known C# versions). Removal of the deprecated RPC error codes is planned once all nodes adopt the new error standard. + +## Block based web-socket waiter transaction awaiting + +Web-socket RPC based `waiter.EventWaiter` uses `header_of_added_block` notifications +subscription to manage transaction awaiting. To support old NeoGo RPC servers +(older than 0.105.0) that do not have block headers subscription ability, +event-based waiter fallbacks to the old way of block monitoring with +`block_added` notifications subscription. + +Removal of stale RPC server compatibility code from `waiter.EventWaiter` is +scheduled for May-June 2024 (~0.107.0 release). \ No newline at end of file diff --git a/pkg/rpcclient/waiter/waiter.go b/pkg/rpcclient/waiter/waiter.go index 4fa9bf34b..716187fc6 100644 --- a/pkg/rpcclient/waiter/waiter.go +++ b/pkg/rpcclient/waiter/waiter.go @@ -73,6 +73,7 @@ type ( RPCEventBased interface { RPCPollingBased + ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) Unsubscribe(id string) error @@ -234,6 +235,7 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin var ( wsWaitErr error waitersActive int + hRcvr = make(chan *block.Header, 2) bRcvr = make(chan *block.Block, 2) aerRcvr = make(chan *state.AppExecResult, len(hashes)) unsubErrs = make(chan error) @@ -242,16 +244,22 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin // Execution event preceded the block event, thus wait until the VUB-th block to be sure. since := vub - blocksID, err := w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) + blocksID, err := w.ws.ReceiveHeadersOfAddedBlocks(&neorpc.BlockFilter{Since: &since}, hRcvr) if err != nil { - wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) + // Falling back to block-based subscription. + if errors.Is(err, neorpc.ErrInvalidParams) { + blocksID, err = w.ws.ReceiveBlocks(&neorpc.BlockFilter{Since: &since}, bRcvr) + } + } + if err != nil { + wsWaitErr = fmt.Errorf("failed to subscribe for new blocks/headers: %w", err) } else { waitersActive++ go func() { <-exit err = w.ws.Unsubscribe(blocksID) if err != nil { - unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks (id: %s): %w", blocksID, err) + unsubErrs <- fmt.Errorf("failed to unsubscribe from blocks/headers (id: %s): %w", blocksID, err) return } unsubErrs <- nil @@ -290,9 +298,20 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin if wsWaitErr == nil && res == nil { select { + case _, ok := <-hRcvr: + if !ok { + // We're toast, retry with non-ws client. + hRcvr = nil + bRcvr = nil + aerRcvr = nil + wsWaitErr = ErrMissedEvent + break + } + waitErr = ErrTxNotAccepted case _, ok := <-bRcvr: if !ok { // We're toast, retry with non-ws client. + hRcvr = nil bRcvr = nil aerRcvr = nil wsWaitErr = ErrMissedEvent @@ -302,6 +321,7 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin case aer, ok := <-aerRcvr: if !ok { // We're toast, retry with non-ws client. + hRcvr = nil bRcvr = nil aerRcvr = nil wsWaitErr = ErrMissedEvent @@ -321,13 +341,21 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin drainLoop: for { select { + case _, ok := <-hRcvr: + if !ok { // Missed event means both channels are closed. + hRcvr = nil + bRcvr = nil + aerRcvr = nil + } case _, ok := <-bRcvr: if !ok { // Missed event means both channels are closed. + hRcvr = nil bRcvr = nil aerRcvr = nil } case _, ok := <-aerRcvr: if !ok { // Missed event means both channels are closed. + hRcvr = nil bRcvr = nil aerRcvr = nil } @@ -349,6 +377,9 @@ func (w *EventBased) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uin } } } + if hRcvr != nil { + close(hRcvr) + } if bRcvr != nil { close(bRcvr) } diff --git a/pkg/rpcclient/waiter/waiter_test.go b/pkg/rpcclient/waiter/waiter_test.go index cd8171add..62df46e07 100644 --- a/pkg/rpcclient/waiter/waiter_test.go +++ b/pkg/rpcclient/waiter/waiter_test.go @@ -35,6 +35,8 @@ type RPCClient struct { context context.Context } +var _ = waiter.RPCPollingBased(&RPCClient{}) + func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) { return r.invRes, r.err } @@ -80,11 +82,14 @@ func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*r type AwaitableRPCClient struct { RPCClient - chLock sync.RWMutex - subBlockCh chan<- *block.Block - subTxCh chan<- *state.AppExecResult + chLock sync.RWMutex + subHeaderCh chan<- *block.Header + subBlockCh chan<- *block.Block + subTxCh chan<- *state.AppExecResult } +var _ = waiter.RPCEventBased(&AwaitableRPCClient{}) + func (c *AwaitableRPCClient) ReceiveBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Block) (string, error) { c.chLock.Lock() defer c.chLock.Unlock() @@ -97,6 +102,12 @@ func (c *AwaitableRPCClient) ReceiveExecutions(flt *neorpc.ExecutionFilter, rcvr c.subTxCh = rcvr return "2", nil } +func (c *AwaitableRPCClient) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) { + c.chLock.Lock() + defer c.chLock.Unlock() + c.subHeaderCh = rcvr + return "3", nil +} func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil } func TestNewWaiter(t *testing.T) { @@ -244,7 +255,7 @@ func TestWSWaiter_Wait(t *testing.T) { check(t, func() { c.chLock.RLock() defer c.chLock.RUnlock() - c.subBlockCh <- &block.Block{} + c.subHeaderCh <- &block.Header{} }) } diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 8cdc51689..1795f8d3d 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -1789,53 +1789,84 @@ func TestClient_Wait(t *testing.T) { defer chain.Close() defer rpcSrv.Shutdown() - c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{}) - require.NoError(t, err) - acc, err := wallet.NewAccount() - require.NoError(t, err) - act, err := actor.New(c, []actor.SignerAccount{ - { - Signer: transaction.Signer{ - Account: acc.ScriptHash(), - }, - Account: acc, - }, - }) - require.NoError(t, err) + run := func(t *testing.T, ws bool) { + acc, err := wallet.NewAccount() + require.NoError(t, err) - b, err := chain.GetBlock(chain.GetHeaderHash(1)) - require.NoError(t, err) - require.True(t, len(b.Transactions) > 0) + var act *actor.Actor + if ws { + c, err := rpcclient.NewWS(context.Background(), "ws"+strings.TrimPrefix(httpSrv.URL, "http")+"/ws", rpcclient.WSOptions{}) + require.NoError(t, err) + require.NoError(t, c.Init()) + act, err = actor.New(c, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: acc.ScriptHash(), + }, + Account: acc, + }, + }) + require.NoError(t, err) + } else { + c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{}) + require.NoError(t, err) + require.NoError(t, c.Init()) + act, err = actor.New(c, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: acc.ScriptHash(), + }, + Account: acc, + }, + }) + require.NoError(t, err) + } - check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) { - rcvr := make(chan struct{}) - go func() { - aer, err := act.Wait(h, vub, nil) - if errExpected { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Equal(t, h, aer.Container) - } - rcvr <- struct{}{} - }() - waitloop: - for { - select { - case <-rcvr: - break waitloop - case <-time.NewTimer(chain.GetConfig().TimePerBlock).C: - t.Fatal("transaction failed to be awaited") + b, err := chain.GetBlock(chain.GetHeaderHash(1)) + require.NoError(t, err) + require.True(t, len(b.Transactions) > 0) + + check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) { + rcvr := make(chan struct{}) + go func() { + aer, err := act.Wait(h, vub, nil) + if errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, h, aer.Container) + } + rcvr <- struct{}{} + }() + waitloop: + for { + select { + case <-rcvr: + break waitloop + case <-time.NewTimer(chain.GetConfig().TimePerBlock).C: + t.Fatal("transaction failed to be awaited") + } } } + + // Wait for transaction that has been persisted and VUB block has been persisted. + check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false) + // Wait for transaction that has been persisted and VUB block hasn't yet been persisted. + check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false) + if !ws { + // Wait for transaction that hasn't been persisted and VUB block has been persisted. + // WS client waits for the next block to be accepted to ensure that transaction wasn't + // persisted, and this test doesn't run chain, thus, don't run this test for WS client. + check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true) + } } - // Wait for transaction that has been persisted and VUB block has been persisted. - check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false) - // Wait for transaction that has been persisted and VUB block hasn't yet been persisted. - check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false) - // Wait for transaction that hasn't been persisted and VUB block has been persisted. - check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true) + t.Run("client", func(t *testing.T) { + run(t, false) + }) + t.Run("ws client", func(t *testing.T) { + run(t, true) + }) } func mkSubsClient(t *testing.T, rpcSrv *Server, httpSrv *httptest.Server, local bool) *rpcclient.WSClient {