From b8e96ac82bd8d5f7f802947955f739dad600891b Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 28 May 2021 14:47:33 +0300 Subject: [PATCH 1/4] core: move mempool.Event to a separate package And write a marshaller for EventType, it'll be reused by the Notification subsystem. --- pkg/core/mempool/mem_pool.go | 29 +++++------ pkg/core/mempool/subscriptions.go | 27 ++-------- pkg/core/mempool/subscriptions_test.go | 29 +++++------ pkg/core/mempoolevent/event.go | 70 ++++++++++++++++++++++++++ pkg/services/notary/notary.go | 9 ++-- 5 files changed, 109 insertions(+), 55 deletions(-) create mode 100644 pkg/core/mempoolevent/event.go diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 53b6f60bb..47a9a8723 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -8,6 +8,7 @@ import ( "sort" "sync" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" @@ -75,9 +76,9 @@ type Pool struct { subscriptionsEnabled bool subscriptionsOn atomic.Bool stopCh chan struct{} - events chan Event - subCh chan chan<- Event // there are no other events in mempool except Event, so no need in generic subscribers type - unsubCh chan chan<- Event + events chan mempoolevent.Event + subCh chan chan<- mempoolevent.Event // there are no other events in mempool except Event, so no need in generic subscribers type + unsubCh chan chan<- mempoolevent.Event } func (p items) Len() int { return len(p) } @@ -259,8 +260,8 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e } mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem if mp.subscriptionsOn.Load() { - mp.events <- Event{ - Type: TransactionRemoved, + mp.events <- mempoolevent.Event{ + Type: mempoolevent.TransactionRemoved, Tx: unlucky.txn, Data: unlucky.data, } @@ -287,8 +288,8 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e mp.lock.Unlock() if mp.subscriptionsOn.Load() { - mp.events <- Event{ - Type: TransactionAdded, + mp.events <- mempoolevent.Event{ + Type: mempoolevent.TransactionAdded, Tx: pItem.txn, Data: pItem.data, } @@ -332,8 +333,8 @@ func (mp *Pool) removeInternal(hash util.Uint256, feer Feer) { delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID) } if mp.subscriptionsOn.Load() { - mp.events <- Event{ - Type: TransactionRemoved, + mp.events <- mempoolevent.Event{ + Type: mempoolevent.TransactionRemoved, Tx: itm.txn, Data: itm.data, } @@ -382,8 +383,8 @@ func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool, feer Feer) delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID) } if mp.subscriptionsOn.Load() { - mp.events <- Event{ - Type: TransactionRemoved, + mp.events <- mempoolevent.Event{ + Type: mempoolevent.TransactionRemoved, Tx: itm.txn, Data: itm.data, } @@ -428,9 +429,9 @@ func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool { oracleResp: make(map[uint64]util.Uint256), subscriptionsEnabled: enableSubscriptions, stopCh: make(chan struct{}), - events: make(chan Event), - subCh: make(chan chan<- Event), - unsubCh: make(chan chan<- Event), + events: make(chan mempoolevent.Event), + subCh: make(chan chan<- mempoolevent.Event), + unsubCh: make(chan chan<- mempoolevent.Event), } mp.subscriptionsOn.Store(false) return mp diff --git a/pkg/core/mempool/subscriptions.go b/pkg/core/mempool/subscriptions.go index 9e1f667a9..066d0651b 100644 --- a/pkg/core/mempool/subscriptions.go +++ b/pkg/core/mempool/subscriptions.go @@ -1,25 +1,6 @@ package mempool -import ( - "github.com/nspcc-dev/neo-go/pkg/core/transaction" -) - -// EventType represents mempool event type. -type EventType byte - -const ( - // TransactionAdded marks transaction addition mempool event. - TransactionAdded EventType = 0x01 - // TransactionRemoved marks transaction removal mempool event. - TransactionRemoved EventType = 0x02 -) - -// Event represents one of mempool events: transaction was added or removed from mempool. -type Event struct { - Type EventType - Tx *transaction.Transaction - Data interface{} -} +import "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" // RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled. // You should manually free the resources by calling StopSubscriptions on mempool shutdown. @@ -47,7 +28,7 @@ func (mp *Pool) StopSubscriptions() { // SubscribeForTransactions adds given channel to new mempool event broadcasting, so when // there is a new transactions added to mempool or an existing transaction removed from // mempool you'll receive it via this channel. -func (mp *Pool) SubscribeForTransactions(ch chan<- Event) { +func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) { if mp.subscriptionsOn.Load() { mp.subCh <- ch } @@ -55,7 +36,7 @@ func (mp *Pool) SubscribeForTransactions(ch chan<- Event) { // UnsubscribeFromTransactions unsubscribes given channel from new mempool notifications, // you can close it afterwards. Passing non-subscribed channel is a no-op. -func (mp *Pool) UnsubscribeFromTransactions(ch chan<- Event) { +func (mp *Pool) UnsubscribeFromTransactions(ch chan<- mempoolevent.Event) { if mp.subscriptionsOn.Load() { mp.unsubCh <- ch } @@ -67,7 +48,7 @@ func (mp *Pool) notificationDispatcher() { // These are just sets of subscribers, though modelled as maps // for ease of management (not a lot of subscriptions is really // expected, but maps are convenient for adding/deleting elements). - txFeed = make(map[chan<- Event]bool) + txFeed = make(map[chan<- mempoolevent.Event]bool) ) for { select { diff --git a/pkg/core/mempool/subscriptions_test.go b/pkg/core/mempool/subscriptions_test.go index a2fb3a91f..bbe1bc2a3 100644 --- a/pkg/core/mempool/subscriptions_test.go +++ b/pkg/core/mempool/subscriptions_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" @@ -25,8 +26,8 @@ func TestSubscriptions(t *testing.T) { fs := &FeerStub{balance: 100} mp := New(2, 0, true) mp.RunSubscriptions() - subChan1 := make(chan Event, 3) - subChan2 := make(chan Event, 3) + subChan1 := make(chan mempoolevent.Event, 3) + subChan2 := make(chan mempoolevent.Event, 3) mp.SubscribeForTransactions(subChan1) t.Cleanup(mp.StopSubscriptions) @@ -42,7 +43,7 @@ func TestSubscriptions(t *testing.T) { require.NoError(t, mp.Add(txs[0], fs)) require.Eventually(t, func() bool { return len(subChan1) == 1 }, time.Second, time.Millisecond*100) event := <-subChan1 - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[0]}, event) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[0]}, event) // severak subscribers mp.SubscribeForTransactions(subChan2) @@ -50,28 +51,28 @@ func TestSubscriptions(t *testing.T) { require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) event1 := <-subChan1 event2 := <-subChan2 - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[1]}, event1) - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[1]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[1]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[1]}, event2) // reach capacity require.NoError(t, mp.Add(txs[2], &FeerStub{})) require.Eventually(t, func() bool { return len(subChan1) == 2 && len(subChan2) == 2 }, time.Second, time.Millisecond*100) event1 = <-subChan1 event2 = <-subChan2 - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[0]}, event1) - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[0]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[0]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[0]}, event2) event1 = <-subChan1 event2 = <-subChan2 - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[2]}, event1) - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[2]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[2]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[2]}, event2) // remove tx mp.Remove(txs[1].Hash(), fs) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) event1 = <-subChan1 event2 = <-subChan2 - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[1]}, event1) - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[1]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[1]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[1]}, event2) // remove stale mp.RemoveStale(func(tx *transaction.Transaction) bool { @@ -80,8 +81,8 @@ func TestSubscriptions(t *testing.T) { require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) event1 = <-subChan1 event2 = <-subChan2 - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[2]}, event1) - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[2]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[2]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[2]}, event2) // unsubscribe mp.UnsubscribeFromTransactions(subChan1) @@ -89,6 +90,6 @@ func TestSubscriptions(t *testing.T) { require.Eventually(t, func() bool { return len(subChan2) == 1 }, time.Second, time.Millisecond*100) event2 = <-subChan2 require.Equal(t, 0, len(subChan1)) - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[3]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[3]}, event2) }) } diff --git a/pkg/core/mempoolevent/event.go b/pkg/core/mempoolevent/event.go new file mode 100644 index 000000000..5361c85a6 --- /dev/null +++ b/pkg/core/mempoolevent/event.go @@ -0,0 +1,70 @@ +package mempoolevent + +import ( + "encoding/json" + "errors" + + "github.com/nspcc-dev/neo-go/pkg/core/transaction" +) + +// Type represents mempool event type. +type Type byte + +const ( + // TransactionAdded marks transaction addition mempool event. + TransactionAdded Type = 0x01 + // TransactionRemoved marks transaction removal mempool event. + TransactionRemoved Type = 0x02 +) + +// Event represents one of mempool events: transaction was added or removed from mempool. +type Event struct { + Type Type + Tx *transaction.Transaction + Data interface{} +} + +// String is a Stringer implementation. +func (e Type) String() string { + switch e { + case TransactionAdded: + return "added" + case TransactionRemoved: + return "removed" + default: + return "unknown" + } +} + +// GetEventTypeFromString converts input string into an Type if it's possible. +func GetEventTypeFromString(s string) (Type, error) { + switch s { + case "added": + return TransactionAdded, nil + case "removed": + return TransactionRemoved, nil + default: + return 0, errors.New("invalid event type name") + } +} + +// MarshalJSON implements json.Marshaler interface. +func (e Type) MarshalJSON() ([]byte, error) { + return json.Marshal(e.String()) +} + +// UnmarshalJSON implements json.Unmarshaler interface. +func (e *Type) UnmarshalJSON(b []byte) error { + var s string + + err := json.Unmarshal(b, &s) + if err != nil { + return err + } + id, err := GetEventTypeFromString(s) + if err != nil { + return err + } + *e = id + return nil +} diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index a2f5d7eb5..8ba2f2486 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -48,7 +49,7 @@ type ( mp *mempool.Pool // requests channel - reqCh chan mempool.Event + reqCh chan mempoolevent.Event blocksCh chan *block.Block stopCh chan struct{} } @@ -109,7 +110,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu wallet: wallet, onTransaction: onTransaction, mp: mp, - reqCh: make(chan mempool.Event), + reqCh: make(chan mempoolevent.Event), blocksCh: make(chan *block.Block), stopCh: make(chan struct{}), }, nil @@ -129,9 +130,9 @@ func (n *Notary) Run() { case event := <-n.reqCh: if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { switch event.Type { - case mempool.TransactionAdded: + case mempoolevent.TransactionAdded: n.OnNewRequest(req) - case mempool.TransactionRemoved: + case mempoolevent.TransactionRemoved: n.OnRequestRemoval(req) } } From 133b600c8ccde5eaa4516ef522f3f3f35a2c7af7 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Mon, 31 May 2021 16:02:24 +0300 Subject: [PATCH 2/4] config: set MinPeers=0 for unit testchain Some RPC tests require network services to be enabled and running (i.e. notary pool subscriptions). Services can be launched either when node has reached synchronised state or when it does not have connected peers and MinPeers=0. The second one is the case of RPC tests. --- config/protocol.unit_testnet.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/protocol.unit_testnet.yml b/config/protocol.unit_testnet.yml index 7644ef3f1..2f4b1e1d0 100644 --- a/config/protocol.unit_testnet.yml +++ b/config/protocol.unit_testnet.yml @@ -56,7 +56,7 @@ ApplicationConfiguration: PingTimeout: 90 MaxPeers: 50 AttemptConnPeers: 5 - MinPeers: 1 + MinPeers: 0 P2PNotary: Enabled: false UnlockWallet: From 1dbf1d4310d65228f5c5ec46e7a92838f0e9a5f7 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 28 May 2021 14:55:06 +0300 Subject: [PATCH 3/4] rpc: allow to track notary requests via Notification subsystem --- pkg/network/payload/notary_request.go | 4 +- pkg/network/server.go | 33 +++++++- pkg/rpc/client/wsclient.go | 14 ++++ pkg/rpc/response/events.go | 15 ++++ pkg/rpc/server/server.go | 60 ++++++++++---- pkg/rpc/server/server_test.go | 81 ++++++++++--------- pkg/rpc/server/subscription.go | 15 ++++ pkg/rpc/server/subscription_test.go | 109 +++++++++++++++++++++++++- 8 files changed, 276 insertions(+), 55 deletions(-) diff --git a/pkg/network/payload/notary_request.go b/pkg/network/payload/notary_request.go index e78178901..0d2d67b5c 100644 --- a/pkg/network/payload/notary_request.go +++ b/pkg/network/payload/notary_request.go @@ -13,8 +13,8 @@ import ( // P2PNotaryRequest contains main and fallback transactions for the Notary service. type P2PNotaryRequest struct { - MainTransaction *transaction.Transaction - FallbackTransaction *transaction.Transaction + MainTransaction *transaction.Transaction `json:"maintx"` + FallbackTransaction *transaction.Transaction `json:"fallbacktx"` Witness transaction.Witness diff --git a/pkg/network/server.go b/pkg/network/server.go index 560829d5d..3f02e1213 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -16,6 +16,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" @@ -143,7 +144,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) - s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1, config.P2PNotaryCfg.Enabled) + s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1, true) chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, txpool *mempool.Pool, _ *block.Block) { s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool { return bc.IsTxStillRelevant(t, txpool, true) @@ -295,6 +296,8 @@ func (s *Server) Shutdown() { } if s.notaryModule != nil { s.notaryModule.Stop() + } + if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.StopSubscriptions() } close(s.quit) @@ -449,13 +452,39 @@ func (s *Server) tryStartServices() { if s.oracle != nil { go s.oracle.Run() } + if s.chain.P2PSigExtensionsEnabled() { + s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. + } if s.notaryModule != nil { - s.notaryRequestPool.RunSubscriptions() go s.notaryModule.Run() } } } +// SubscribeForNotaryRequests adds given channel to a notary request event +// broadcasting, so when a new P2PNotaryRequest is received or an existing +// P2PNotaryRequest is removed from pool you'll receive it via this channel. +// Make sure it's read from regularly as not reading these events might affect +// other Server functions. +// Ensure that P2PSigExtensions are enabled before calling this method. +func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) { + if !s.chain.P2PSigExtensionsEnabled() { + panic("P2PSigExtensions are disabled") + } + s.notaryRequestPool.SubscribeForTransactions(ch) +} + +// UnsubscribeFromNotaryRequests unsubscribes given channel from notary request +// notifications, you can close it afterwards. Passing non-subscribed channel +// is a no-op. +// Ensure that P2PSigExtensions are enabled before calling this method. +func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) { + if !s.chain.P2PSigExtensionsEnabled() { + panic("P2PSigExtensions are disabled") + } + s.notaryRequestPool.UnsubscribeFromTransactions(ch) +} + // Peers returns the current list of peers connected to // the server. func (s *Server) Peers() map[Peer]bool { diff --git a/pkg/rpc/client/wsclient.go b/pkg/rpc/client/wsclient.go index e0b9c7a52..b45a35bc9 100644 --- a/pkg/rpc/client/wsclient.go +++ b/pkg/rpc/client/wsclient.go @@ -149,6 +149,8 @@ readloop: val = new(state.NotificationEvent) case response.ExecutionEventID: val = new(state.AppExecResult) + case response.NotaryRequestEventID: + val = new(response.NotaryRequestEvent) case response.MissedEventID: // No value. default: @@ -300,6 +302,18 @@ func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, err return c.performSubscription(params) } +// SubscribeForNotaryRequests adds subscription for notary request payloads +// addition or removal events to this instance of client. It can be filtered by +// request sender's hash, or main tx signer's hash, nil value puts no such +// restrictions. +func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) { + params := request.NewRawParams("notary_request_event") + if sender != nil { + params.Values = append(params.Values, request.TxFilter{Sender: sender, Signer: mainSigner}) + } + return c.performSubscription(params) +} + // Unsubscribe removes subscription for given event stream. func (c *WSClient) Unsubscribe(id string) error { return c.performUnsubscription(id) diff --git a/pkg/rpc/response/events.go b/pkg/rpc/response/events.go index 32d1e11ee..0c997ea42 100644 --- a/pkg/rpc/response/events.go +++ b/pkg/rpc/response/events.go @@ -3,11 +3,20 @@ package response import ( "encoding/json" "errors" + + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" + "github.com/nspcc-dev/neo-go/pkg/network/payload" ) type ( // EventID represents an event type happening on the chain. EventID byte + // NotaryRequestEvent represents P2PNotaryRequest event either added or removed + // from notary payload pool. + NotaryRequestEvent struct { + Type mempoolevent.Type `json:"type"` + NotaryRequest *payload.P2PNotaryRequest `json:"notaryrequest"` + } ) const ( @@ -22,6 +31,8 @@ const ( NotificationEventID // ExecutionEventID is used for `transaction_executed` events. ExecutionEventID + // NotaryRequestEventID is used for `notary_request_event` event. + NotaryRequestEventID // MissedEventID notifies user of missed events. MissedEventID EventID = 255 ) @@ -37,6 +48,8 @@ func (e EventID) String() string { return "notification_from_execution" case ExecutionEventID: return "transaction_executed" + case NotaryRequestEventID: + return "notary_request_event" case MissedEventID: return "event_missed" default: @@ -55,6 +68,8 @@ func GetEventIDFromString(s string) (EventID, error) { return NotificationEventID, nil case "transaction_executed": return ExecutionEventID, nil + case "notary_request_event": + return NotaryRequestEventID, nil case "event_missed": return MissedEventID, nil default: diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 3a62b3e6d..b67201320 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -21,6 +21,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/fee" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -57,16 +58,18 @@ type ( https *http.Server shutdown chan struct{} - subsLock sync.RWMutex - subscribers map[*subscriber]bool - blockSubs int - executionSubs int - notificationSubs int - transactionSubs int - blockCh chan *block.Block - executionCh chan *state.AppExecResult - notificationCh chan *state.NotificationEvent - transactionCh chan *transaction.Transaction + subsLock sync.RWMutex + subscribers map[*subscriber]bool + blockSubs int + executionSubs int + notificationSubs int + transactionSubs int + notaryRequestSubs int + blockCh chan *block.Block + executionCh chan *state.AppExecResult + notificationCh chan *state.NotificationEvent + transactionCh chan *transaction.Transaction + notaryRequestCh chan mempoolevent.Event } ) @@ -174,10 +177,11 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S subscribers: make(map[*subscriber]bool), // These are NOT buffered to preserve original order of events. - blockCh: make(chan *block.Block), - executionCh: make(chan *state.AppExecResult), - notificationCh: make(chan *state.NotificationEvent), - transactionCh: make(chan *transaction.Transaction), + blockCh: make(chan *block.Block), + executionCh: make(chan *state.AppExecResult), + notificationCh: make(chan *state.NotificationEvent), + transactionCh: make(chan *transaction.Transaction), + notaryRequestCh: make(chan mempoolevent.Event), } } @@ -1448,6 +1452,9 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface if err != nil || event == response.MissedEventID { return nil, response.ErrInvalidParams } + if event == response.NotaryRequestEventID && !s.chain.P2PSigExtensionsEnabled() { + return nil, response.WrapErrorWithData(response.ErrInvalidParams, errors.New("P2PSigExtensions are disabled")) + } // Optional filter. var filter interface{} if p := reqParams.Value(1); p != nil { @@ -1468,6 +1475,10 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface if p.Type != request.ExecutionFilterT { return nil, response.ErrInvalidParams } + case response.NotaryRequestEventID: + if p.Type != request.TxFilterT { + return nil, response.ErrInvalidParams + } } filter = p.Value } @@ -1519,6 +1530,11 @@ func (s *Server) subscribeToChannel(event response.EventID) { s.chain.SubscribeForExecutions(s.executionCh) } s.executionSubs++ + case response.NotaryRequestEventID: + if s.notaryRequestSubs == 0 { + s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh) + } + s.notaryRequestSubs++ } } @@ -1565,6 +1581,11 @@ func (s *Server) unsubscribeFromChannel(event response.EventID) { if s.executionSubs == 0 { s.chain.UnsubscribeFromExecutions(s.executionCh) } + case response.NotaryRequestEventID: + s.notaryRequestSubs-- + if s.notaryRequestSubs == 0 { + s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) + } } } @@ -1605,6 +1626,12 @@ chloop: case tx := <-s.transactionCh: resp.Event = response.TransactionEventID resp.Payload[0] = tx + case e := <-s.notaryRequestCh: + resp.Event = response.NotaryRequestEventID + resp.Payload[0] = &response.NotaryRequestEvent{ + Type: e.Type, + NotaryRequest: e.Data.(*payload.P2PNotaryRequest), + } } s.subsLock.RLock() subloop: @@ -1657,6 +1684,9 @@ chloop: s.chain.UnsubscribeFromTransactions(s.transactionCh) s.chain.UnsubscribeFromNotifications(s.notificationCh) s.chain.UnsubscribeFromExecutions(s.executionCh) + if s.chain.P2PSigExtensionsEnabled() { + s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) + } s.subsLock.Unlock() drainloop: for { @@ -1665,6 +1695,7 @@ drainloop: case <-s.executionCh: case <-s.notificationCh: case <-s.transactionCh: + case <-s.notaryRequestCh: default: break drainloop } @@ -1675,6 +1706,7 @@ drainloop: close(s.transactionCh) close(s.notificationCh) close(s.executionCh) + close(s.notaryRequestCh) } func (s *Server) blockHeightFromParam(param *request.Param) (int, *response.Error) { diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 454733a1a..e78c858aa 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -1107,42 +1107,7 @@ func TestSubmitNotaryRequest(t *testing.T) { }) t.Run("valid request", func(t *testing.T) { sender := testchain.PrivateKeyByID(0) // owner of the deposit in testchain - mainTx := &transaction.Transaction{ - Attributes: []transaction.Attribute{{Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 1}}}, - Script: []byte{byte(opcode.RET)}, - ValidUntilBlock: 123, - Signers: []transaction.Signer{{Account: util.Uint160{1, 5, 9}}}, - Scripts: []transaction.Witness{{ - InvocationScript: []byte{1, 4, 7}, - VerificationScript: []byte{3, 6, 9}, - }}, - } - fallbackTx := &transaction.Transaction{ - Script: []byte{byte(opcode.RET)}, - ValidUntilBlock: 123, - Attributes: []transaction.Attribute{ - {Type: transaction.NotValidBeforeT, Value: &transaction.NotValidBefore{Height: 123}}, - {Type: transaction.ConflictsT, Value: &transaction.Conflicts{Hash: mainTx.Hash()}}, - {Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 0}}, - }, - Signers: []transaction.Signer{{Account: chain.GetNotaryContractScriptHash()}, {Account: sender.GetScriptHash()}}, - Scripts: []transaction.Witness{ - {InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...), VerificationScript: []byte{}}, - }, - NetworkFee: 2_0000_0000, - } - fallbackTx.Scripts = append(fallbackTx.Scripts, transaction.Witness{ - InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), fallbackTx)...), - VerificationScript: sender.PublicKey().GetVerificationScript(), - }) - p := &payload.P2PNotaryRequest{ - MainTransaction: mainTx, - FallbackTransaction: fallbackTx, - } - p.Witness = transaction.Witness{ - InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), p)...), - VerificationScript: sender.PublicKey().GetVerificationScript(), - } + p := createValidNotaryRequest(chain, sender, 1) bytes, err := p.Bytes() require.NoError(t, err) str := fmt.Sprintf(`"%s"`, base64.StdEncoding.EncodeToString(bytes)) @@ -1150,6 +1115,50 @@ func TestSubmitNotaryRequest(t *testing.T) { }) } +// createValidNotaryRequest creates and signs P2PNotaryRequest payload which can +// pass verification. +func createValidNotaryRequest(chain *core.Blockchain, sender *keys.PrivateKey, nonce uint32) *payload.P2PNotaryRequest { + h := chain.BlockHeight() + mainTx := &transaction.Transaction{ + Nonce: nonce, + Attributes: []transaction.Attribute{{Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 1}}}, + Script: []byte{byte(opcode.RET)}, + ValidUntilBlock: h + 100, + Signers: []transaction.Signer{{Account: sender.GetScriptHash()}}, + Scripts: []transaction.Witness{{ + InvocationScript: []byte{1, 4, 7}, + VerificationScript: []byte{3, 6, 9}, + }}, + } + fallbackTx := &transaction.Transaction{ + Script: []byte{byte(opcode.RET)}, + ValidUntilBlock: h + 100, + Attributes: []transaction.Attribute{ + {Type: transaction.NotValidBeforeT, Value: &transaction.NotValidBefore{Height: h + 50}}, + {Type: transaction.ConflictsT, Value: &transaction.Conflicts{Hash: mainTx.Hash()}}, + {Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 0}}, + }, + Signers: []transaction.Signer{{Account: chain.GetNotaryContractScriptHash()}, {Account: sender.GetScriptHash()}}, + Scripts: []transaction.Witness{ + {InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...), VerificationScript: []byte{}}, + }, + NetworkFee: 2_0000_0000, + } + fallbackTx.Scripts = append(fallbackTx.Scripts, transaction.Witness{ + InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), fallbackTx)...), + VerificationScript: sender.PublicKey().GetVerificationScript(), + }) + p := &payload.P2PNotaryRequest{ + MainTransaction: mainTx, + FallbackTransaction: fallbackTx, + } + p.Witness = transaction.Witness{ + InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), p)...), + VerificationScript: sender.PublicKey().GetVerificationScript(), + } + return p +} + // testRPCProtocol runs a full set of tests using given callback to make actual // calls. Some tests change the chain state, thus we reinitialize the chain from // scratch here. diff --git a/pkg/rpc/server/subscription.go b/pkg/rpc/server/subscription.go index af06839cb..062706280 100644 --- a/pkg/rpc/server/subscription.go +++ b/pkg/rpc/server/subscription.go @@ -80,6 +80,21 @@ func (f *feed) Matches(r *response.Notification) bool { filt := f.filter.(request.ExecutionFilter) applog := r.Payload[0].(*state.AppExecResult) return applog.VMState.String() == filt.State + case response.NotaryRequestEventID: + filt := f.filter.(request.TxFilter) + req := r.Payload[0].(*response.NotaryRequestEvent) + senderOk := filt.Sender == nil || req.NotaryRequest.FallbackTransaction.Signers[1].Account == *filt.Sender + signerOK := true + if filt.Signer != nil { + signerOK = false + for _, signer := range req.NotaryRequest.MainTransaction.Signers { + if signer.Account.Equals(*filt.Signer) { + signerOK = true + break + } + } + } + return senderOk && signerOK } return false } diff --git a/pkg/rpc/server/subscription_test.go b/pkg/rpc/server/subscription_test.go index e2ca27ce4..a2c1bdef2 100644 --- a/pkg/rpc/server/subscription_test.go +++ b/pkg/rpc/server/subscription_test.go @@ -86,10 +86,12 @@ func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id st func TestSubscriptions(t *testing.T) { var subIDs = make([]string, 0) - var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed"} + var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"} chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + go rpcSrv.coreServer.Start(make(chan error)) + defer rpcSrv.coreServer.Shutdown() defer chain.Close() defer func() { _ = rpcSrv.Shutdown() }() @@ -133,6 +135,17 @@ func TestSubscriptions(t *testing.T) { require.Equal(t, response.BlockEventID, resp.Event) } + // We should manually add NotaryRequest to test notification. + sender := testchain.PrivateKeyByID(0) + err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, sender, 1)) + require.NoError(t, err) + for { + resp := getNotification(t, respMsgs) + if resp.Event == response.NotaryRequestEventID { + break + } + } + for _, id := range subIDs { callUnsubscribe(t, c, respMsgs, id) } @@ -277,6 +290,100 @@ func TestFilteredSubscriptions(t *testing.T) { } } +func TestFilteredNotaryRequestSubscriptions(t *testing.T) { + // We can't fit this into TestFilteredSubscriptions, because notary requests + // event doesn't depend on blocks events. + priv0 := testchain.PrivateKeyByID(0) + var goodSender = priv0.GetScriptHash() + + var cases = map[string]struct { + params string + check func(*testing.T, *response.Notification) + }{ + "matching sender": { + params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `"}]`, + check: func(t *testing.T, resp *response.Notification) { + rmap := resp.Payload[0].(map[string]interface{}) + require.Equal(t, response.NotaryRequestEventID, resp.Event) + require.Equal(t, "added", rmap["type"].(string)) + req := rmap["notaryrequest"].(map[string]interface{}) + fbTx := req["fallbacktx"].(map[string]interface{}) + sender := fbTx["signers"].([]interface{})[1].(map[string]interface{})["account"].(string) + require.Equal(t, "0x"+goodSender.StringLE(), sender) + }, + }, + "matching signer": { + params: `["notary_request_event", {"signer":"` + goodSender.StringLE() + `"}]`, + check: func(t *testing.T, resp *response.Notification) { + rmap := resp.Payload[0].(map[string]interface{}) + require.Equal(t, response.NotaryRequestEventID, resp.Event) + require.Equal(t, "added", rmap["type"].(string)) + req := rmap["notaryrequest"].(map[string]interface{}) + mainTx := req["maintx"].(map[string]interface{}) + signers := mainTx["signers"].([]interface{}) + signer0 := signers[0].(map[string]interface{}) + signer0acc := signer0["account"].(string) + require.Equal(t, "0x"+goodSender.StringLE(), signer0acc) + }, + }, + "matching sender and signer": { + params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `", "signer":"` + goodSender.StringLE() + `"}]`, + check: func(t *testing.T, resp *response.Notification) { + rmap := resp.Payload[0].(map[string]interface{}) + require.Equal(t, response.NotaryRequestEventID, resp.Event) + require.Equal(t, "added", rmap["type"].(string)) + req := rmap["notaryrequest"].(map[string]interface{}) + mainTx := req["maintx"].(map[string]interface{}) + fbTx := req["fallbacktx"].(map[string]interface{}) + sender := fbTx["signers"].([]interface{})[1].(map[string]interface{})["account"].(string) + require.Equal(t, "0x"+goodSender.StringLE(), sender) + signers := mainTx["signers"].([]interface{}) + signer0 := signers[0].(map[string]interface{}) + signer0acc := signer0["account"].(string) + require.Equal(t, "0x"+goodSender.StringLE(), signer0acc) + }, + }, + } + + chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + go rpcSrv.coreServer.Start(make(chan error, 1)) + + defer chain.Close() + defer func() { _ = rpcSrv.Shutdown() }() + + // blocks are needed to make GAS deposit for priv0 + blocks := getTestBlocks(t) + for _, b := range blocks { + require.NoError(t, chain.AddBlock(b)) + } + + var nonce uint32 = 100 + for name, this := range cases { + t.Run(name, func(t *testing.T) { + subID := callSubscribe(t, c, respMsgs, this.params) + + err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, priv0, nonce)) + require.NoError(t, err) + nonce++ + + var resp = new(response.Notification) + select { + case body := <-respMsgs: + require.NoError(t, json.Unmarshal(body, resp)) + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } + + require.Equal(t, response.NotaryRequestEventID, resp.Event) + this.check(t, resp) + + callUnsubscribe(t, c, respMsgs, subID) + }) + } + finishedFlag.CAS(false, true) + c.Close() +} + func TestFilteredBlockSubscriptions(t *testing.T) { // We can't fit this into TestFilteredSubscriptions, because it uses // blocks as EOF events to wait for. From b14fc5da7c89ba4d38b8ed7941e29e7e49c2bb70 Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Tue, 1 Jun 2021 15:22:27 +0300 Subject: [PATCH 4/4] vm: increase waiting time for vm cli program --- pkg/vm/cli/cli_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/vm/cli/cli_test.go b/pkg/vm/cli/cli_test.go index 9c28b8ce6..999858647 100644 --- a/pkg/vm/cli/cli_test.go +++ b/pkg/vm/cli/cli_test.go @@ -87,7 +87,7 @@ func (e *executor) runProg(t *testing.T, commands ...string) { }() select { case <-e.ch: - case <-time.After(time.Second): + case <-time.After(2 * time.Second): require.Fail(t, "command took too long time") } }