From 4ce6bc6a668cdef8e326079bf9735fab1a66cac9 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 17 Oct 2022 09:12:33 +0300 Subject: [PATCH 01/15] rpc: adjust comment to Notification value cast And explicitly specify field names on Notification creation. --- pkg/rpcclient/wsclient.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 3a71d8bb9..33de3826d 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -52,8 +52,8 @@ type WSClient struct { } // Notification represents a server-generated notification for client subscriptions. -// Value can be one of block.Block, state.AppExecResult, state.ContainedNotificationEvent -// transaction.Transaction or subscriptions.NotaryRequestEvent based on Type. +// Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent +// *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type. type Notification struct { Type neorpc.EventID Value interface{} @@ -205,7 +205,7 @@ readloop: break readloop } } - c.Notifications <- Notification{event, val} + c.Notifications <- Notification{Type: event, Value: val} } else if rr.ID != nil && (rr.Error != nil || rr.Result != nil) { id, err := strconv.ParseUint(string(rr.ID), 10, 64) if err != nil { From 6d38e7514975865fc26a884f805637fbe3372c16 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 17 Oct 2022 13:31:24 +0300 Subject: [PATCH 02/15] rpc: support multiple WSClient notification receivers --- pkg/neorpc/rpcevent/filter.go | 83 +++++++++ pkg/neorpc/rpcevent/filter_test.go | 252 ++++++++++++++++++++++++++++ pkg/neorpc/types.go | 11 ++ pkg/rpcclient/wsclient.go | 146 ++++++++++++++-- pkg/rpcclient/wsclient_test.go | 129 +++++++++++--- pkg/services/rpcsrv/server.go | 3 +- pkg/services/rpcsrv/subscription.go | 71 ++------ 7 files changed, 599 insertions(+), 96 deletions(-) create mode 100644 pkg/neorpc/rpcevent/filter.go create mode 100644 pkg/neorpc/rpcevent/filter_test.go diff --git a/pkg/neorpc/rpcevent/filter.go b/pkg/neorpc/rpcevent/filter.go new file mode 100644 index 000000000..e2ca74c1b --- /dev/null +++ b/pkg/neorpc/rpcevent/filter.go @@ -0,0 +1,83 @@ +package rpcevent + +import ( + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" +) + +type ( + // Comparator is an interface required from notification event filter to be able to + // filter notifications. + Comparator interface { + EventID() neorpc.EventID + Filter() interface{} + } + // Container is an interface required from notification event to be able to + // pass filter. + Container interface { + EventID() neorpc.EventID + EventPayload() interface{} + } +) + +// Matches filters our given Container against Comparator filter. +func Matches(f Comparator, r Container) bool { + expectedEvent := f.EventID() + filter := f.Filter() + if r.EventID() != expectedEvent { + return false + } + if filter == nil { + return true + } + switch f.EventID() { + case neorpc.BlockEventID: + filt := filter.(neorpc.BlockFilter) + b := r.EventPayload().(*block.Block) + return int(b.PrimaryIndex) == filt.Primary + case neorpc.TransactionEventID: + filt := filter.(neorpc.TxFilter) + tx := r.EventPayload().(*transaction.Transaction) + senderOK := filt.Sender == nil || tx.Sender().Equals(*filt.Sender) + signerOK := true + if filt.Signer != nil { + signerOK = false + for i := range tx.Signers { + if tx.Signers[i].Account.Equals(*filt.Signer) { + signerOK = true + break + } + } + } + return senderOK && signerOK + case neorpc.NotificationEventID: + filt := filter.(neorpc.NotificationFilter) + notification := r.EventPayload().(*state.ContainedNotificationEvent) + hashOk := filt.Contract == nil || notification.ScriptHash.Equals(*filt.Contract) + nameOk := filt.Name == nil || notification.Name == *filt.Name + return hashOk && nameOk + case neorpc.ExecutionEventID: + filt := filter.(neorpc.ExecutionFilter) + applog := r.EventPayload().(*state.AppExecResult) + return applog.VMState.String() == filt.State + case neorpc.NotaryRequestEventID: + filt := filter.(neorpc.TxFilter) + req := r.EventPayload().(*result.NotaryRequestEvent) + senderOk := filt.Sender == nil || req.NotaryRequest.FallbackTransaction.Signers[1].Account == *filt.Sender + signerOK := true + if filt.Signer != nil { + signerOK = false + for _, signer := range req.NotaryRequest.MainTransaction.Signers { + if signer.Account.Equals(*filt.Signer) { + signerOK = true + break + } + } + } + return senderOk && signerOK + } + return false +} diff --git a/pkg/neorpc/rpcevent/filter_test.go b/pkg/neorpc/rpcevent/filter_test.go new file mode 100644 index 000000000..d16c222e6 --- /dev/null +++ b/pkg/neorpc/rpcevent/filter_test.go @@ -0,0 +1,252 @@ +package rpcevent + +import ( + "testing" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" + "github.com/stretchr/testify/require" +) + +type ( + testComparator struct { + id neorpc.EventID + filter interface{} + } + testContainer struct { + id neorpc.EventID + pld interface{} + } +) + +func (c testComparator) EventID() neorpc.EventID { + return c.id +} +func (c testComparator) Filter() interface{} { + return c.filter +} +func (c testContainer) EventID() neorpc.EventID { + return c.id +} +func (c testContainer) EventPayload() interface{} { + return c.pld +} + +func TestMatches(t *testing.T) { + primary := byte(1) + sender := util.Uint160{1, 2, 3} + signer := util.Uint160{4, 5, 6} + contract := util.Uint160{7, 8, 9} + badUint160 := util.Uint160{9, 9, 9} + name := "ntf name" + badName := "bad name" + bContainer := testContainer{ + id: neorpc.BlockEventID, + pld: &block.Block{ + Header: block.Header{PrimaryIndex: primary}, + }, + } + st := vmstate.Halt + badState := "FAULT" + txContainer := testContainer{ + id: neorpc.TransactionEventID, + pld: &transaction.Transaction{Signers: []transaction.Signer{{Account: sender}, {Account: signer}}}, + } + ntfContainer := testContainer{ + id: neorpc.NotificationEventID, + pld: &state.ContainedNotificationEvent{NotificationEvent: state.NotificationEvent{ScriptHash: contract, Name: name}}, + } + exContainer := testContainer{ + id: neorpc.ExecutionEventID, + pld: &state.AppExecResult{Execution: state.Execution{VMState: st}}, + } + ntrContainer := testContainer{ + id: neorpc.NotaryRequestEventID, + pld: &result.NotaryRequestEvent{ + NotaryRequest: &payload.P2PNotaryRequest{ + MainTransaction: &transaction.Transaction{Signers: []transaction.Signer{{Account: signer}}}, + FallbackTransaction: &transaction.Transaction{Signers: []transaction.Signer{{Account: util.Uint160{}}, {Account: sender}}}, + }, + }, + } + missedContainer := testContainer{ + id: neorpc.MissedEventID, + } + var testCases = []struct { + name string + comparator testComparator + container testContainer + expected bool + }{ + { + name: "ID mismatch", + comparator: testComparator{id: neorpc.TransactionEventID}, + container: bContainer, + expected: false, + }, + { + name: "missed event", + comparator: testComparator{id: neorpc.BlockEventID}, + container: missedContainer, + expected: false, + }, + { + name: "block, no filter", + comparator: testComparator{id: neorpc.BlockEventID}, + container: bContainer, + expected: true, + }, + { + name: "block, primary mismatch", + comparator: testComparator{ + id: neorpc.BlockEventID, + filter: neorpc.BlockFilter{Primary: int(primary + 1)}, + }, + container: bContainer, + expected: false, + }, + { + name: "block, filter match", + comparator: testComparator{ + id: neorpc.BlockEventID, + filter: neorpc.BlockFilter{Primary: int(primary)}, + }, + container: bContainer, + expected: true, + }, + { + name: "transaction, no filter", + comparator: testComparator{id: neorpc.TransactionEventID}, + container: txContainer, + expected: true, + }, + { + name: "transaction, sender mismatch", + comparator: testComparator{ + id: neorpc.TransactionEventID, + filter: neorpc.TxFilter{Sender: &badUint160}, + }, + container: txContainer, + expected: false, + }, + { + name: "transaction, signer mismatch", + comparator: testComparator{ + id: neorpc.TransactionEventID, + filter: neorpc.TxFilter{Signer: &badUint160}, + }, + container: txContainer, + expected: false, + }, + { + name: "transaction, filter match", + comparator: testComparator{ + id: neorpc.TransactionEventID, + filter: neorpc.TxFilter{Sender: &sender, Signer: &signer}, + }, + container: txContainer, + expected: true, + }, + { + name: "notification, no filter", + comparator: testComparator{id: neorpc.NotificationEventID}, + container: ntfContainer, + expected: true, + }, + { + name: "notification, contract mismatch", + comparator: testComparator{ + id: neorpc.NotificationEventID, + filter: neorpc.NotificationFilter{Contract: &badUint160}, + }, + container: ntfContainer, + expected: false, + }, + { + name: "notification, name mismatch", + comparator: testComparator{ + id: neorpc.NotificationEventID, + filter: neorpc.NotificationFilter{Name: &badName}, + }, + container: ntfContainer, + expected: false, + }, + { + name: "notification, filter match", + comparator: testComparator{ + id: neorpc.NotificationEventID, + filter: neorpc.NotificationFilter{Name: &name, Contract: &contract}, + }, + container: ntfContainer, + expected: true, + }, + { + name: "execution, no filter", + comparator: testComparator{id: neorpc.ExecutionEventID}, + container: exContainer, + expected: true, + }, + { + name: "execution, state mismatch", + comparator: testComparator{ + id: neorpc.ExecutionEventID, + filter: neorpc.ExecutionFilter{State: badState}, + }, + container: exContainer, + expected: false, + }, + { + name: "execution, filter mismatch", + comparator: testComparator{ + id: neorpc.ExecutionEventID, + filter: neorpc.ExecutionFilter{State: st.String()}, + }, + container: exContainer, + expected: true, + }, + { + name: "notary request, no filter", + comparator: testComparator{id: neorpc.NotaryRequestEventID}, + container: ntrContainer, + expected: true, + }, + { + name: "notary request, sender mismatch", + comparator: testComparator{ + id: neorpc.NotaryRequestEventID, + filter: neorpc.TxFilter{Sender: &badUint160}, + }, + container: ntrContainer, + expected: false, + }, + { + name: "notary request, signer mismatch", + comparator: testComparator{ + id: neorpc.NotaryRequestEventID, + filter: neorpc.TxFilter{Signer: &badUint160}, + }, + container: ntrContainer, + expected: false, + }, + { + name: "notary request, filter match", + comparator: testComparator{ + id: neorpc.NotaryRequestEventID, + filter: neorpc.TxFilter{Sender: &sender, Signer: &signer}, + }, + container: ntrContainer, + expected: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, Matches(tc.comparator, tc.container)) + }) + } +} diff --git a/pkg/neorpc/types.go b/pkg/neorpc/types.go index d78991380..ff01cbfa7 100644 --- a/pkg/neorpc/types.go +++ b/pkg/neorpc/types.go @@ -155,3 +155,14 @@ func (s *SignerWithWitness) UnmarshalJSON(data []byte) error { } return nil } + +// EventID implements EventContainer interface and returns notification ID. +func (n *Notification) EventID() EventID { + return n.Event +} + +// EventPayload implements EventContainer interface and returns notification +// object. +func (n *Notification) EventPayload() interface{} { + return n.Payload[0] +} diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 33de3826d..1711a1f26 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -15,6 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/neorpc/rpcevent" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" ) @@ -45,12 +46,30 @@ type WSClient struct { closeErr error subscriptionsLock sync.RWMutex - subscriptions map[string]bool + subscriptions map[string]notificationReceiver respLock sync.RWMutex respChannels map[uint64]chan *neorpc.Response } +// notificationReceiver is a server events receiver. It stores desired notifications ID +// and filter and a channel used to receive matching notifications. +type notificationReceiver struct { + typ neorpc.EventID + filter interface{} + ch chan<- Notification +} + +// EventID implements neorpc.Comparator interface and returns notification ID. +func (r notificationReceiver) EventID() neorpc.EventID { + return r.typ +} + +// Filter implements neorpc.Comparator interface and returns notification filter. +func (r notificationReceiver) Filter() interface{} { + return r.filter +} + // Notification represents a server-generated notification for client subscriptions. // Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent // *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type. @@ -59,6 +78,17 @@ type Notification struct { Value interface{} } +// EventID implements Container interface and returns notification ID. +func (n Notification) EventID() neorpc.EventID { + return n.Type +} + +// EventPayload implements Container interface and returns notification +// object. +func (n Notification) EventPayload() interface{} { + return n.Value +} + // requestResponse is a combined type for request and response since we can get // any of them here. type requestResponse struct { @@ -107,7 +137,7 @@ func NewWS(ctx context.Context, endpoint string, opts Options) (*WSClient, error closeCalled: *atomic.NewBool(false), respChannels: make(map[uint64]chan *neorpc.Response), requests: make(chan *neorpc.Request), - subscriptions: make(map[string]bool), + subscriptions: make(map[string]notificationReceiver), } err = initClient(ctx, &wsc.Client, endpoint, opts) @@ -205,7 +235,16 @@ readloop: break readloop } } - c.Notifications <- Notification{Type: event, Value: val} + ok := make(map[chan<- Notification]bool) + c.subscriptionsLock.RLock() + for _, rcvr := range c.subscriptions { + ntf := Notification{Type: event, Value: val} + if (rpcevent.Matches(rcvr, ntf) || event == neorpc.MissedEventID /*missed event must be delivered to each receiver*/) && !ok[rcvr.ch] { + ok[rcvr.ch] = true // strictly one notification per channel + rcvr.ch <- ntf // this will block other receivers + } + } + c.subscriptionsLock.RUnlock() } else if rr.ID != nil && (rr.Error != nil || rr.Result != nil) { id, err := strconv.ParseUint(string(rr.ID), 10, 64) if err != nil { @@ -317,7 +356,7 @@ func (c *WSClient) makeWsRequest(r *neorpc.Request) (*neorpc.Response, error) { } } -func (c *WSClient) performSubscription(params []interface{}) (string, error) { +func (c *WSClient) performSubscription(params []interface{}, rcvr notificationReceiver) (string, error) { var resp string if err := c.performRequest("subscribe", params, &resp); err != nil { @@ -327,7 +366,7 @@ func (c *WSClient) performSubscription(params []interface{}) (string, error) { c.subscriptionsLock.Lock() defer c.subscriptionsLock.Unlock() - c.subscriptions[resp] = true + c.subscriptions[resp] = rcvr return resp, nil } @@ -337,7 +376,7 @@ func (c *WSClient) performUnsubscription(id string) error { c.subscriptionsLock.Lock() defer c.subscriptionsLock.Unlock() - if !c.subscriptions[id] { + if _, ok := c.subscriptions[id]; !ok { return errors.New("no subscription with this ID") } if err := c.performRequest("unsubscribe", []interface{}{id}, &resp); err != nil { @@ -354,22 +393,52 @@ func (c *WSClient) performUnsubscription(id string) error { // of the client. It can be filtered by primary consensus node index, nil value doesn't // add any filters. func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) { + return c.SubscribeForNewBlocksWithChan(primary, c.Notifications) +} + +// SubscribeForNewBlocksWithChan registers provided channel as a receiver for the +// specified new blocks notifications. The receiver channel must be properly read and +// drained after usage in order not to block other notification receivers. +// See SubscribeForNewBlocks for parameter details. +func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"block_added"} + var flt *neorpc.BlockFilter if primary != nil { - params = append(params, neorpc.BlockFilter{Primary: *primary}) + flt = &neorpc.BlockFilter{Primary: *primary} + params = append(params, *flt) } - return c.performSubscription(params) + rcvr := notificationReceiver{ + typ: neorpc.BlockEventID, + filter: flt, + ch: rcvrCh, + } + return c.performSubscription(params, rcvr) } // SubscribeForNewTransactions adds subscription for new transaction events to // this instance of the client. It can be filtered by the sender and/or the signer, nil // value is treated as missing filter. func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) { + return c.SubscribeForNewTransactionsWithChan(sender, signer, c.Notifications) +} + +// SubscribeForNewTransactionsWithChan registers provided channel as a receiver +// for the specified new transactions notifications. The receiver channel must be +// properly read and drained after usage in order not to block other notification +// receivers. See SubscribeForNewTransactions for parameter details. +func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, signer *util.Uint160, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"transaction_added"} + var flt *neorpc.TxFilter if sender != nil || signer != nil { - params = append(params, neorpc.TxFilter{Sender: sender, Signer: signer}) + flt = &neorpc.TxFilter{Sender: sender, Signer: signer} + params = append(params, *flt) } - return c.performSubscription(params) + rcvr := notificationReceiver{ + typ: neorpc.TransactionEventID, + filter: flt, + ch: rcvrCh, + } + return c.performSubscription(params, rcvr) } // SubscribeForExecutionNotifications adds subscription for notifications @@ -377,11 +446,26 @@ func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *uti // filtered by the contract's hash (that emits notifications), nil value puts no such // restrictions. func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) { + return c.SubscribeForExecutionNotificationsWithChan(contract, name, c.Notifications) +} + +// SubscribeForExecutionNotificationsWithChan registers provided channel as a +// receiver for the specified execution events. The receiver channel must be +// properly read and drained after usage in order not to block other notification +// receivers. See SubscribeForExecutionNotifications for parameter details. +func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uint160, name *string, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"notification_from_execution"} + var flt *neorpc.NotificationFilter if contract != nil || name != nil { - params = append(params, neorpc.NotificationFilter{Contract: contract, Name: name}) + flt = &neorpc.NotificationFilter{Contract: contract, Name: name} + params = append(params, *flt) } - return c.performSubscription(params) + rcvr := notificationReceiver{ + typ: neorpc.NotificationEventID, + filter: flt, + ch: rcvrCh, + } + return c.performSubscription(params, rcvr) } // SubscribeForTransactionExecutions adds subscription for application execution @@ -389,14 +473,29 @@ func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, na // be filtered by state (HALT/FAULT) to check for successful or failing // transactions, nil value means no filtering. func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) { + return c.SubscribeForTransactionExecutionsWithChan(state, c.Notifications) +} + +// SubscribeForTransactionExecutionsWithChan registers provided channel as a +// receiver for the specified execution notifications. The receiver channel must be +// properly read and drained after usage in order not to block other notification +// receivers. See SubscribeForTransactionExecutions for parameter details. +func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"transaction_executed"} + var flt *neorpc.ExecutionFilter if state != nil { if *state != "HALT" && *state != "FAULT" { return "", errors.New("bad state parameter") } - params = append(params, neorpc.ExecutionFilter{State: *state}) + flt = &neorpc.ExecutionFilter{State: *state} + params = append(params, *flt) } - return c.performSubscription(params) + rcvr := notificationReceiver{ + typ: neorpc.ExecutionEventID, + filter: flt, + ch: rcvrCh, + } + return c.performSubscription(params, rcvr) } // SubscribeForNotaryRequests adds subscription for notary request payloads @@ -404,11 +503,26 @@ func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, err // request sender's hash, or main tx signer's hash, nil value puts no such // restrictions. func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) { + return c.SubscribeForNotaryRequestsWithChan(sender, mainSigner, c.Notifications) +} + +// SubscribeForNotaryRequestsWithChan registers provided channel as a receiver +// for the specified notary requests notifications. The receiver channel must be +// properly read and drained after usage in order not to block other notification +// receivers. See SubscribeForNotaryRequests for parameter details. +func (c *WSClient) SubscribeForNotaryRequestsWithChan(sender *util.Uint160, mainSigner *util.Uint160, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"notary_request_event"} + var flt *neorpc.TxFilter if sender != nil { - params = append(params, neorpc.TxFilter{Sender: sender, Signer: mainSigner}) + flt = &neorpc.TxFilter{Sender: sender, Signer: mainSigner} + params = append(params, *flt) } - return c.performSubscription(params) + rcvr := notificationReceiver{ + typ: neorpc.NotaryRequestEventID, + filter: flt, + ch: rcvrCh, + } + return c.performSubscription(params, rcvr) } // Unsubscribe removes subscription for the given event stream. diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index eb730d2c0..1bc396a21 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -32,19 +32,32 @@ func TestWSClientClose(t *testing.T) { } func TestWSClientSubscription(t *testing.T) { + ch := make(chan Notification) var cases = map[string]func(*WSClient) (string, error){ "blocks": func(wsc *WSClient) (string, error) { return wsc.SubscribeForNewBlocks(nil) }, + "blocks_with_custom_ch": func(wsc *WSClient) (string, error) { + return wsc.SubscribeForNewBlocksWithChan(nil, ch) + }, "transactions": func(wsc *WSClient) (string, error) { return wsc.SubscribeForNewTransactions(nil, nil) }, + "transactions_with_custom_ch": func(wsc *WSClient) (string, error) { + return wsc.SubscribeForNewTransactionsWithChan(nil, nil, ch) + }, "notifications": func(wsc *WSClient) (string, error) { return wsc.SubscribeForExecutionNotifications(nil, nil) }, + "notifications_with_custom_ch": func(wsc *WSClient) (string, error) { + return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch) + }, "executions": func(wsc *WSClient) (string, error) { return wsc.SubscribeForTransactionExecutions(nil) }, + "executions_with_custom_ch": func(wsc *WSClient) (string, error) { + return wsc.SubscribeForTransactionExecutionsWithChan(nil, ch) + }, } t.Run("good", func(t *testing.T) { for name, f := range cases { @@ -83,13 +96,13 @@ func TestWSClientUnsubscription(t *testing.T) { var cases = map[string]responseCheck{ "good": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = true + wsc.subscriptions["0"] = notificationReceiver{} err := wsc.Unsubscribe("0") require.NoError(t, err) }}, "all": {`{"jsonrpc": "2.0", "id": 1, "result": true}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = true + wsc.subscriptions["0"] = notificationReceiver{} err := wsc.UnsubscribeAll() require.NoError(t, err) require.Equal(t, 0, len(wsc.subscriptions)) @@ -100,13 +113,13 @@ func TestWSClientUnsubscription(t *testing.T) { }}, "error returned": {`{"jsonrpc": "2.0", "id": 1, "error":{"code":-32602,"message":"Invalid Params"}}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = true + wsc.subscriptions["0"] = notificationReceiver{} err := wsc.Unsubscribe("0") require.Error(t, err) }}, "false returned": {`{"jsonrpc": "2.0", "id": 1, "result": false}`, func(t *testing.T, wsc *WSClient) { // We can't really subscribe using this stub server, so set up wsc internals. - wsc.subscriptions["0"] = true + wsc.subscriptions["0"] = notificationReceiver{} err := wsc.Unsubscribe("0") require.Error(t, err) }}, @@ -151,26 +164,104 @@ func TestWSClientEvents(t *testing.T) { } })) - wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) - require.NoError(t, err) - wsc.getNextRequestID = getTestRequestID - wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. - wsc.cache.network = netmode.UnitTestNet - for range events { + t.Run("default ntf channel", func(t *testing.T) { + wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) + require.NoError(t, err) + wsc.getNextRequestID = getTestRequestID + wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. + // Our server mock is restricted, so perform subscriptions manually with default notifications channel. + wsc.subscriptionsLock.Lock() + wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} + wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} + wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications} + // MissedEvent must be delivered without subscription. + wsc.subscriptionsLock.Unlock() + wsc.cache.network = netmode.UnitTestNet + for range events { + select { + case _, ok = <-wsc.Notifications: + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } + require.True(t, ok) + } select { case _, ok = <-wsc.Notifications: case <-time.After(time.Second): t.Fatal("timeout waiting for event") } - require.True(t, ok) - } - select { - case _, ok = <-wsc.Notifications: - case <-time.After(time.Second): - t.Fatal("timeout waiting for event") - } - // Connection closed by server. - require.False(t, ok) + // Connection closed by server. + require.False(t, ok) + }) + t.Run("multiple ntf channels", func(t *testing.T) { + wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) + require.NoError(t, err) + wsc.getNextRequestID = getTestRequestID + wsc.cacheLock.Lock() + wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. + wsc.cache.network = netmode.UnitTestNet + wsc.cacheLock.Unlock() + + // Our server mock is restricted, so perform subscriptions manually with default notifications channel. + ch1 := make(chan Notification) + ch2 := make(chan Notification) + ch3 := make(chan Notification) + wsc.subscriptionsLock.Lock() + wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} + wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} + wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications} + wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1} + wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} + wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions + wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: "HALT"}, ch: ch2} + wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: "FAULT"}, ch: ch3} + // MissedEvent must be delivered without subscription. + wsc.subscriptionsLock.Unlock() + + var ( + defaultChCnt int + ch1Cnt int + ch2Cnt int + ch3Cnt int + expectedDefaultCnCount = len(events) + expectedCh1Cnt = 1 + 1 // Block event + Missed event + expectedCh2Cnt = 1 + 2 + 1 // Notification event + 2 Execution events + Missed event + expectedCh3Cnt = 1 // Missed event + ntf Notification + ) + for i := 0; i < expectedDefaultCnCount+expectedCh1Cnt+expectedCh2Cnt+expectedCh3Cnt; i++ { + select { + case ntf, ok = <-wsc.Notifications: + defaultChCnt++ + case ntf, ok = <-ch1: + require.True(t, ntf.Type == neorpc.BlockEventID || ntf.Type == neorpc.MissedEventID, ntf.Type) + ch1Cnt++ + case ntf, ok = <-ch2: + require.True(t, ntf.Type == neorpc.NotificationEventID || ntf.Type == neorpc.MissedEventID || ntf.Type == neorpc.ExecutionEventID) + ch2Cnt++ + case ntf, ok = <-ch3: + require.True(t, ntf.Type == neorpc.MissedEventID) + ch3Cnt++ + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } + require.True(t, ok) + } + select { + case _, ok = <-wsc.Notifications: + case _, ok = <-ch1: + case _, ok = <-ch2: + case _, ok = <-ch3: + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } + // Connection closed by server. + require.False(t, ok) + require.Equal(t, expectedDefaultCnCount, defaultChCnt) + require.Equal(t, expectedCh1Cnt, ch1Cnt) + require.Equal(t, expectedCh2Cnt, ch2Cnt) + require.Equal(t, expectedCh3Cnt, ch3Cnt) + }) } func TestWSExecutionVMStateCheck(t *testing.T) { diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index fbc19bd90..d5874e92d 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -40,6 +40,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/neorpc/rpcevent" "github.com/nspcc-dev/neo-go/pkg/network" "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/services/oracle/broadcaster" @@ -2593,7 +2594,7 @@ chloop: continue } for i := range sub.feeds { - if sub.feeds[i].Matches(&resp) { + if rpcevent.Matches(sub.feeds[i], &resp) { if msg == nil { b, err = json.Marshal(resp) if err != nil { diff --git a/pkg/services/rpcsrv/subscription.go b/pkg/services/rpcsrv/subscription.go index 6ec1be3fb..85e9a7036 100644 --- a/pkg/services/rpcsrv/subscription.go +++ b/pkg/services/rpcsrv/subscription.go @@ -2,11 +2,7 @@ package rpcsrv import ( "github.com/gorilla/websocket" - "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/neorpc" - "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "go.uber.org/atomic" ) @@ -22,12 +18,23 @@ type ( // that's not for long. feeds [maxFeeds]feed } + // feed stores subscriber's desired event ID with filter. feed struct { event neorpc.EventID filter interface{} } ) +// EventID implements neorpc.EventComparator interface and returns notification ID. +func (f feed) EventID() neorpc.EventID { + return f.event +} + +// Filter implements neorpc.EventComparator interface and returns notification filter. +func (f feed) Filter() interface{} { + return f.filter +} + const ( // Maximum number of subscriptions per one client. maxFeeds = 16 @@ -42,59 +49,3 @@ const ( // a lot in terms of memory used. notificationBufSize = 1024 ) - -func (f *feed) Matches(r *neorpc.Notification) bool { - if r.Event != f.event { - return false - } - if f.filter == nil { - return true - } - switch f.event { - case neorpc.BlockEventID: - filt := f.filter.(neorpc.BlockFilter) - b := r.Payload[0].(*block.Block) - return int(b.PrimaryIndex) == filt.Primary - case neorpc.TransactionEventID: - filt := f.filter.(neorpc.TxFilter) - tx := r.Payload[0].(*transaction.Transaction) - senderOK := filt.Sender == nil || tx.Sender().Equals(*filt.Sender) - signerOK := true - if filt.Signer != nil { - signerOK = false - for i := range tx.Signers { - if tx.Signers[i].Account.Equals(*filt.Signer) { - signerOK = true - break - } - } - } - return senderOK && signerOK - case neorpc.NotificationEventID: - filt := f.filter.(neorpc.NotificationFilter) - notification := r.Payload[0].(*state.ContainedNotificationEvent) - hashOk := filt.Contract == nil || notification.ScriptHash.Equals(*filt.Contract) - nameOk := filt.Name == nil || notification.Name == *filt.Name - return hashOk && nameOk - case neorpc.ExecutionEventID: - filt := f.filter.(neorpc.ExecutionFilter) - applog := r.Payload[0].(*state.AppExecResult) - return applog.VMState.String() == filt.State - case neorpc.NotaryRequestEventID: - filt := f.filter.(neorpc.TxFilter) - req := r.Payload[0].(*result.NotaryRequestEvent) - senderOk := filt.Sender == nil || req.NotaryRequest.FallbackTransaction.Signers[1].Account == *filt.Sender - signerOK := true - if filt.Signer != nil { - signerOK = false - for _, signer := range req.NotaryRequest.MainTransaction.Signers { - if signer.Account.Equals(*filt.Signer) { - signerOK = true - break - } - } - } - return senderOk && signerOK - } - return false -} From 10a0716217e5de766012897494c7e7fdba727ff1 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 12 Oct 2022 15:23:32 +0300 Subject: [PATCH 03/15] rpc: implement transaction awaiting functionality Close #2704. --- pkg/neorpc/result/invoke.go | 17 +++ pkg/neorpc/result/invoke_test.go | 40 ++++++ pkg/rpcclient/actor/actor.go | 1 + pkg/rpcclient/actor/actor_test.go | 8 ++ pkg/rpcclient/actor/waiter.go | 203 +++++++++++++++++++++++++++++ pkg/rpcclient/client.go | 17 ++- pkg/rpcclient/notary/actor_test.go | 8 ++ pkg/rpcclient/wsclient.go | 8 ++ pkg/services/rpcsrv/client_test.go | 54 ++++++++ 9 files changed, 353 insertions(+), 3 deletions(-) create mode 100644 pkg/rpcclient/actor/waiter.go diff --git a/pkg/neorpc/result/invoke.go b/pkg/neorpc/result/invoke.go index ef579f600..2fa32f4e4 100644 --- a/pkg/neorpc/result/invoke.go +++ b/pkg/neorpc/result/invoke.go @@ -232,3 +232,20 @@ func (r *Invoke) UnmarshalJSON(data []byte) error { r.Diagnostics = aux.Diagnostics return nil } + +// AppExecToInvocation converts state.AppExecResult to result.Invoke and can be used +// as a wrapper for actor.Wait. The result of AppExecToInvocation doesn't have all fields +// properly filled, it's limited by State, GasConsumed, Stack, FaultException and Notifications. +// The result of AppExecToInvocation can be passed to unwrap package helpers. +func AppExecToInvocation(aer *state.AppExecResult, err error) (*Invoke, error) { + if err != nil { + return nil, err + } + return &Invoke{ + State: aer.VMState.String(), + GasConsumed: aer.GasConsumed, + Stack: aer.Stack, + FaultException: aer.FaultException, + Notifications: aer.Events, + }, nil +} diff --git a/pkg/neorpc/result/invoke_test.go b/pkg/neorpc/result/invoke_test.go index 09dc588dc..64a145740 100644 --- a/pkg/neorpc/result/invoke_test.go +++ b/pkg/neorpc/result/invoke_test.go @@ -3,13 +3,17 @@ package result import ( "encoding/base64" "encoding/json" + "errors" "math/big" "testing" + "github.com/google/uuid" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "github.com/stretchr/testify/require" ) @@ -49,3 +53,39 @@ func TestInvoke_MarshalJSON(t *testing.T) { require.NoError(t, json.Unmarshal(data, actual)) require.Equal(t, result, actual) } + +func TestAppExecToInvocation(t *testing.T) { + // With error. + someErr := errors.New("some err") + _, err := AppExecToInvocation(nil, someErr) + require.ErrorIs(t, err, someErr) + + // Good. + h := util.Uint256{1, 2, 3} + ex := state.Execution{ + Trigger: trigger.Application, + VMState: vmstate.Fault, + GasConsumed: 123, + Stack: []stackitem.Item{stackitem.NewBigInteger(big.NewInt(123))}, + Events: []state.NotificationEvent{{ + ScriptHash: util.Uint160{3, 2, 1}, + Name: "Notification", + Item: stackitem.NewArray([]stackitem.Item{stackitem.Null{}}), + }}, + FaultException: "some fault exception", + } + inv, err := AppExecToInvocation(&state.AppExecResult{ + Container: h, + Execution: ex, + }, nil) + require.NoError(t, err) + require.Equal(t, ex.VMState.String(), inv.State) + require.Equal(t, ex.GasConsumed, inv.GasConsumed) + require.Nil(t, inv.Script) + require.Equal(t, ex.Stack, inv.Stack) + require.Equal(t, ex.FaultException, inv.FaultException) + require.Equal(t, ex.Events, inv.Notifications) + require.Nil(t, inv.Transaction) + require.Nil(t, inv.Diagnostics) + require.Equal(t, uuid.UUID{}, inv.Session) +} diff --git a/pkg/rpcclient/actor/actor.go b/pkg/rpcclient/actor/actor.go index 59b0ad3a7..cab05e218 100644 --- a/pkg/rpcclient/actor/actor.go +++ b/pkg/rpcclient/actor/actor.go @@ -25,6 +25,7 @@ import ( // create and send transactions. type RPCActor interface { invoker.RPCInvoke + RPCPollingWaiter CalculateNetworkFee(tx *transaction.Transaction) (int64, error) GetBlockCount() (uint32, error) diff --git a/pkg/rpcclient/actor/actor_test.go b/pkg/rpcclient/actor/actor_test.go index d621d2770..895d54743 100644 --- a/pkg/rpcclient/actor/actor_test.go +++ b/pkg/rpcclient/actor/actor_test.go @@ -1,6 +1,7 @@ package actor import ( + "context" "errors" "testing" @@ -9,6 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/wallet" @@ -52,6 +54,12 @@ func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) { func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) { return nil, nil // Just a stub, unused by actor. } +func (r *RPCClient) Context() context.Context { + panic("TODO") +} +func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) { + panic("TODO") +} func testRPCAndAccount(t *testing.T) (*RPCClient, *wallet.Account) { client := &RPCClient{ version: &result.Version{ diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go new file mode 100644 index 000000000..f42707631 --- /dev/null +++ b/pkg/rpcclient/actor/waiter.go @@ -0,0 +1,203 @@ +package actor + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" + "github.com/nspcc-dev/neo-go/pkg/util" +) + +// PollingWaiterRetryCount is a threshold for a number of subsequent failed +// attempts to get block count from the RPC server for PollingWaiter. If it fails +// to retrieve block count PollingWaiterRetryCount times in a raw then transaction +// awaiting attempt considered to be failed and an error is returned. +const PollingWaiterRetryCount = 3 + +var ( + // ErrTxNotAccepted is returned when transaction wasn't accepted to the chain + // even after ValidUntilBlock block persist. + ErrTxNotAccepted = errors.New("transaction was not accepted to chain") + // ErrContextDone is returned when Waiter context has been done in the middle + // of transaction awaiting process and no result was received yet. + ErrContextDone = errors.New("waiter context done") +) + +type ( + // RPCPollingWaiter is an interface that enables transaction awaiting functionality + // for Actor instance based on periodical BlockCount and ApplicationLog polls. + RPCPollingWaiter interface { + // Context should return the RPC client context to be able to gracefully + // shut down all running processes (if so). + Context() context.Context + GetBlockCount() (uint32, error) + GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) + } + // RPCEventWaiter is an interface that enables improved transaction awaiting functionality + // for Actor instance based on web-socket Block and ApplicationLog notifications. + RPCEventWaiter interface { + RPCPollingWaiter + + SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- rpcclient.Notification) (string, error) + SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error) + Unsubscribe(id string) error + } +) + +// Wait allows to wait until transaction will be accepted to the chain. It can be +// used as a wrapper for Send or SignAndSend and accepts transaction hash, +// ValidUntilBlock value and an error. It returns transaction execution result +// or an error if transaction wasn't accepted to the chain. +func (a *Actor) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { + if err != nil { + return nil, err + } + if wsW, ok := a.client.(RPCEventWaiter); ok { + return a.waitWithWSWaiter(wsW, h, vub) + } + return a.waitWithSimpleWaiter(a.client, h, vub) +} + +// waitWithSimpleWaiter waits until transaction is accepted to the chain and +// returns its execution result or an error if it's missing from chain after +// VUB block. +func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uint32) (*state.AppExecResult, error) { + var ( + currentHeight uint32 + failedAttempt int + pollTime = time.Millisecond * time.Duration(a.GetVersion().Protocol.MillisecondsPerBlock) / 2 + ) + if pollTime == 0 { + pollTime = time.Second + } + timer := time.NewTicker(pollTime) + defer timer.Stop() + for { + select { + case <-timer.C: + blockCount, err := c.GetBlockCount() + if err != nil { + failedAttempt++ + if failedAttempt > PollingWaiterRetryCount { + return nil, fmt.Errorf("failed to retrieve block count: %w", err) + } + continue + } + failedAttempt = 0 + if blockCount-1 > currentHeight { + currentHeight = blockCount - 1 + } + t := trigger.Application + res, err := c.GetApplicationLog(h, &t) + if err == nil { + return &state.AppExecResult{ + Container: h, + Execution: res.Executions[0], + }, nil + } + if currentHeight >= vub { + return nil, ErrTxNotAccepted + } + + case <-c.Context().Done(): + return nil, fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) + } + } +} + +// waitWithWSWaiter waits until transaction is accepted to the chain and returns +// its execution result or an error if it's missing from chain after VUB block. +// It uses optimized web-socket waiter if possible. +func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (res *state.AppExecResult, waitErr error) { + var wsWaitErr error + defer func() { + if wsWaitErr != nil { + res, waitErr = a.waitWithSimpleWaiter(c, h, vub) + if waitErr != nil { + waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr) + } + } + }() + rcvr := make(chan rpcclient.Notification) + defer func() { + drainLoop: + // Drain rcvr to avoid other notification receivers blocking. + for { + select { + case <-rcvr: + default: + break drainLoop + } + } + close(rcvr) + }() + blocksID, err := c.SubscribeForNewBlocksWithChan(nil, rcvr) + if err != nil { + wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) + return + } + defer func() { + err = c.Unsubscribe(blocksID) + if err != nil { + errFmt := "failed to unsubscribe from blocks (id: %s): %v" + errArgs := []interface{}{blocksID, err} + if waitErr != nil { + errFmt += "; wait error: %w" + errArgs = append(errArgs, waitErr) + } + waitErr = fmt.Errorf(errFmt, errArgs...) + } + }() + txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, rcvr) + if err != nil { + wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) + return + } + defer func() { + err = c.Unsubscribe(txsID) + if err != nil { + errFmt := "failed to unsubscribe from transactions (id: %s): %v" + errArgs := []interface{}{txsID, err} + if waitErr != nil { + errFmt += "; wait error: %w" + errArgs = append(errArgs, waitErr) + } + waitErr = fmt.Errorf(errFmt, errArgs...) + } + }() + + for { + select { + case ntf := <-rcvr: + switch ntf.Type { + case neorpc.BlockEventID: + block := ntf.Value.(*block.Block) + // Execution event follows the block event, thus wait until the block next to the VUB to be sure. + if block.Index > vub { + waitErr = ErrTxNotAccepted + return + } + case neorpc.ExecutionEventID: + aer := ntf.Value.(*state.AppExecResult) + if aer.Container.Equals(h) { + res = aer + return + } + case neorpc.MissedEventID: + // We're toast, retry with non-ws client. + wsWaitErr = errors.New("some event was missed") + return + } + case <-c.Context().Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) + return + } + } +} diff --git a/pkg/rpcclient/client.go b/pkg/rpcclient/client.go index ba956b0db..e061eab6e 100644 --- a/pkg/rpcclient/client.go +++ b/pkg/rpcclient/client.go @@ -33,8 +33,11 @@ type Client struct { cli *http.Client endpoint *url.URL ctx context.Context - opts Options - requestF func(*neorpc.Request) (*neorpc.Response, error) + // ctxCancel is a cancel function aimed to send closing signal to the users of + // ctx. + ctxCancel func() + opts Options + requestF func(*neorpc.Request) (*neorpc.Response, error) // reader is an Invoker that has no signers and uses current state, // it's used to implement various getters. It'll be removed eventually, @@ -125,7 +128,9 @@ func initClient(ctx context.Context, cl *Client, endpoint string, opts Options) // if opts.Cert != "" && opts.Key != "" { // } - cl.ctx = ctx + cancelCtx, cancel := context.WithCancel(ctx) + cl.ctx = cancelCtx + cl.ctxCancel = cancel cl.cli = httpClient cl.endpoint = url cl.cache = cache{ @@ -176,6 +181,7 @@ func (c *Client) Init() error { // Close closes unused underlying networks connections. func (c *Client) Close() { + c.ctxCancel() c.cli.CloseIdleConnections() } @@ -248,3 +254,8 @@ func (c *Client) Ping() error { _ = conn.Close() return nil } + +// Context returns client instance context. +func (c *Client) Context() context.Context { + return c.ctx +} diff --git a/pkg/rpcclient/notary/actor_test.go b/pkg/rpcclient/notary/actor_test.go index c3482c75e..84aed3e17 100644 --- a/pkg/rpcclient/notary/actor_test.go +++ b/pkg/rpcclient/notary/actor_test.go @@ -1,6 +1,7 @@ package notary import ( + "context" "errors" "testing" @@ -14,6 +15,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" @@ -66,6 +68,12 @@ func (r *RPCClient) TerminateSession(sessionID uuid.UUID) (bool, error) { func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCount int) ([]stackitem.Item, error) { return nil, nil // Just a stub, unused by actor. } +func (r *RPCClient) Context() context.Context { + panic("TODO") +} +func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) { + panic("TODO") +} func TestNewActor(t *testing.T) { rc := &RPCClient{ diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 1711a1f26..ad68c7d39 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -162,6 +162,8 @@ func (c *WSClient) Close() { // which in turn makes wsReader receive an err from ws.ReadJSON() and also // break out of the loop closing c.done channel in its shutdown sequence. close(c.shutdown) + // Call to cancel will send signal to all users of Context(). + c.Client.ctxCancel() } <-c.done } @@ -274,6 +276,7 @@ readloop: c.respChannels = nil c.respLock.Unlock() close(c.Notifications) + c.Client.ctxCancel() } func (c *WSClient) wsWriter() { @@ -569,3 +572,8 @@ func (c *WSClient) GetError() error { } return c.closeErr } + +// Context returns WSClient Cancel context that will be terminated on Client shutdown. +func (c *WSClient) Context() context.Context { + return c.Client.ctx +} diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 6ddb6885e..9c24a4846 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -1947,3 +1947,57 @@ func TestClient_Iterator_SessionConfigVariations(t *testing.T) { } }) } + +func TestClient_Wait(t *testing.T) { + chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) + defer chain.Close() + defer rpcSrv.Shutdown() + + c, err := rpcclient.New(context.Background(), httpSrv.URL, rpcclient.Options{}) + require.NoError(t, err) + acc, err := wallet.NewAccount() + require.NoError(t, err) + act, err := actor.New(c, []actor.SignerAccount{ + { + Signer: transaction.Signer{ + Account: acc.ScriptHash(), + }, + Account: acc, + }, + }) + require.NoError(t, err) + + b, err := chain.GetBlock(chain.GetHeaderHash(1)) + require.NoError(t, err) + require.True(t, len(b.Transactions) > 0) + + check := func(t *testing.T, h util.Uint256, vub uint32, errExpected bool) { + rcvr := make(chan struct{}) + go func() { + aer, err := act.Wait(h, vub, nil) + if errExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, h, aer.Container) + } + rcvr <- struct{}{} + }() + waitloop: + for { + select { + case <-rcvr: + break waitloop + case <-time.NewTimer(time.Duration(chain.GetConfig().SecondsPerBlock) * time.Second).C: + t.Fatal("transaction failed to be awaited") + } + } + } + + // Wait for transaction that has been persisted and VUB block has been persisted. + check(t, b.Transactions[0].Hash(), chain.BlockHeight()-1, false) + // Wait for transaction that has been persisted and VUB block hasn't yet been persisted. + check(t, b.Transactions[0].Hash(), chain.BlockHeight()+1, false) + // Wait for transaction that hasn't been persisted and VUB block has been persisted. + check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true) +} From 673a495527602e754ad5670e3f8a097046d1cbd9 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 18 Oct 2022 15:09:30 +0300 Subject: [PATCH 04/15] rpc: add "since" filter to WS block events --- docs/notifications.md | 5 ++-- pkg/neorpc/rpcevent/filter.go | 4 ++- pkg/neorpc/rpcevent/filter_test.go | 20 +++++++++++--- pkg/neorpc/types.go | 8 +++--- pkg/rpcclient/actor/waiter.go | 4 +-- pkg/rpcclient/wsclient.go | 17 ++++++------ pkg/rpcclient/wsclient_test.go | 42 ++++++++++++++++++++++++++---- 7 files changed, 75 insertions(+), 25 deletions(-) diff --git a/docs/notifications.md b/docs/notifications.md index 7c73c5142..7c857fda8 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -10,7 +10,7 @@ receive them as JSON-RPC notifications from the server. Currently supported events: * new block added - Contents: block. Filters: primary ID. + Contents: block. Filters: primary ID, since block index. * new transaction in the block Contents: transaction. Filters: sender and signer. @@ -57,7 +57,8 @@ omitted if empty). Recognized stream names: * `block_added` Filter: `primary` as an integer with primary (speaker) node index from - ConsensusData. + ConsensusData and/or `since` as an integer with block index starting from + which new block notifications will be received. * `transaction_added` Filter: `sender` field containing a string with hex-encoded Uint160 (LE representation) for transaction's `Sender` and/or `signer` in the same diff --git a/pkg/neorpc/rpcevent/filter.go b/pkg/neorpc/rpcevent/filter.go index e2ca74c1b..07f0dc9f7 100644 --- a/pkg/neorpc/rpcevent/filter.go +++ b/pkg/neorpc/rpcevent/filter.go @@ -37,7 +37,9 @@ func Matches(f Comparator, r Container) bool { case neorpc.BlockEventID: filt := filter.(neorpc.BlockFilter) b := r.EventPayload().(*block.Block) - return int(b.PrimaryIndex) == filt.Primary + primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex) + sinceOk := filt.Since == nil || *filt.Since <= b.Index + return primaryOk && sinceOk case neorpc.TransactionEventID: filt := filter.(neorpc.TxFilter) tx := r.EventPayload().(*transaction.Transaction) diff --git a/pkg/neorpc/rpcevent/filter_test.go b/pkg/neorpc/rpcevent/filter_test.go index d16c222e6..71edeb7d1 100644 --- a/pkg/neorpc/rpcevent/filter_test.go +++ b/pkg/neorpc/rpcevent/filter_test.go @@ -39,7 +39,10 @@ func (c testContainer) EventPayload() interface{} { } func TestMatches(t *testing.T) { - primary := byte(1) + primary := 1 + badPrimary := 2 + index := uint32(5) + badHigherIndex := uint32(6) sender := util.Uint160{1, 2, 3} signer := util.Uint160{4, 5, 6} contract := util.Uint160{7, 8, 9} @@ -49,7 +52,7 @@ func TestMatches(t *testing.T) { bContainer := testContainer{ id: neorpc.BlockEventID, pld: &block.Block{ - Header: block.Header{PrimaryIndex: primary}, + Header: block.Header{PrimaryIndex: byte(primary), Index: index}, }, } st := vmstate.Halt @@ -106,7 +109,16 @@ func TestMatches(t *testing.T) { name: "block, primary mismatch", comparator: testComparator{ id: neorpc.BlockEventID, - filter: neorpc.BlockFilter{Primary: int(primary + 1)}, + filter: neorpc.BlockFilter{Primary: &badPrimary}, + }, + container: bContainer, + expected: false, + }, + { + name: "block, since mismatch", + comparator: testComparator{ + id: neorpc.BlockEventID, + filter: neorpc.BlockFilter{Since: &badHigherIndex}, }, container: bContainer, expected: false, @@ -115,7 +127,7 @@ func TestMatches(t *testing.T) { name: "block, filter match", comparator: testComparator{ id: neorpc.BlockEventID, - filter: neorpc.BlockFilter{Primary: int(primary)}, + filter: neorpc.BlockFilter{Primary: &primary, Since: &index}, }, container: bContainer, expected: true, diff --git a/pkg/neorpc/types.go b/pkg/neorpc/types.go index ff01cbfa7..70bab912c 100644 --- a/pkg/neorpc/types.go +++ b/pkg/neorpc/types.go @@ -71,10 +71,12 @@ type ( Payload []interface{} `json:"params"` } - // BlockFilter is a wrapper structure for the block event filter. The only - // allowed filter is primary index. + // BlockFilter is a wrapper structure for the block event filter. It allows + // to filter blocks by primary index and by block index (allowing blocks since + // the specified index). BlockFilter struct { - Primary int `json:"primary"` + Primary *int `json:"primary,omitempty"` + Since *uint32 `json:"since,omitempty"` } // TxFilter is a wrapper structure for the transaction event filter. It // allows to filter transactions by senders and signers. diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index f42707631..21f9a0edf 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -45,7 +45,7 @@ type ( RPCEventWaiter interface { RPCPollingWaiter - SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- rpcclient.Notification) (string, error) + SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error) Unsubscribe(id string) error } @@ -138,7 +138,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( } close(rcvr) }() - blocksID, err := c.SubscribeForNewBlocksWithChan(nil, rcvr) + blocksID, err := c.SubscribeForNewBlocksWithChan(nil, nil, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index ad68c7d39..9a6d4da90 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -393,22 +393,23 @@ func (c *WSClient) performUnsubscription(id string) error { } // SubscribeForNewBlocks adds subscription for new block events to this instance -// of the client. It can be filtered by primary consensus node index, nil value doesn't -// add any filters. -func (c *WSClient) SubscribeForNewBlocks(primary *int) (string, error) { - return c.SubscribeForNewBlocksWithChan(primary, c.Notifications) +// of the client. It can be filtered by primary consensus node index and/or block +// index allowing to receive blocks since the specified index only, nil value is +// treated as missing filter. +func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex *uint32) (string, error) { + return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, c.Notifications) } // SubscribeForNewBlocksWithChan registers provided channel as a receiver for the // specified new blocks notifications. The receiver channel must be properly read and // drained after usage in order not to block other notification receivers. // See SubscribeForNewBlocks for parameter details. -func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, rcvrCh chan<- Notification) (string, error) { +func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex *uint32, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"block_added"} var flt *neorpc.BlockFilter - if primary != nil { - flt = &neorpc.BlockFilter{Primary: *primary} - params = append(params, *flt) + if primary != nil || sinceIndex != nil { + flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex} + params = append(params, flt) } rcvr := notificationReceiver{ typ: neorpc.BlockEventID, diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 1bc396a21..6f6ec9bdf 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -35,10 +35,10 @@ func TestWSClientSubscription(t *testing.T) { ch := make(chan Notification) var cases = map[string]func(*WSClient) (string, error){ "blocks": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocks(nil) + return wsc.SubscribeForNewBlocks(nil, nil) }, "blocks_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocksWithChan(nil, ch) + return wsc.SubscribeForNewBlocksWithChan(nil, nil, ch) }, "transactions": func(wsc *WSClient) (string, error) { return wsc.SubscribeForNewTransactions(nil, nil) @@ -283,17 +283,49 @@ func TestWSFilteredSubscriptions(t *testing.T) { clientCode func(*testing.T, *WSClient) serverCode func(*testing.T, *params.Params) }{ - {"blocks", + {"blocks primary", func(t *testing.T, wsc *WSClient) { primary := 3 - _, err := wsc.SubscribeForNewBlocks(&primary) + _, err := wsc.SubscribeForNewBlocks(&primary, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { param := p.Value(1) filt := new(neorpc.BlockFilter) require.NoError(t, json.Unmarshal(param.RawMessage, filt)) - require.Equal(t, 3, filt.Primary) + require.Equal(t, 3, *filt.Primary) + require.Equal(t, (*uint32)(nil), filt.Since) + }, + }, + {"blocks since", + func(t *testing.T, wsc *WSClient) { + var since uint32 = 3 + _, err := wsc.SubscribeForNewBlocks(nil, &since) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, (*int)(nil), filt.Primary) + require.Equal(t, uint32(3), *filt.Since) + }, + }, + {"blocks primary and since", + func(t *testing.T, wsc *WSClient) { + var ( + since uint32 = 3 + primary = 2 + ) + _, err := wsc.SubscribeForNewBlocks(&primary, &since) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, 2, *filt.Primary) + require.Equal(t, uint32(3), *filt.Since) }, }, {"transactions sender", From 71069b0ed082dac4f8fa8e95a1d147f0a51c3ebe Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 18 Oct 2022 15:12:48 +0300 Subject: [PATCH 05/15] rpc: improve WS-enabled transaction awaiting Fetch blocks since VUB+1, if block received and we haven't returned the result yet, then transaction wasn't accepted at all. --- pkg/rpcclient/actor/waiter.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 21f9a0edf..b48ba0f3a 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" @@ -138,7 +137,9 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( } close(rcvr) }() - blocksID, err := c.SubscribeForNewBlocksWithChan(nil, nil, rcvr) + // Execution event follows the block event, thus wait until the block next to the VUB to be sure. + since := vub + 1 + blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return @@ -178,12 +179,8 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( case ntf := <-rcvr: switch ntf.Type { case neorpc.BlockEventID: - block := ntf.Value.(*block.Block) - // Execution event follows the block event, thus wait until the block next to the VUB to be sure. - if block.Index > vub { - waitErr = ErrTxNotAccepted - return - } + waitErr = ErrTxNotAccepted + return case neorpc.ExecutionEventID: aer := ntf.Value.(*state.AppExecResult) if aer.Container.Equals(h) { From d7c1f3eac7da319dd0be6d79fdd7ecff8ee59add Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 18 Oct 2022 15:20:55 +0300 Subject: [PATCH 06/15] rpc: add "container" filter to WS execution notifications --- docs/notifications.md | 5 ++-- pkg/neorpc/rpcevent/filter.go | 4 ++- pkg/neorpc/rpcevent/filter_test.go | 18 +++++++++-- pkg/neorpc/types.go | 3 +- pkg/rpcclient/actor/waiter.go | 4 +-- pkg/rpcclient/wsclient.go | 19 +++++++----- pkg/rpcclient/wsclient_test.go | 48 +++++++++++++++++++++++++----- pkg/services/rpcsrv/server.go | 2 +- 8 files changed, 77 insertions(+), 26 deletions(-) diff --git a/docs/notifications.md b/docs/notifications.md index 7c857fda8..a35867a02 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -19,7 +19,7 @@ Currently supported events: Contents: container hash, contract hash, notification name, stack item. Filters: contract hash, notification name. * transaction executed - Contents: application execution result. Filters: VM state. + Contents: application execution result. Filters: VM state, script container hash. * new/removed P2P notary request (if `P2PSigExtensions` are enabled) Contents: P2P notary request. Filters: request sender and main tx signer. @@ -69,7 +69,8 @@ Recognized stream names: notification name. * `transaction_executed` Filter: `state` field containing `HALT` or `FAULT` string for successful - and failed executions respectively. + and failed executions respectively and/or `container` field containing + script container hash. * `notary_request_event` Filter: `sender` field containing a string with hex-encoded Uint160 (LE representation) for notary request's `Sender` and/or `signer` in the same diff --git a/pkg/neorpc/rpcevent/filter.go b/pkg/neorpc/rpcevent/filter.go index 07f0dc9f7..65de9df59 100644 --- a/pkg/neorpc/rpcevent/filter.go +++ b/pkg/neorpc/rpcevent/filter.go @@ -64,7 +64,9 @@ func Matches(f Comparator, r Container) bool { case neorpc.ExecutionEventID: filt := filter.(neorpc.ExecutionFilter) applog := r.EventPayload().(*state.AppExecResult) - return applog.VMState.String() == filt.State + stateOK := filt.State == nil || applog.VMState.String() == *filt.State + containerOK := filt.Container == nil || applog.Container.Equals(*filt.Container) + return stateOK && containerOK case neorpc.NotaryRequestEventID: filt := filter.(neorpc.TxFilter) req := r.EventPayload().(*result.NotaryRequestEvent) diff --git a/pkg/neorpc/rpcevent/filter_test.go b/pkg/neorpc/rpcevent/filter_test.go index 71edeb7d1..9e36a7b54 100644 --- a/pkg/neorpc/rpcevent/filter_test.go +++ b/pkg/neorpc/rpcevent/filter_test.go @@ -47,6 +47,8 @@ func TestMatches(t *testing.T) { signer := util.Uint160{4, 5, 6} contract := util.Uint160{7, 8, 9} badUint160 := util.Uint160{9, 9, 9} + cnt := util.Uint256{1, 2, 3} + badUint256 := util.Uint256{9, 9, 9} name := "ntf name" badName := "bad name" bContainer := testContainer{ @@ -56,6 +58,7 @@ func TestMatches(t *testing.T) { }, } st := vmstate.Halt + goodState := st.String() badState := "FAULT" txContainer := testContainer{ id: neorpc.TransactionEventID, @@ -67,7 +70,7 @@ func TestMatches(t *testing.T) { } exContainer := testContainer{ id: neorpc.ExecutionEventID, - pld: &state.AppExecResult{Execution: state.Execution{VMState: st}}, + pld: &state.AppExecResult{Container: cnt, Execution: state.Execution{VMState: st}}, } ntrContainer := testContainer{ id: neorpc.NotaryRequestEventID, @@ -208,7 +211,16 @@ func TestMatches(t *testing.T) { name: "execution, state mismatch", comparator: testComparator{ id: neorpc.ExecutionEventID, - filter: neorpc.ExecutionFilter{State: badState}, + filter: neorpc.ExecutionFilter{State: &badState}, + }, + container: exContainer, + expected: false, + }, + { + name: "execution, container mismatch", + comparator: testComparator{ + id: neorpc.ExecutionEventID, + filter: neorpc.ExecutionFilter{Container: &badUint256}, }, container: exContainer, expected: false, @@ -217,7 +229,7 @@ func TestMatches(t *testing.T) { name: "execution, filter mismatch", comparator: testComparator{ id: neorpc.ExecutionEventID, - filter: neorpc.ExecutionFilter{State: st.String()}, + filter: neorpc.ExecutionFilter{State: &goodState, Container: &cnt}, }, container: exContainer, expected: true, diff --git a/pkg/neorpc/types.go b/pkg/neorpc/types.go index 70bab912c..f5cf2882f 100644 --- a/pkg/neorpc/types.go +++ b/pkg/neorpc/types.go @@ -95,7 +95,8 @@ type ( // events. It allows to choose failing or successful transactions based // on their VM state. ExecutionFilter struct { - State string `json:"state"` + State *string `json:"state,omitempty"` + Container *util.Uint256 `json:"container,omitempty"` } // SignerWithWitness represents transaction's signer with the corresponding witness. SignerWithWitness struct { diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index b48ba0f3a..5623e16de 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -45,7 +45,7 @@ type ( RPCPollingWaiter SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) - SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- rpcclient.Notification) (string, error) + SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) Unsubscribe(id string) error } ) @@ -156,7 +156,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( waitErr = fmt.Errorf(errFmt, errArgs...) } }() - txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, rcvr) + txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, nil, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) return diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 9a6d4da90..8eafd55af 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -475,23 +475,26 @@ func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uin // SubscribeForTransactionExecutions adds subscription for application execution // results generated during transaction execution to this instance of the client. It can // be filtered by state (HALT/FAULT) to check for successful or failing -// transactions, nil value means no filtering. -func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, error) { - return c.SubscribeForTransactionExecutionsWithChan(state, c.Notifications) +// transactions; it can also be filtered by script container hash. +// nil value means no filtering. +func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) { + return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications) } // SubscribeForTransactionExecutionsWithChan registers provided channel as a // receiver for the specified execution notifications. The receiver channel must be // properly read and drained after usage in order not to block other notification // receivers. See SubscribeForTransactionExecutions for parameter details. -func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, rcvrCh chan<- Notification) (string, error) { +func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"transaction_executed"} var flt *neorpc.ExecutionFilter - if state != nil { - if *state != "HALT" && *state != "FAULT" { - return "", errors.New("bad state parameter") + if state != nil || container != nil { + if state != nil { + if *state != "HALT" && *state != "FAULT" { + return "", errors.New("bad state parameter") + } } - flt = &neorpc.ExecutionFilter{State: *state} + flt = &neorpc.ExecutionFilter{State: state, Container: container} params = append(params, *flt) } rcvr := notificationReceiver{ diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 6f6ec9bdf..3788182e5 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -53,10 +53,10 @@ func TestWSClientSubscription(t *testing.T) { return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch) }, "executions": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutions(nil) + return wsc.SubscribeForTransactionExecutions(nil, nil) }, "executions_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutionsWithChan(nil, ch) + return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch) }, } t.Run("good", func(t *testing.T) { @@ -206,6 +206,8 @@ func TestWSClientEvents(t *testing.T) { ch1 := make(chan Notification) ch2 := make(chan Notification) ch3 := make(chan Notification) + halt := "HALT" + fault := "FAULT" wsc.subscriptionsLock.Lock() wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} wsc.subscriptions["1"] = notificationReceiver{typ: neorpc.ExecutionEventID, ch: wsc.Notifications} @@ -213,8 +215,8 @@ func TestWSClientEvents(t *testing.T) { wsc.subscriptions["3"] = notificationReceiver{typ: neorpc.BlockEventID, ch: ch1} wsc.subscriptions["4"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} wsc.subscriptions["5"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: ch2} // check duplicating subscriptions - wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: "HALT"}, ch: ch2} - wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: "FAULT"}, ch: ch3} + wsc.subscriptions["6"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &halt}, ch: ch2} + wsc.subscriptions["7"] = notificationReceiver{typ: neorpc.ExecutionEventID, filter: neorpc.ExecutionFilter{State: &fault}, ch: ch3} // MissedEvent must be delivered without subscription. wsc.subscriptionsLock.Unlock() @@ -272,7 +274,7 @@ func TestWSExecutionVMStateCheck(t *testing.T) { wsc.getNextRequestID = getTestRequestID require.NoError(t, wsc.Init()) filter := "NONE" - _, err = wsc.SubscribeForTransactionExecutions(&filter) + _, err = wsc.SubscribeForTransactionExecutions(&filter, nil) require.Error(t, err) wsc.Close() } @@ -414,17 +416,47 @@ func TestWSFilteredSubscriptions(t *testing.T) { require.Equal(t, "my_pretty_notification", *filt.Name) }, }, - {"executions", + {"executions state", func(t *testing.T, wsc *WSClient) { state := "FAULT" - _, err := wsc.SubscribeForTransactionExecutions(&state) + _, err := wsc.SubscribeForTransactionExecutions(&state, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { param := p.Value(1) filt := new(neorpc.ExecutionFilter) require.NoError(t, json.Unmarshal(param.RawMessage, filt)) - require.Equal(t, "FAULT", filt.State) + require.Equal(t, "FAULT", *filt.State) + require.Equal(t, (*util.Uint256)(nil), filt.Container) + }, + }, + {"executions container", + func(t *testing.T, wsc *WSClient) { + container := util.Uint256{1, 2, 3} + _, err := wsc.SubscribeForTransactionExecutions(nil, &container) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.ExecutionFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, (*string)(nil), filt.State) + require.Equal(t, util.Uint256{1, 2, 3}, *filt.Container) + }, + }, + {"executions state and container", + func(t *testing.T, wsc *WSClient) { + state := "FAULT" + container := util.Uint256{1, 2, 3} + _, err := wsc.SubscribeForTransactionExecutions(&state, &container) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.ExecutionFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, "FAULT", *filt.State) + require.Equal(t, util.Uint256{1, 2, 3}, *filt.Container) }, }, } diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index d5874e92d..5f8d4af9e 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -2426,7 +2426,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{ case neorpc.ExecutionEventID: flt := new(neorpc.ExecutionFilter) err = jd.Decode(flt) - if err == nil && (flt.State == "HALT" || flt.State == "FAULT") { + if err == nil && (flt.State != nil && (*flt.State == "HALT" || *flt.State == "FAULT")) { filter = *flt } else if err == nil { err = errors.New("invalid state") From 345d48d051cea2fa95737302773e1ab352beb256 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 18 Oct 2022 16:42:03 +0300 Subject: [PATCH 07/15] rpc: improve WS-based transaction awaiting Subscribe only for required aers. --- pkg/rpcclient/actor/waiter.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 5623e16de..9085d9045 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -156,7 +156,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( waitErr = fmt.Errorf(errFmt, errArgs...) } }() - txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, nil, rcvr) + txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) return @@ -182,11 +182,8 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( waitErr = ErrTxNotAccepted return case neorpc.ExecutionEventID: - aer := ntf.Value.(*state.AppExecResult) - if aer.Container.Equals(h) { - res = aer - return - } + res = ntf.Value.(*state.AppExecResult) + return case neorpc.MissedEventID: // We're toast, retry with non-ws client. wsWaitErr = errors.New("some event was missed") From 8e84bb51d54ef3f22d5d1e58524884f6016bf1ce Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 19 Oct 2022 07:31:27 +0300 Subject: [PATCH 08/15] rpc: add "till" filter to WS block events --- docs/notifications.md | 8 +++++--- pkg/neorpc/rpcevent/filter.go | 3 ++- pkg/neorpc/rpcevent/filter_test.go | 12 +++++++++++- pkg/neorpc/types.go | 1 + pkg/rpcclient/actor/waiter.go | 4 ++-- pkg/rpcclient/wsclient.go | 10 +++++----- pkg/rpcclient/wsclient_test.go | 31 ++++++++++++++++++++++++------ 7 files changed, 51 insertions(+), 18 deletions(-) diff --git a/docs/notifications.md b/docs/notifications.md index a35867a02..8a2b7ffdb 100644 --- a/docs/notifications.md +++ b/docs/notifications.md @@ -10,7 +10,7 @@ receive them as JSON-RPC notifications from the server. Currently supported events: * new block added - Contents: block. Filters: primary ID, since block index. + Contents: block. Filters: primary ID, since/till block indexes. * new transaction in the block Contents: transaction. Filters: sender and signer. @@ -57,8 +57,10 @@ omitted if empty). Recognized stream names: * `block_added` Filter: `primary` as an integer with primary (speaker) node index from - ConsensusData and/or `since` as an integer with block index starting from - which new block notifications will be received. + ConsensusData and/or `since` field as an integer value with block + index starting from which new block notifications will be received and/or + `till` field as an integer values containing block index till which new + block notifications will be received. * `transaction_added` Filter: `sender` field containing a string with hex-encoded Uint160 (LE representation) for transaction's `Sender` and/or `signer` in the same diff --git a/pkg/neorpc/rpcevent/filter.go b/pkg/neorpc/rpcevent/filter.go index 65de9df59..049943498 100644 --- a/pkg/neorpc/rpcevent/filter.go +++ b/pkg/neorpc/rpcevent/filter.go @@ -39,7 +39,8 @@ func Matches(f Comparator, r Container) bool { b := r.EventPayload().(*block.Block) primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex) sinceOk := filt.Since == nil || *filt.Since <= b.Index - return primaryOk && sinceOk + tillOk := filt.Till == nil || b.Index <= *filt.Till + return primaryOk && sinceOk && tillOk case neorpc.TransactionEventID: filt := filter.(neorpc.TxFilter) tx := r.EventPayload().(*transaction.Transaction) diff --git a/pkg/neorpc/rpcevent/filter_test.go b/pkg/neorpc/rpcevent/filter_test.go index 9e36a7b54..6afd20d13 100644 --- a/pkg/neorpc/rpcevent/filter_test.go +++ b/pkg/neorpc/rpcevent/filter_test.go @@ -43,6 +43,7 @@ func TestMatches(t *testing.T) { badPrimary := 2 index := uint32(5) badHigherIndex := uint32(6) + badLowerIndex := index - 1 sender := util.Uint160{1, 2, 3} signer := util.Uint160{4, 5, 6} contract := util.Uint160{7, 8, 9} @@ -126,11 +127,20 @@ func TestMatches(t *testing.T) { container: bContainer, expected: false, }, + { + name: "block, till mismatch", + comparator: testComparator{ + id: neorpc.BlockEventID, + filter: neorpc.BlockFilter{Till: &badLowerIndex}, + }, + container: bContainer, + expected: false, + }, { name: "block, filter match", comparator: testComparator{ id: neorpc.BlockEventID, - filter: neorpc.BlockFilter{Primary: &primary, Since: &index}, + filter: neorpc.BlockFilter{Primary: &primary, Since: &index, Till: &index}, }, container: bContainer, expected: true, diff --git a/pkg/neorpc/types.go b/pkg/neorpc/types.go index f5cf2882f..aa20ec50c 100644 --- a/pkg/neorpc/types.go +++ b/pkg/neorpc/types.go @@ -77,6 +77,7 @@ type ( BlockFilter struct { Primary *int `json:"primary,omitempty"` Since *uint32 `json:"since,omitempty"` + Till *uint32 `json:"till,omitempty"` } // TxFilter is a wrapper structure for the transaction event filter. It // allows to filter transactions by senders and signers. diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 9085d9045..5c19ebccd 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -44,7 +44,7 @@ type ( RPCEventWaiter interface { RPCPollingWaiter - SubscribeForNewBlocksWithChan(primary *int, since *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) + SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) Unsubscribe(id string) error } @@ -139,7 +139,7 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( }() // Execution event follows the block event, thus wait until the block next to the VUB to be sure. since := vub + 1 - blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, rcvr) + blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 8eafd55af..da1018639 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -396,19 +396,19 @@ func (c *WSClient) performUnsubscription(id string) error { // of the client. It can be filtered by primary consensus node index and/or block // index allowing to receive blocks since the specified index only, nil value is // treated as missing filter. -func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex *uint32) (string, error) { - return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, c.Notifications) +func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) { + return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications) } // SubscribeForNewBlocksWithChan registers provided channel as a receiver for the // specified new blocks notifications. The receiver channel must be properly read and // drained after usage in order not to block other notification receivers. // See SubscribeForNewBlocks for parameter details. -func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex *uint32, rcvrCh chan<- Notification) (string, error) { +func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) { params := []interface{}{"block_added"} var flt *neorpc.BlockFilter - if primary != nil || sinceIndex != nil { - flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex} + if primary != nil || sinceIndex != nil || tillIndex != nil { + flt = &neorpc.BlockFilter{Primary: primary, Since: sinceIndex, Till: tillIndex} params = append(params, flt) } rcvr := notificationReceiver{ diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 3788182e5..5662b1a32 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -35,10 +35,10 @@ func TestWSClientSubscription(t *testing.T) { ch := make(chan Notification) var cases = map[string]func(*WSClient) (string, error){ "blocks": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocks(nil, nil) + return wsc.SubscribeForNewBlocks(nil, nil, nil) }, "blocks_with_custom_ch": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocksWithChan(nil, nil, ch) + return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch) }, "transactions": func(wsc *WSClient) (string, error) { return wsc.SubscribeForNewTransactions(nil, nil) @@ -288,7 +288,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks primary", func(t *testing.T, wsc *WSClient) { primary := 3 - _, err := wsc.SubscribeForNewBlocks(&primary, nil) + _, err := wsc.SubscribeForNewBlocks(&primary, nil, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -297,12 +297,13 @@ func TestWSFilteredSubscriptions(t *testing.T) { require.NoError(t, json.Unmarshal(param.RawMessage, filt)) require.Equal(t, 3, *filt.Primary) require.Equal(t, (*uint32)(nil), filt.Since) + require.Equal(t, (*uint32)(nil), filt.Till) }, }, {"blocks since", func(t *testing.T, wsc *WSClient) { var since uint32 = 3 - _, err := wsc.SubscribeForNewBlocks(nil, &since) + _, err := wsc.SubscribeForNewBlocks(nil, &since, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -311,15 +312,32 @@ func TestWSFilteredSubscriptions(t *testing.T) { require.NoError(t, json.Unmarshal(param.RawMessage, filt)) require.Equal(t, (*int)(nil), filt.Primary) require.Equal(t, uint32(3), *filt.Since) + require.Equal(t, (*uint32)(nil), filt.Till) }, }, - {"blocks primary and since", + {"blocks till", + func(t *testing.T, wsc *WSClient) { + var till uint32 = 3 + _, err := wsc.SubscribeForNewBlocks(nil, nil, &till) + require.NoError(t, err) + }, + func(t *testing.T, p *params.Params) { + param := p.Value(1) + filt := new(neorpc.BlockFilter) + require.NoError(t, json.Unmarshal(param.RawMessage, filt)) + require.Equal(t, (*int)(nil), filt.Primary) + require.Equal(t, (*uint32)(nil), filt.Since) + require.Equal(t, (uint32)(3), *filt.Till) + }, + }, + {"blocks primary, since and till", func(t *testing.T, wsc *WSClient) { var ( since uint32 = 3 primary = 2 + till uint32 = 5 ) - _, err := wsc.SubscribeForNewBlocks(&primary, &since) + _, err := wsc.SubscribeForNewBlocks(&primary, &since, &till) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -328,6 +346,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { require.NoError(t, json.Unmarshal(param.RawMessage, filt)) require.Equal(t, 2, *filt.Primary) require.Equal(t, uint32(3), *filt.Since) + require.Equal(t, uint32(5), *filt.Till) }, }, {"transactions sender", From 5b81cb065f0df6cc46f1cc7e3f3bc96ee8b6470d Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 19 Oct 2022 11:55:39 +0300 Subject: [PATCH 09/15] rpc: refactor waiter-related actor code --- pkg/rpcclient/actor/actor.go | 16 +++- pkg/rpcclient/actor/waiter.go | 135 ++++++++++++++++++++++++++-------- 2 files changed, 119 insertions(+), 32 deletions(-) diff --git a/pkg/rpcclient/actor/actor.go b/pkg/rpcclient/actor/actor.go index cab05e218..c6ee86f92 100644 --- a/pkg/rpcclient/actor/actor.go +++ b/pkg/rpcclient/actor/actor.go @@ -25,7 +25,6 @@ import ( // create and send transactions. type RPCActor interface { invoker.RPCInvoke - RPCPollingWaiter CalculateNetworkFee(tx *transaction.Transaction) (int64, error) GetBlockCount() (uint32, error) @@ -54,8 +53,22 @@ type SignerAccount struct { // action to be performed, "Make" prefix is used for methods that create // transactions in various ways, while "Send" prefix is used by methods that // directly transmit created transactions to the RPC server. +// +// Actor also provides a Waiter interface to wait until transaction will be +// accepted to the chain. Depending on the underlying RPCActor functionality, +// transaction awaiting can be performed via web-socket using RPC notifications +// subsystem with EventWaiter, via regular RPC requests using a poll-based +// algorithm with PollingWaiter or can not be performed if RPCActor doesn't +// implement none of RPCEventWaiter and RPCPollingWaiter interfaces with +// NullWaiter. ErrAwaitingNotSupported will be returned on attempt to await the +// transaction in the latter case. Waiter uses context of the underlying RPCActor +// and interrupts transaction awaiting process if the context is done. +// ErrContextDone wrapped with the context's error will be returned in this case. +// Otherwise, transaction awaiting process is ended with ValidUntilBlock acceptance +// and ErrTxNotAccepted is returned if transaction wasn't accepted by this moment. type Actor struct { invoker.Invoker + Waiter client RPCActor opts Options @@ -109,6 +122,7 @@ func New(ra RPCActor, signers []SignerAccount) (*Actor, error) { } return &Actor{ Invoker: *inv, + Waiter: newWaiter(ra, version), client: ra, opts: NewDefaultOptions(), signers: signers, diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 5c19ebccd..724f9b734 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -27,20 +27,34 @@ var ( // ErrContextDone is returned when Waiter context has been done in the middle // of transaction awaiting process and no result was received yet. ErrContextDone = errors.New("waiter context done") + // ErrAwaitingNotSupported is returned from Wait method if Waiter instance + // doesn't support transaction awaiting. + ErrAwaitingNotSupported = errors.New("awaiting not supported") ) type ( + // Waiter is an interface providing transaction awaiting functionality to Actor. + Waiter interface { + // Wait allows to wait until transaction will be accepted to the chain. It can be + // used as a wrapper for Send or SignAndSend and accepts transaction hash, + // ValidUntilBlock value and an error. It returns transaction execution result + // or an error if transaction wasn't accepted to the chain. + Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) + } // RPCPollingWaiter is an interface that enables transaction awaiting functionality // for Actor instance based on periodical BlockCount and ApplicationLog polls. RPCPollingWaiter interface { // Context should return the RPC client context to be able to gracefully // shut down all running processes (if so). Context() context.Context + GetVersion() (*result.Version, error) GetBlockCount() (uint32, error) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) } // RPCEventWaiter is an interface that enables improved transaction awaiting functionality - // for Actor instance based on web-socket Block and ApplicationLog notifications. + // for Actor instance based on web-socket Block and ApplicationLog notifications. RPCEventWaiter + // contains RPCPollingWaiter under the hood and falls back to polling when subscription-based + // awaiting fails. RPCEventWaiter interface { RPCPollingWaiter @@ -50,28 +64,73 @@ type ( } ) -// Wait allows to wait until transaction will be accepted to the chain. It can be -// used as a wrapper for Send or SignAndSend and accepts transaction hash, -// ValidUntilBlock value and an error. It returns transaction execution result -// or an error if transaction wasn't accepted to the chain. -func (a *Actor) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { +// NullWaiter is a Waiter stub that doesn't support transaction awaiting functionality. +type NullWaiter struct{} + +// PollingWaiter is a polling-based Waiter. +type PollingWaiter struct { + polling RPCPollingWaiter + version *result.Version +} + +// EventWaiter is a websocket-based Waiter. +type EventWaiter struct { + ws RPCEventWaiter + polling Waiter +} + +// newWaiter creates Waiter instance. It can be either websocket-based or +// polling-base, otherwise Waiter stub is returned. +func newWaiter(ra RPCActor, v *result.Version) Waiter { + if eventW, ok := ra.(RPCEventWaiter); ok { + return &EventWaiter{ + ws: eventW, + polling: &PollingWaiter{ + polling: eventW, + version: v, + }, + } + } + if pollW, ok := ra.(RPCPollingWaiter); ok { + return &PollingWaiter{ + polling: pollW, + version: v, + } + } + return NewNullWaiter() +} + +// NewNullWaiter creates an instance of Waiter stub. +func NewNullWaiter() NullWaiter { + return NullWaiter{} +} + +// Wait implements Waiter interface. +func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { + return nil, ErrAwaitingNotSupported +} + +// NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting. +func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) { + v, err := waiter.GetVersion() if err != nil { return nil, err } - if wsW, ok := a.client.(RPCEventWaiter); ok { - return a.waitWithWSWaiter(wsW, h, vub) - } - return a.waitWithSimpleWaiter(a.client, h, vub) + return &PollingWaiter{ + polling: waiter, + version: v, + }, nil } -// waitWithSimpleWaiter waits until transaction is accepted to the chain and -// returns its execution result or an error if it's missing from chain after -// VUB block. -func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uint32) (*state.AppExecResult, error) { +// Wait implements Waiter interface. +func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { + if err != nil { + return nil, err + } var ( currentHeight uint32 failedAttempt int - pollTime = time.Millisecond * time.Duration(a.GetVersion().Protocol.MillisecondsPerBlock) / 2 + pollTime = time.Millisecond * time.Duration(w.version.Protocol.MillisecondsPerBlock) / 2 ) if pollTime == 0 { pollTime = time.Second @@ -81,7 +140,7 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin for { select { case <-timer.C: - blockCount, err := c.GetBlockCount() + blockCount, err := w.polling.GetBlockCount() if err != nil { failedAttempt++ if failedAttempt > PollingWaiterRetryCount { @@ -94,7 +153,7 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin currentHeight = blockCount - 1 } t := trigger.Application - res, err := c.GetApplicationLog(h, &t) + res, err := w.polling.GetApplicationLog(h, &t) if err == nil { return &state.AppExecResult{ Container: h, @@ -104,21 +163,35 @@ func (a *Actor) waitWithSimpleWaiter(c RPCPollingWaiter, h util.Uint256, vub uin if currentHeight >= vub { return nil, ErrTxNotAccepted } - - case <-c.Context().Done(): - return nil, fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) + case <-w.polling.Context().Done(): + return nil, fmt.Errorf("%w: %v", ErrContextDone, w.polling.Context().Err()) } } } -// waitWithWSWaiter waits until transaction is accepted to the chain and returns -// its execution result or an error if it's missing from chain after VUB block. -// It uses optimized web-socket waiter if possible. -func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) (res *state.AppExecResult, waitErr error) { +// NewEventWaiter creates an instance of Waiter supporting websocket event-based transaction awaiting. +// EventWaiter contains PollingWaiter under the hood and falls back to polling when subscription-based +// awaiting fails. +func NewEventWaiter(waiter RPCEventWaiter) (*EventWaiter, error) { + polling, err := NewPollingWaiter(waiter) + if err != nil { + return nil, err + } + return &EventWaiter{ + ws: waiter, + polling: polling, + }, nil +} + +// Wait implements Waiter interface. +func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.AppExecResult, waitErr error) { + if err != nil { + return nil, err + } var wsWaitErr error defer func() { if wsWaitErr != nil { - res, waitErr = a.waitWithSimpleWaiter(c, h, vub) + res, waitErr = w.polling.Wait(h, vub, nil) if waitErr != nil { waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr) } @@ -139,13 +212,13 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( }() // Execution event follows the block event, thus wait until the block next to the VUB to be sure. since := vub + 1 - blocksID, err := c.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) + blocksID, err := w.ws.SubscribeForNewBlocksWithChan(nil, &since, nil, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for new blocks: %w", err) return } defer func() { - err = c.Unsubscribe(blocksID) + err = w.ws.Unsubscribe(blocksID) if err != nil { errFmt := "failed to unsubscribe from blocks (id: %s): %v" errArgs := []interface{}{blocksID, err} @@ -156,13 +229,13 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( waitErr = fmt.Errorf(errFmt, errArgs...) } }() - txsID, err := c.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) + txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) if err != nil { wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) return } defer func() { - err = c.Unsubscribe(txsID) + err = w.ws.Unsubscribe(txsID) if err != nil { errFmt := "failed to unsubscribe from transactions (id: %s): %v" errArgs := []interface{}{txsID, err} @@ -189,8 +262,8 @@ func (a *Actor) waitWithWSWaiter(c RPCEventWaiter, h util.Uint256, vub uint32) ( wsWaitErr = errors.New("some event was missed") return } - case <-c.Context().Done(): - waitErr = fmt.Errorf("%w: %v", ErrContextDone, c.Context().Err()) + case <-w.ws.Context().Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) return } } From 388112dcaadc246a6c96b398690e64ccd7f1ebb8 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 19 Oct 2022 12:46:53 +0300 Subject: [PATCH 10/15] rpc: mark old WSClient's SubscribeFor* methods as deprecated --- pkg/rpcclient/wsclient.go | 63 ++++++++++++++++++++++++++-------- pkg/rpcclient/wsclient_test.go | 36 +++++++++---------- 2 files changed, 67 insertions(+), 32 deletions(-) diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index da1018639..a2ade71d5 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -396,15 +396,21 @@ func (c *WSClient) performUnsubscription(id string) error { // of the client. It can be filtered by primary consensus node index and/or block // index allowing to receive blocks since the specified index only, nil value is // treated as missing filter. +// +// Deprecated: please, use SubscribeForNewBlocksWithChan. This method will be removed in future versions. func (c *WSClient) SubscribeForNewBlocks(primary *int, sinceIndex, tillIndex *uint32) (string, error) { return c.SubscribeForNewBlocksWithChan(primary, sinceIndex, tillIndex, c.Notifications) } // SubscribeForNewBlocksWithChan registers provided channel as a receiver for the -// specified new blocks notifications. The receiver channel must be properly read and -// drained after usage in order not to block other notification receivers. -// See SubscribeForNewBlocks for parameter details. +// new block events. Events can be filtered by primary consensus node index, nil +// value doesn't add any filters. If the receiver channel is nil, then the default +// Notifications channel will be used. The receiver channel must be properly read +// and drained after usage in order not to block other notification receivers. func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillIndex *uint32, rcvrCh chan<- Notification) (string, error) { + if rcvrCh == nil { + rcvrCh = c.Notifications + } params := []interface{}{"block_added"} var flt *neorpc.BlockFilter if primary != nil || sinceIndex != nil || tillIndex != nil { @@ -422,15 +428,22 @@ func (c *WSClient) SubscribeForNewBlocksWithChan(primary *int, sinceIndex, tillI // SubscribeForNewTransactions adds subscription for new transaction events to // this instance of the client. It can be filtered by the sender and/or the signer, nil // value is treated as missing filter. +// +// Deprecated: please, use SubscribeForNewTransactionsWithChan. This method will be removed in future versions. func (c *WSClient) SubscribeForNewTransactions(sender *util.Uint160, signer *util.Uint160) (string, error) { return c.SubscribeForNewTransactionsWithChan(sender, signer, c.Notifications) } // SubscribeForNewTransactionsWithChan registers provided channel as a receiver -// for the specified new transactions notifications. The receiver channel must be +// for new transaction events. Events can be filtered by the sender and/or the +// signer, nil value is treated as missing filter. If the receiver channel is nil, +// then the default Notifications channel will be used. The receiver channel must be // properly read and drained after usage in order not to block other notification -// receivers. See SubscribeForNewTransactions for parameter details. +// receivers. func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, signer *util.Uint160, rcvrCh chan<- Notification) (string, error) { + if rcvrCh == nil { + rcvrCh = c.Notifications + } params := []interface{}{"transaction_added"} var flt *neorpc.TxFilter if sender != nil || signer != nil { @@ -449,15 +462,22 @@ func (c *WSClient) SubscribeForNewTransactionsWithChan(sender *util.Uint160, sig // generated during transaction execution to this instance of the client. It can be // filtered by the contract's hash (that emits notifications), nil value puts no such // restrictions. +// +// Deprecated: please, use SubscribeForExecutionNotificationsWithChan. This method will be removed in future versions. func (c *WSClient) SubscribeForExecutionNotifications(contract *util.Uint160, name *string) (string, error) { return c.SubscribeForExecutionNotificationsWithChan(contract, name, c.Notifications) } // SubscribeForExecutionNotificationsWithChan registers provided channel as a -// receiver for the specified execution events. The receiver channel must be -// properly read and drained after usage in order not to block other notification -// receivers. See SubscribeForExecutionNotifications for parameter details. +// receiver for execution events. Events can be filtered by the contract's hash +// (that emits notifications), nil value puts no such restrictions. If the +// receiver channel is nil, then the default Notifications channel will be used. +// The receiver channel must be properly read and drained after usage in order +// not to block other notification receivers. func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uint160, name *string, rcvrCh chan<- Notification) (string, error) { + if rcvrCh == nil { + rcvrCh = c.Notifications + } params := []interface{}{"notification_from_execution"} var flt *neorpc.NotificationFilter if contract != nil || name != nil { @@ -477,15 +497,23 @@ func (c *WSClient) SubscribeForExecutionNotificationsWithChan(contract *util.Uin // be filtered by state (HALT/FAULT) to check for successful or failing // transactions; it can also be filtered by script container hash. // nil value means no filtering. +// +// Deprecated: please, use SubscribeForTransactionExecutionsWithChan. This method will be removed in future versions. func (c *WSClient) SubscribeForTransactionExecutions(state *string, container *util.Uint256) (string, error) { return c.SubscribeForTransactionExecutionsWithChan(state, container, c.Notifications) } // SubscribeForTransactionExecutionsWithChan registers provided channel as a -// receiver for the specified execution notifications. The receiver channel must be -// properly read and drained after usage in order not to block other notification -// receivers. See SubscribeForTransactionExecutions for parameter details. +// receiver for application execution result events generated during transaction +// execution. Events can be filtered by state (HALT/FAULT) to check for successful +// or failing transactions; it can also be filtered by script container hash. +// nil value means no filtering. If the receiver channel is nil, then the default +// Notifications channel will be used. The receiver channel must be properly read +// and drained after usage in order not to block other notification receivers. func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- Notification) (string, error) { + if rcvrCh == nil { + rcvrCh = c.Notifications + } params := []interface{}{"transaction_executed"} var flt *neorpc.ExecutionFilter if state != nil || container != nil { @@ -509,15 +537,22 @@ func (c *WSClient) SubscribeForTransactionExecutionsWithChan(state *string, cont // addition or removal events to this instance of client. It can be filtered by // request sender's hash, or main tx signer's hash, nil value puts no such // restrictions. +// +// Deprecated: please, use SubscribeForNotaryRequestsWithChan. This method will be removed in future versions. func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) { return c.SubscribeForNotaryRequestsWithChan(sender, mainSigner, c.Notifications) } // SubscribeForNotaryRequestsWithChan registers provided channel as a receiver -// for the specified notary requests notifications. The receiver channel must be -// properly read and drained after usage in order not to block other notification -// receivers. See SubscribeForNotaryRequests for parameter details. +// for notary request payload addition or removal events. It can be filtered by +// request sender's hash, or main tx signer's hash, nil value puts no such +// restrictions. If the receiver channel is nil, then the default Notifications +// channel will be used. The receiver channel must be properly read and drained +// after usage in order not to block other notification receivers. func (c *WSClient) SubscribeForNotaryRequestsWithChan(sender *util.Uint160, mainSigner *util.Uint160, rcvrCh chan<- Notification) (string, error) { + if rcvrCh == nil { + rcvrCh = c.Notifications + } params := []interface{}{"notary_request_event"} var flt *neorpc.TxFilter if sender != nil { diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 5662b1a32..501700b84 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -35,25 +35,25 @@ func TestWSClientSubscription(t *testing.T) { ch := make(chan Notification) var cases = map[string]func(*WSClient) (string, error){ "blocks": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewBlocks(nil, nil, nil) + return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, nil) }, "blocks_with_custom_ch": func(wsc *WSClient) (string, error) { return wsc.SubscribeForNewBlocksWithChan(nil, nil, nil, ch) }, "transactions": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForNewTransactions(nil, nil) + return wsc.SubscribeForNewTransactionsWithChan(nil, nil, nil) }, "transactions_with_custom_ch": func(wsc *WSClient) (string, error) { return wsc.SubscribeForNewTransactionsWithChan(nil, nil, ch) }, "notifications": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForExecutionNotifications(nil, nil) + return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, nil) }, "notifications_with_custom_ch": func(wsc *WSClient) (string, error) { return wsc.SubscribeForExecutionNotificationsWithChan(nil, nil, ch) }, "executions": func(wsc *WSClient) (string, error) { - return wsc.SubscribeForTransactionExecutions(nil, nil) + return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, nil) }, "executions_with_custom_ch": func(wsc *WSClient) (string, error) { return wsc.SubscribeForTransactionExecutionsWithChan(nil, nil, ch) @@ -274,7 +274,7 @@ func TestWSExecutionVMStateCheck(t *testing.T) { wsc.getNextRequestID = getTestRequestID require.NoError(t, wsc.Init()) filter := "NONE" - _, err = wsc.SubscribeForTransactionExecutions(&filter, nil) + _, err = wsc.SubscribeForTransactionExecutionsWithChan(&filter, nil, nil) require.Error(t, err) wsc.Close() } @@ -288,7 +288,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks primary", func(t *testing.T, wsc *WSClient) { primary := 3 - _, err := wsc.SubscribeForNewBlocks(&primary, nil, nil) + _, err := wsc.SubscribeForNewBlocksWithChan(&primary, nil, nil, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -303,7 +303,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks since", func(t *testing.T, wsc *WSClient) { var since uint32 = 3 - _, err := wsc.SubscribeForNewBlocks(nil, &since, nil) + _, err := wsc.SubscribeForNewBlocksWithChan(nil, &since, nil, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -318,7 +318,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"blocks till", func(t *testing.T, wsc *WSClient) { var till uint32 = 3 - _, err := wsc.SubscribeForNewBlocks(nil, nil, &till) + _, err := wsc.SubscribeForNewBlocksWithChan(nil, nil, &till, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -337,7 +337,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { primary = 2 till uint32 = 5 ) - _, err := wsc.SubscribeForNewBlocks(&primary, &since, &till) + _, err := wsc.SubscribeForNewBlocksWithChan(&primary, &since, &till, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -352,7 +352,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"transactions sender", func(t *testing.T, wsc *WSClient) { sender := util.Uint160{1, 2, 3, 4, 5} - _, err := wsc.SubscribeForNewTransactions(&sender, nil) + _, err := wsc.SubscribeForNewTransactionsWithChan(&sender, nil, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -366,7 +366,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"transactions signer", func(t *testing.T, wsc *WSClient) { signer := util.Uint160{0, 42} - _, err := wsc.SubscribeForNewTransactions(nil, &signer) + _, err := wsc.SubscribeForNewTransactionsWithChan(nil, &signer, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -381,7 +381,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { sender := util.Uint160{1, 2, 3, 4, 5} signer := util.Uint160{0, 42} - _, err := wsc.SubscribeForNewTransactions(&sender, &signer) + _, err := wsc.SubscribeForNewTransactionsWithChan(&sender, &signer, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -395,7 +395,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"notifications contract hash", func(t *testing.T, wsc *WSClient) { contract := util.Uint160{1, 2, 3, 4, 5} - _, err := wsc.SubscribeForExecutionNotifications(&contract, nil) + _, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, nil, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -409,7 +409,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"notifications name", func(t *testing.T, wsc *WSClient) { name := "my_pretty_notification" - _, err := wsc.SubscribeForExecutionNotifications(nil, &name) + _, err := wsc.SubscribeForExecutionNotificationsWithChan(nil, &name, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -424,7 +424,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { contract := util.Uint160{1, 2, 3, 4, 5} name := "my_pretty_notification" - _, err := wsc.SubscribeForExecutionNotifications(&contract, &name) + _, err := wsc.SubscribeForExecutionNotificationsWithChan(&contract, &name, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -438,7 +438,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"executions state", func(t *testing.T, wsc *WSClient) { state := "FAULT" - _, err := wsc.SubscribeForTransactionExecutions(&state, nil) + _, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, nil, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -452,7 +452,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { {"executions container", func(t *testing.T, wsc *WSClient) { container := util.Uint256{1, 2, 3} - _, err := wsc.SubscribeForTransactionExecutions(nil, &container) + _, err := wsc.SubscribeForTransactionExecutionsWithChan(nil, &container, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { @@ -467,7 +467,7 @@ func TestWSFilteredSubscriptions(t *testing.T) { func(t *testing.T, wsc *WSClient) { state := "FAULT" container := util.Uint256{1, 2, 3} - _, err := wsc.SubscribeForTransactionExecutions(&state, &container) + _, err := wsc.SubscribeForTransactionExecutionsWithChan(&state, &container, nil) require.NoError(t, err) }, func(t *testing.T, p *params.Params) { From 00d44235c1ff993c6ce31ee9856e39870eb225c7 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Wed, 19 Oct 2022 15:56:16 +0300 Subject: [PATCH 11/15] rpc: add tests for RPC waiters --- pkg/rpcclient/actor/actor_test.go | 20 +++- pkg/rpcclient/actor/maker_test.go | 6 +- pkg/rpcclient/actor/waiter_test.go | 186 +++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 8 deletions(-) create mode 100644 pkg/rpcclient/actor/waiter_test.go diff --git a/pkg/rpcclient/actor/actor_test.go b/pkg/rpcclient/actor/actor_test.go index 895d54743..bb49320df 100644 --- a/pkg/rpcclient/actor/actor_test.go +++ b/pkg/rpcclient/actor/actor_test.go @@ -15,15 +15,18 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) type RPCClient struct { err error invRes *result.Invoke netFee int64 - bCount uint32 + bCount atomic.Uint32 version *result.Version hash util.Uint256 + appLog *result.ApplicationLog + context context.Context } func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) { @@ -39,7 +42,7 @@ func (r *RPCClient) CalculateNetworkFee(tx *transaction.Transaction) (int64, err return r.netFee, r.err } func (r *RPCClient) GetBlockCount() (uint32, error) { - return r.bCount, r.err + return r.bCount.Load(), r.err } func (r *RPCClient) GetVersion() (*result.Version, error) { verCopy := *r.version @@ -55,10 +58,17 @@ func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCo return nil, nil // Just a stub, unused by actor. } func (r *RPCClient) Context() context.Context { - panic("TODO") + if r.context == nil { + return context.Background() + } + return r.context } + func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) { - panic("TODO") + if r.appLog != nil { + return r.appLog, nil + } + return nil, errors.New("not found") } func testRPCAndAccount(t *testing.T) (*RPCClient, *wallet.Account) { client := &RPCClient{ @@ -172,7 +182,7 @@ func TestSimpleWrappers(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(42), nf) - client.bCount = 100500 + client.bCount.Store(100500) bc, err := a.GetBlockCount() require.NoError(t, err) require.Equal(t, uint32(100500), bc) diff --git a/pkg/rpcclient/actor/maker_test.go b/pkg/rpcclient/actor/maker_test.go index b7f66c7b4..1b17510e8 100644 --- a/pkg/rpcclient/actor/maker_test.go +++ b/pkg/rpcclient/actor/maker_test.go @@ -20,7 +20,7 @@ func TestCalculateValidUntilBlock(t *testing.T) { require.Error(t, err) client.err = nil - client.bCount = 42 + client.bCount.Store(42) vub, err := a.CalculateValidUntilBlock() require.NoError(t, err) require.Equal(t, uint32(42+7+1), vub) @@ -37,7 +37,7 @@ func TestCalculateValidUntilBlock(t *testing.T) { require.NoError(t, err) require.Equal(t, uint32(42+4+1), vub) - client.bCount = 101 + client.bCount.Store(101) vub, err = a.CalculateValidUntilBlock() require.NoError(t, err) require.Equal(t, uint32(101+10+1), vub) @@ -64,7 +64,7 @@ func TestMakeUnsigned(t *testing.T) { // Good unchecked. client.netFee = 42 - client.bCount = 100500 + client.bCount.Store(100500) client.err = nil tx, err := a.MakeUnsignedUncheckedRun(script, 1, nil) require.NoError(t, err) diff --git a/pkg/rpcclient/actor/waiter_test.go b/pkg/rpcclient/actor/waiter_test.go new file mode 100644 index 000000000..03a516ce4 --- /dev/null +++ b/pkg/rpcclient/actor/waiter_test.go @@ -0,0 +1,186 @@ +package actor + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/stretchr/testify/require" +) + +type AwaitableRPCClient struct { + RPCClient + + chLock sync.RWMutex + subBlockCh chan<- rpcclient.Notification + subTxCh chan<- rpcclient.Notification +} + +func (c *AwaitableRPCClient) SubscribeForNewBlocksWithChan(primary *int, since *uint32, till *uint32, rcvrCh chan<- rpcclient.Notification) (string, error) { + c.chLock.Lock() + defer c.chLock.Unlock() + c.subBlockCh = rcvrCh + return "1", nil +} +func (c *AwaitableRPCClient) SubscribeForTransactionExecutionsWithChan(state *string, container *util.Uint256, rcvrCh chan<- rpcclient.Notification) (string, error) { + c.chLock.Lock() + defer c.chLock.Unlock() + c.subTxCh = rcvrCh + return "2", nil +} +func (c *AwaitableRPCClient) Unsubscribe(id string) error { return nil } + +func TestNewWaiter(t *testing.T) { + w := newWaiter((RPCActor)(nil), nil) + _, ok := w.(NullWaiter) + require.True(t, ok) + + w = newWaiter(&RPCClient{}, &result.Version{}) + _, ok = w.(*PollingWaiter) + require.True(t, ok) + + w = newWaiter(&AwaitableRPCClient{RPCClient: RPCClient{}}, &result.Version{}) + _, ok = w.(*EventWaiter) + require.True(t, ok) +} + +func TestPollingWaiter_Wait(t *testing.T) { + h := util.Uint256{1, 2, 3} + bCount := uint32(5) + appLog := &result.ApplicationLog{Container: h, Executions: []state.Execution{{}}} + expected := &state.AppExecResult{Container: h, Execution: state.Execution{}} + c := &RPCClient{appLog: appLog} + c.bCount.Store(bCount) + w := newWaiter(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time. + _, ok := w.(*PollingWaiter) + require.True(t, ok) + + // Wait with error. + someErr := errors.New("some error") + _, err := w.Wait(h, bCount, someErr) + require.ErrorIs(t, err, someErr) + + // AER is in chain immediately. + aer, err := w.Wait(h, bCount-1, nil) + require.NoError(t, err) + require.Equal(t, expected, aer) + + // Missing AER after VUB. + c.appLog = nil + _, err = w.Wait(h, bCount-2, nil) + require.ErrorIs(t, ErrTxNotAccepted, err) + + checkErr := func(t *testing.T, trigger func(), target error) { + errCh := make(chan error) + go func() { + _, err = w.Wait(h, bCount, nil) + errCh <- err + }() + timer := time.NewTimer(time.Second) + var triggerFired bool + waitloop: + for { + select { + case err = <-errCh: + require.ErrorIs(t, err, target) + break waitloop + case <-timer.C: + if triggerFired { + t.Fatal("failed to await result") + } + trigger() + triggerFired = true + timer.Reset(time.Second * 2) + } + } + require.True(t, triggerFired) + } + + // Tx is accepted before VUB. + c.appLog = nil + c.bCount.Store(bCount) + checkErr(t, func() { c.bCount.Store(bCount + 1) }, ErrTxNotAccepted) + + // Context is cancelled. + c.appLog = nil + c.bCount.Store(bCount) + ctx, cancel := context.WithCancel(context.Background()) + c.context = ctx + checkErr(t, cancel, ErrContextDone) +} + +func TestWSWaiter_Wait(t *testing.T) { + h := util.Uint256{1, 2, 3} + bCount := uint32(5) + appLog := &result.ApplicationLog{Container: h, Executions: []state.Execution{{}}} + expected := &state.AppExecResult{Container: h, Execution: state.Execution{}} + c := &AwaitableRPCClient{RPCClient: RPCClient{appLog: appLog}} + c.bCount.Store(bCount) + w := newWaiter(c, &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}) // reduce testing time. + _, ok := w.(*EventWaiter) + require.True(t, ok) + + // Wait with error. + someErr := errors.New("some error") + _, err := w.Wait(h, bCount, someErr) + require.ErrorIs(t, err, someErr) + + // AER is in chain immediately. + doneCh := make(chan struct{}) + go func() { + aer, err := w.Wait(h, bCount-1, nil) + require.NoError(t, err) + require.Equal(t, expected, aer) + doneCh <- struct{}{} + }() + check := func(t *testing.T, trigger func()) { + timer := time.NewTimer(time.Second) + var triggerFired bool + waitloop: + for { + select { + case <-doneCh: + break waitloop + case <-timer.C: + if triggerFired { + t.Fatal("failed to await result") + } + trigger() + triggerFired = true + timer.Reset(time.Second * 2) + } + } + require.True(t, triggerFired) + } + check(t, func() { + c.chLock.RLock() + defer c.chLock.RUnlock() + c.subBlockCh <- rpcclient.Notification{ + Type: neorpc.ExecutionEventID, + Value: expected, + } + }) + + // Missing AER after VUB. + go func() { + _, err = w.Wait(h, bCount-2, nil) + require.ErrorIs(t, err, ErrTxNotAccepted) + doneCh <- struct{}{} + }() + check(t, func() { + c.chLock.RLock() + defer c.chLock.RUnlock() + c.subBlockCh <- rpcclient.Notification{ + Type: neorpc.BlockEventID, + Value: &block.Block{}, + } + }) +} From 6b216050f37741ae9813440f8047a6df5b592bcc Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 21 Oct 2022 10:17:23 +0300 Subject: [PATCH 12/15] rpc: add compat tests for RPC* interfaces and Client implementations --- pkg/rpcclient/actor/compat_test.go | 19 +++++++++++++++++++ pkg/rpcclient/invoker/compat_test.go | 16 ++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 pkg/rpcclient/actor/compat_test.go create mode 100644 pkg/rpcclient/invoker/compat_test.go diff --git a/pkg/rpcclient/actor/compat_test.go b/pkg/rpcclient/actor/compat_test.go new file mode 100644 index 000000000..b8e356898 --- /dev/null +++ b/pkg/rpcclient/actor/compat_test.go @@ -0,0 +1,19 @@ +package actor_test + +import ( + "testing" + + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" +) + +func TestRPCActorRPCClientCompat(t *testing.T) { + _ = actor.RPCActor(&rpcclient.WSClient{}) + _ = actor.RPCActor(&rpcclient.Client{}) +} + +func TestRPCWaiterRPCClientCompat(t *testing.T) { + _ = actor.RPCPollingWaiter(&rpcclient.Client{}) + _ = actor.RPCPollingWaiter(&rpcclient.WSClient{}) + _ = actor.RPCEventWaiter(&rpcclient.WSClient{}) +} diff --git a/pkg/rpcclient/invoker/compat_test.go b/pkg/rpcclient/invoker/compat_test.go new file mode 100644 index 000000000..1c875da6d --- /dev/null +++ b/pkg/rpcclient/invoker/compat_test.go @@ -0,0 +1,16 @@ +package invoker_test + +import ( + "testing" + + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" +) + +func TestRPCInvokerRPCClientCompat(t *testing.T) { + _ = invoker.RPCInvoke(&rpcclient.Client{}) + _ = invoker.RPCInvoke(&rpcclient.WSClient{}) + _ = invoker.RPCInvokeHistoric(&rpcclient.Client{}) + _ = invoker.RPCInvokeHistoric(&rpcclient.WSClient{}) + _ = invoker.RPCSessions(&rpcclient.WSClient{}) +} From d2a9e9120d7783c903aa0e0497e90363c2e54a5d Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 21 Oct 2022 11:35:10 +0300 Subject: [PATCH 13/15] rpc: extend Waiter interface to wait for several txs with context --- pkg/rpcclient/actor/waiter.go | 75 +++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/pkg/rpcclient/actor/waiter.go b/pkg/rpcclient/actor/waiter.go index 724f9b734..38685bc90 100644 --- a/pkg/rpcclient/actor/waiter.go +++ b/pkg/rpcclient/actor/waiter.go @@ -40,6 +40,13 @@ type ( // ValidUntilBlock value and an error. It returns transaction execution result // or an error if transaction wasn't accepted to the chain. Wait(h util.Uint256, vub uint32, err error) (*state.AppExecResult, error) + // WaitAny waits until at least one of the specified transactions will be accepted + // to the chain until vub (including). It returns execution result of this + // transaction or an error if none of the transactions was accepted to the chain. + // It uses underlying RPCPollingWaiter or RPCEventWaiter context to interrupt + // awaiting process, but additional ctx can be passed as an argument for the same + // purpose. + WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) } // RPCPollingWaiter is an interface that enables transaction awaiting functionality // for Actor instance based on periodical BlockCount and ApplicationLog polls. @@ -110,6 +117,11 @@ func (NullWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppExecRes return nil, ErrAwaitingNotSupported } +// WaitAny implements Waiter interface. +func (NullWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) { + return nil, ErrAwaitingNotSupported +} + // NewPollingWaiter creates an instance of Waiter supporting poll-based transaction awaiting. func NewPollingWaiter(waiter RPCPollingWaiter) (*PollingWaiter, error) { v, err := waiter.GetVersion() @@ -127,6 +139,11 @@ func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppE if err != nil { return nil, err } + return w.WaitAny(context.TODO(), vub, h) +} + +// WaitAny implements Waiter interface. +func (w *PollingWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (*state.AppExecResult, error) { var ( currentHeight uint32 failedAttempt int @@ -153,18 +170,22 @@ func (w *PollingWaiter) Wait(h util.Uint256, vub uint32, err error) (*state.AppE currentHeight = blockCount - 1 } t := trigger.Application - res, err := w.polling.GetApplicationLog(h, &t) - if err == nil { - return &state.AppExecResult{ - Container: h, - Execution: res.Executions[0], - }, nil + for _, h := range hashes { + res, err := w.polling.GetApplicationLog(h, &t) + if err == nil { + return &state.AppExecResult{ + Container: res.Container, + Execution: res.Executions[0], + }, nil + } } if currentHeight >= vub { return nil, ErrTxNotAccepted } case <-w.polling.Context().Done(): return nil, fmt.Errorf("%w: %v", ErrContextDone, w.polling.Context().Err()) + case <-ctx.Done(): + return nil, fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) } } } @@ -188,10 +209,15 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap if err != nil { return nil, err } + return w.WaitAny(context.TODO(), vub, h) +} + +// WaitAny implements Waiter interface. +func (w *EventWaiter) WaitAny(ctx context.Context, vub uint32, hashes ...util.Uint256) (res *state.AppExecResult, waitErr error) { var wsWaitErr error defer func() { if wsWaitErr != nil { - res, waitErr = w.polling.Wait(h, vub, nil) + res, waitErr = w.polling.WaitAny(ctx, vub, hashes...) if waitErr != nil { waitErr = fmt.Errorf("WS waiter error: %w, simple waiter error: %v", wsWaitErr, waitErr) } @@ -229,23 +255,25 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap waitErr = fmt.Errorf(errFmt, errArgs...) } }() - txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) - if err != nil { - wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) - return - } - defer func() { - err = w.ws.Unsubscribe(txsID) + for _, h := range hashes { + txsID, err := w.ws.SubscribeForTransactionExecutionsWithChan(nil, &h, rcvr) if err != nil { - errFmt := "failed to unsubscribe from transactions (id: %s): %v" - errArgs := []interface{}{txsID, err} - if waitErr != nil { - errFmt += "; wait error: %w" - errArgs = append(errArgs, waitErr) - } - waitErr = fmt.Errorf(errFmt, errArgs...) + wsWaitErr = fmt.Errorf("failed to subscribe for execution results: %w", err) + return } - }() + defer func() { + err = w.ws.Unsubscribe(txsID) + if err != nil { + errFmt := "failed to unsubscribe from transactions (id: %s): %v" + errArgs := []interface{}{txsID, err} + if waitErr != nil { + errFmt += "; wait error: %w" + errArgs = append(errArgs, waitErr) + } + waitErr = fmt.Errorf(errFmt, errArgs...) + } + }() + } for { select { @@ -265,6 +293,9 @@ func (w *EventWaiter) Wait(h util.Uint256, vub uint32, err error) (res *state.Ap case <-w.ws.Context().Done(): waitErr = fmt.Errorf("%w: %v", ErrContextDone, w.ws.Context().Err()) return + case <-ctx.Done(): + waitErr = fmt.Errorf("%w: %v", ErrContextDone, ctx.Err()) + return } } } From 1a6f1c805c5137c3085c01d20609360f0260fc5a Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 21 Oct 2022 12:15:59 +0300 Subject: [PATCH 14/15] rpc: fix race in TestWSClientEvents ``` 2022-10-21T08:59:45.2219797Z === RUN TestWSClientEvents/default_ntf_channel 2022-10-21T08:59:45.2219901Z ================== 2022-10-21T08:59:45.2220017Z WARNING: DATA RACE 2022-10-21T08:59:45.2220177Z Write at 0x00c000c82778 by goroutine 371: 2022-10-21T08:59:45.2220580Z github.com/nspcc-dev/neo-go/pkg/rpcclient.TestWSClientEvents.func2() 2022-10-21T08:59:45.2221112Z /home/runner/work/neo-go/neo-go/pkg/rpcclient/wsclient_test.go:171 +0x1c4 2022-10-21T08:59:45.2221244Z testing.tRunner() 2022-10-21T08:59:45.2221617Z /opt/hostedtoolcache/go/1.18.7/x64/src/testing/testing.go:1439 +0x213 2022-10-21T08:59:45.2221759Z testing.(*T).Run.func1() 2022-10-21T08:59:45.2222124Z /opt/hostedtoolcache/go/1.18.7/x64/src/testing/testing.go:1486 +0x47 2022-10-21T08:59:45.2222138Z 2022-10-21T08:59:45.2222308Z Previous read at 0x00c000c82778 by goroutine 37: 2022-10-21T08:59:45.2222694Z github.com/nspcc-dev/neo-go/pkg/rpcclient.(*Client).StateRootInHeader() 2022-10-21T08:59:45.2223151Z /home/runner/work/neo-go/neo-go/pkg/rpcclient/rpc.go:1104 +0xb0 2022-10-21T08:59:45.2223482Z github.com/nspcc-dev/neo-go/pkg/rpcclient.(*WSClient).wsReader() 2022-10-21T08:59:45.2224077Z /home/runner/work/neo-go/neo-go/pkg/rpcclient/wsclient.go:210 +0x651 2022-10-21T08:59:45.2224416Z github.com/nspcc-dev/neo-go/pkg/rpcclient.NewWS.func2() 2022-10-21T08:59:45.2224892Z /home/runner/work/neo-go/neo-go/pkg/rpcclient/wsclient.go:149 +0x39 2022-10-21T08:59:45.2224901Z 2022-10-21T08:59:45.2225049Z Goroutine 371 (running) created at: 2022-10-21T08:59:45.2225182Z testing.(*T).Run() 2022-10-21T08:59:45.2225548Z /opt/hostedtoolcache/go/1.18.7/x64/src/testing/testing.go:1486 +0x724 2022-10-21T08:59:45.2225911Z github.com/nspcc-dev/neo-go/pkg/rpcclient.TestWSClientEvents() 2022-10-21T08:59:45.2226408Z /home/runner/work/neo-go/neo-go/pkg/rpcclient/wsclient_test.go:167 +0x404 2022-10-21T08:59:45.2226539Z testing.tRunner() 2022-10-21T08:59:45.2226900Z /opt/hostedtoolcache/go/1.18.7/x64/src/testing/testing.go:1439 +0x213 2022-10-21T08:59:45.2227042Z testing.(*T).Run.func1() 2022-10-21T08:59:45.2227398Z /opt/hostedtoolcache/go/1.18.7/x64/src/testing/testing.go:1486 +0x47 2022-10-21T08:59:45.2227406Z 2022-10-21T08:59:45.2227552Z Goroutine 37 (finished) created at: 2022-10-21T08:59:45.2227851Z github.com/nspcc-dev/neo-go/pkg/rpcclient.NewWS() 2022-10-21T08:59:45.2228327Z /home/runner/work/neo-go/neo-go/pkg/rpcclient/wsclient.go:149 +0x6fb 2022-10-21T08:59:45.2228843Z github.com/nspcc-dev/neo-go/pkg/rpcclient.TestWSClientEvents.func2() 2022-10-21T08:59:45.2229434Z /home/runner/work/neo-go/neo-go/pkg/rpcclient/wsclient_test.go:168 +0x131 2022-10-21T08:59:45.2229569Z testing.tRunner() 2022-10-21T08:59:45.2229930Z /opt/hostedtoolcache/go/1.18.7/x64/src/testing/testing.go:1439 +0x213 2022-10-21T08:59:45.2230069Z testing.(*T).Run.func1() 2022-10-21T08:59:45.2230424Z /opt/hostedtoolcache/go/1.18.7/x64/src/testing/testing.go:1486 +0x47 2022-10-21T08:59:45.2230526Z ================== 2022-10-21T08:59:45.2230703Z wsclient_test.go:186: 2022-10-21T08:59:45.2230988Z Error Trace: wsclient_test.go:186 2022-10-21T08:59:45.2231209Z Error: Should be true 2022-10-21T08:59:45.2231536Z Test: TestWSClientEvents/default_ntf_channel 2022-10-21T08:59:45.2231812Z testing.go:1312: race detected during execution of test ``` --- pkg/rpcclient/wsclient_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/rpcclient/wsclient_test.go b/pkg/rpcclient/wsclient_test.go index 501700b84..e0ea0fa8f 100644 --- a/pkg/rpcclient/wsclient_test.go +++ b/pkg/rpcclient/wsclient_test.go @@ -168,7 +168,10 @@ func TestWSClientEvents(t *testing.T) { wsc, err := NewWS(context.TODO(), httpURLtoWS(srv.URL), Options{}) require.NoError(t, err) wsc.getNextRequestID = getTestRequestID + wsc.cacheLock.Lock() wsc.cache.initDone = true // Our server mock is restricted, so perform initialisation manually. + wsc.cache.network = netmode.UnitTestNet + wsc.cacheLock.Unlock() // Our server mock is restricted, so perform subscriptions manually with default notifications channel. wsc.subscriptionsLock.Lock() wsc.subscriptions["0"] = notificationReceiver{typ: neorpc.BlockEventID, ch: wsc.Notifications} @@ -176,7 +179,6 @@ func TestWSClientEvents(t *testing.T) { wsc.subscriptions["2"] = notificationReceiver{typ: neorpc.NotificationEventID, ch: wsc.Notifications} // MissedEvent must be delivered without subscription. wsc.subscriptionsLock.Unlock() - wsc.cache.network = netmode.UnitTestNet for range events { select { case _, ok = <-wsc.Notifications: From 3cccc89dac786e78810ddc309e89cd339aaa3bd9 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 21 Oct 2022 20:15:20 +0300 Subject: [PATCH 15/15] rpc: add Wait wrapper to Notary actor --- pkg/rpcclient/notary/actor.go | 14 +++++++ pkg/rpcclient/notary/actor_test.go | 60 +++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/pkg/rpcclient/notary/actor.go b/pkg/rpcclient/notary/actor.go index c99a4e7af..2a71f5750 100644 --- a/pkg/rpcclient/notary/actor.go +++ b/pkg/rpcclient/notary/actor.go @@ -1,9 +1,11 @@ package notary import ( + "context" "errors" "fmt" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/neorpc/result" @@ -313,3 +315,15 @@ func (a *Actor) SendRequestExactly(mainTx *transaction.Transaction, fbTx *transa } return mainHash, fbHash, vub, nil } + +// Wait waits until main or fallback transaction will be accepted to the chain and returns +// the resulting application execution result or actor.ErrTxNotAccepted if both transactions +// failed to persist. Wait can be used if underlying Actor supports transaction awaiting, +// see actor.Actor and actor.Waiter documentation for details. Wait may be used as a wrapper +// for Notarize, SendRequest or SendRequestExactly. +func (a *Actor) Wait(mainHash, fbHash util.Uint256, vub uint32, err error) (*state.AppExecResult, error) { + if err != nil { + return nil, err + } + return a.WaitAny(context.TODO(), vub, mainHash, fbHash) +} diff --git a/pkg/rpcclient/notary/actor_test.go b/pkg/rpcclient/notary/actor_test.go index 84aed3e17..16fc7ce4c 100644 --- a/pkg/rpcclient/notary/actor_test.go +++ b/pkg/rpcclient/notary/actor_test.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/nspcc-dev/neo-go/pkg/config/netmode" + "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/address" @@ -19,6 +20,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "github.com/nspcc-dev/neo-go/pkg/vm/vmstate" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/stretchr/testify/require" ) @@ -32,6 +34,7 @@ type RPCClient struct { hash util.Uint256 nhash util.Uint256 mirror bool + applog *result.ApplicationLog } func (r *RPCClient) InvokeContractVerify(contract util.Uint160, params []smartcontract.Parameter, signers []transaction.Signer, witnesses ...transaction.Witness) (*result.Invoke, error) { @@ -69,12 +72,14 @@ func (r *RPCClient) TraverseIterator(sessionID, iteratorID uuid.UUID, maxItemsCo return nil, nil // Just a stub, unused by actor. } func (r *RPCClient) Context() context.Context { - panic("TODO") + return context.Background() } func (r *RPCClient) GetApplicationLog(hash util.Uint256, trig *trigger.Type) (*result.ApplicationLog, error) { - panic("TODO") + return r.applog, nil } +var _ = actor.RPCPollingWaiter(&RPCClient{}) + func TestNewActor(t *testing.T) { rc := &RPCClient{ version: &result.Version{ @@ -528,3 +533,54 @@ func TestDefaultActorOptions(t *testing.T) { require.NoError(t, opts.MainCheckerModifier(&result.Invoke{State: "HALT"}, tx)) require.Equal(t, uint32(42), tx.ValidUntilBlock) } + +func TestWait(t *testing.T) { + rc := &RPCClient{version: &result.Version{Protocol: result.Protocol{MillisecondsPerBlock: 1}}} + + key0, err := keys.NewPrivateKey() + require.NoError(t, err) + key1, err := keys.NewPrivateKey() + require.NoError(t, err) + + acc0 := wallet.NewAccountFromPrivateKey(key0) + facc1 := FakeSimpleAccount(key1.PublicKey()) + + act, err := NewActor(rc, []actor.SignerAccount{{ + Signer: transaction.Signer{ + Account: acc0.Contract.ScriptHash(), + Scopes: transaction.None, + }, + Account: acc0, + }, { + Signer: transaction.Signer{ + Account: facc1.Contract.ScriptHash(), + Scopes: transaction.CalledByEntry, + }, + Account: facc1, + }}, acc0) + require.NoError(t, err) + + someErr := errors.New("someErr") + _, err = act.Wait(util.Uint256{}, util.Uint256{}, 0, someErr) + require.ErrorIs(t, err, someErr) + + cont := util.Uint256{1, 2, 3} + ex := state.Execution{ + Trigger: trigger.Application, + VMState: vmstate.Halt, + GasConsumed: 123, + Stack: []stackitem.Item{stackitem.Null{}}, + } + applog := &result.ApplicationLog{ + Container: cont, + IsTransaction: true, + Executions: []state.Execution{ex}, + } + rc.applog = applog + res, err := act.Wait(util.Uint256{}, util.Uint256{}, 0, nil) + require.NoError(t, err) + require.Equal(t, &state.AppExecResult{ + Container: cont, + Execution: ex, + }, res) +}