diff --git a/config/protocol.unit_testnet.yml b/config/protocol.unit_testnet.yml index 7644ef3f1..2f4b1e1d0 100644 --- a/config/protocol.unit_testnet.yml +++ b/config/protocol.unit_testnet.yml @@ -56,7 +56,7 @@ ApplicationConfiguration: PingTimeout: 90 MaxPeers: 50 AttemptConnPeers: 5 - MinPeers: 1 + MinPeers: 0 P2PNotary: Enabled: false UnlockWallet: diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 53b6f60bb..47a9a8723 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -8,6 +8,7 @@ import ( "sort" "sync" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" @@ -75,9 +76,9 @@ type Pool struct { subscriptionsEnabled bool subscriptionsOn atomic.Bool stopCh chan struct{} - events chan Event - subCh chan chan<- Event // there are no other events in mempool except Event, so no need in generic subscribers type - unsubCh chan chan<- Event + events chan mempoolevent.Event + subCh chan chan<- mempoolevent.Event // there are no other events in mempool except Event, so no need in generic subscribers type + unsubCh chan chan<- mempoolevent.Event } func (p items) Len() int { return len(p) } @@ -259,8 +260,8 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e } mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem if mp.subscriptionsOn.Load() { - mp.events <- Event{ - Type: TransactionRemoved, + mp.events <- mempoolevent.Event{ + Type: mempoolevent.TransactionRemoved, Tx: unlucky.txn, Data: unlucky.data, } @@ -287,8 +288,8 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e mp.lock.Unlock() if mp.subscriptionsOn.Load() { - mp.events <- Event{ - Type: TransactionAdded, + mp.events <- mempoolevent.Event{ + Type: mempoolevent.TransactionAdded, Tx: pItem.txn, Data: pItem.data, } @@ -332,8 +333,8 @@ func (mp *Pool) removeInternal(hash util.Uint256, feer Feer) { delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID) } if mp.subscriptionsOn.Load() { - mp.events <- Event{ - Type: TransactionRemoved, + mp.events <- mempoolevent.Event{ + Type: mempoolevent.TransactionRemoved, Tx: itm.txn, Data: itm.data, } @@ -382,8 +383,8 @@ func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool, feer Feer) delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID) } if mp.subscriptionsOn.Load() { - mp.events <- Event{ - Type: TransactionRemoved, + mp.events <- mempoolevent.Event{ + Type: mempoolevent.TransactionRemoved, Tx: itm.txn, Data: itm.data, } @@ -428,9 +429,9 @@ func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool { oracleResp: make(map[uint64]util.Uint256), subscriptionsEnabled: enableSubscriptions, stopCh: make(chan struct{}), - events: make(chan Event), - subCh: make(chan chan<- Event), - unsubCh: make(chan chan<- Event), + events: make(chan mempoolevent.Event), + subCh: make(chan chan<- mempoolevent.Event), + unsubCh: make(chan chan<- mempoolevent.Event), } mp.subscriptionsOn.Store(false) return mp diff --git a/pkg/core/mempool/subscriptions.go b/pkg/core/mempool/subscriptions.go index 9e1f667a9..066d0651b 100644 --- a/pkg/core/mempool/subscriptions.go +++ b/pkg/core/mempool/subscriptions.go @@ -1,25 +1,6 @@ package mempool -import ( - "github.com/nspcc-dev/neo-go/pkg/core/transaction" -) - -// EventType represents mempool event type. -type EventType byte - -const ( - // TransactionAdded marks transaction addition mempool event. - TransactionAdded EventType = 0x01 - // TransactionRemoved marks transaction removal mempool event. - TransactionRemoved EventType = 0x02 -) - -// Event represents one of mempool events: transaction was added or removed from mempool. -type Event struct { - Type EventType - Tx *transaction.Transaction - Data interface{} -} +import "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" // RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled. // You should manually free the resources by calling StopSubscriptions on mempool shutdown. @@ -47,7 +28,7 @@ func (mp *Pool) StopSubscriptions() { // SubscribeForTransactions adds given channel to new mempool event broadcasting, so when // there is a new transactions added to mempool or an existing transaction removed from // mempool you'll receive it via this channel. -func (mp *Pool) SubscribeForTransactions(ch chan<- Event) { +func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) { if mp.subscriptionsOn.Load() { mp.subCh <- ch } @@ -55,7 +36,7 @@ func (mp *Pool) SubscribeForTransactions(ch chan<- Event) { // UnsubscribeFromTransactions unsubscribes given channel from new mempool notifications, // you can close it afterwards. Passing non-subscribed channel is a no-op. -func (mp *Pool) UnsubscribeFromTransactions(ch chan<- Event) { +func (mp *Pool) UnsubscribeFromTransactions(ch chan<- mempoolevent.Event) { if mp.subscriptionsOn.Load() { mp.unsubCh <- ch } @@ -67,7 +48,7 @@ func (mp *Pool) notificationDispatcher() { // These are just sets of subscribers, though modelled as maps // for ease of management (not a lot of subscriptions is really // expected, but maps are convenient for adding/deleting elements). - txFeed = make(map[chan<- Event]bool) + txFeed = make(map[chan<- mempoolevent.Event]bool) ) for { select { diff --git a/pkg/core/mempool/subscriptions_test.go b/pkg/core/mempool/subscriptions_test.go index a2fb3a91f..bbe1bc2a3 100644 --- a/pkg/core/mempool/subscriptions_test.go +++ b/pkg/core/mempool/subscriptions_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" @@ -25,8 +26,8 @@ func TestSubscriptions(t *testing.T) { fs := &FeerStub{balance: 100} mp := New(2, 0, true) mp.RunSubscriptions() - subChan1 := make(chan Event, 3) - subChan2 := make(chan Event, 3) + subChan1 := make(chan mempoolevent.Event, 3) + subChan2 := make(chan mempoolevent.Event, 3) mp.SubscribeForTransactions(subChan1) t.Cleanup(mp.StopSubscriptions) @@ -42,7 +43,7 @@ func TestSubscriptions(t *testing.T) { require.NoError(t, mp.Add(txs[0], fs)) require.Eventually(t, func() bool { return len(subChan1) == 1 }, time.Second, time.Millisecond*100) event := <-subChan1 - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[0]}, event) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[0]}, event) // severak subscribers mp.SubscribeForTransactions(subChan2) @@ -50,28 +51,28 @@ func TestSubscriptions(t *testing.T) { require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) event1 := <-subChan1 event2 := <-subChan2 - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[1]}, event1) - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[1]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[1]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[1]}, event2) // reach capacity require.NoError(t, mp.Add(txs[2], &FeerStub{})) require.Eventually(t, func() bool { return len(subChan1) == 2 && len(subChan2) == 2 }, time.Second, time.Millisecond*100) event1 = <-subChan1 event2 = <-subChan2 - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[0]}, event1) - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[0]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[0]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[0]}, event2) event1 = <-subChan1 event2 = <-subChan2 - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[2]}, event1) - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[2]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[2]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[2]}, event2) // remove tx mp.Remove(txs[1].Hash(), fs) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) event1 = <-subChan1 event2 = <-subChan2 - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[1]}, event1) - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[1]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[1]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[1]}, event2) // remove stale mp.RemoveStale(func(tx *transaction.Transaction) bool { @@ -80,8 +81,8 @@ func TestSubscriptions(t *testing.T) { require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) event1 = <-subChan1 event2 = <-subChan2 - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[2]}, event1) - require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[2]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[2]}, event1) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[2]}, event2) // unsubscribe mp.UnsubscribeFromTransactions(subChan1) @@ -89,6 +90,6 @@ func TestSubscriptions(t *testing.T) { require.Eventually(t, func() bool { return len(subChan2) == 1 }, time.Second, time.Millisecond*100) event2 = <-subChan2 require.Equal(t, 0, len(subChan1)) - require.Equal(t, Event{Type: TransactionAdded, Tx: txs[3]}, event2) + require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[3]}, event2) }) } diff --git a/pkg/core/mempoolevent/event.go b/pkg/core/mempoolevent/event.go new file mode 100644 index 000000000..5361c85a6 --- /dev/null +++ b/pkg/core/mempoolevent/event.go @@ -0,0 +1,70 @@ +package mempoolevent + +import ( + "encoding/json" + "errors" + + "github.com/nspcc-dev/neo-go/pkg/core/transaction" +) + +// Type represents mempool event type. +type Type byte + +const ( + // TransactionAdded marks transaction addition mempool event. + TransactionAdded Type = 0x01 + // TransactionRemoved marks transaction removal mempool event. + TransactionRemoved Type = 0x02 +) + +// Event represents one of mempool events: transaction was added or removed from mempool. +type Event struct { + Type Type + Tx *transaction.Transaction + Data interface{} +} + +// String is a Stringer implementation. +func (e Type) String() string { + switch e { + case TransactionAdded: + return "added" + case TransactionRemoved: + return "removed" + default: + return "unknown" + } +} + +// GetEventTypeFromString converts input string into an Type if it's possible. +func GetEventTypeFromString(s string) (Type, error) { + switch s { + case "added": + return TransactionAdded, nil + case "removed": + return TransactionRemoved, nil + default: + return 0, errors.New("invalid event type name") + } +} + +// MarshalJSON implements json.Marshaler interface. +func (e Type) MarshalJSON() ([]byte, error) { + return json.Marshal(e.String()) +} + +// UnmarshalJSON implements json.Unmarshaler interface. +func (e *Type) UnmarshalJSON(b []byte) error { + var s string + + err := json.Unmarshal(b, &s) + if err != nil { + return err + } + id, err := GetEventTypeFromString(s) + if err != nil { + return err + } + *e = id + return nil +} diff --git a/pkg/network/payload/notary_request.go b/pkg/network/payload/notary_request.go index e78178901..0d2d67b5c 100644 --- a/pkg/network/payload/notary_request.go +++ b/pkg/network/payload/notary_request.go @@ -13,8 +13,8 @@ import ( // P2PNotaryRequest contains main and fallback transactions for the Notary service. type P2PNotaryRequest struct { - MainTransaction *transaction.Transaction - FallbackTransaction *transaction.Transaction + MainTransaction *transaction.Transaction `json:"maintx"` + FallbackTransaction *transaction.Transaction `json:"fallbacktx"` Witness transaction.Witness diff --git a/pkg/network/server.go b/pkg/network/server.go index 560829d5d..3f02e1213 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -16,6 +16,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" @@ -143,7 +144,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai } if chain.P2PSigExtensionsEnabled() { s.notaryFeer = NewNotaryFeer(chain) - s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1, config.P2PNotaryCfg.Enabled) + s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1, true) chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, txpool *mempool.Pool, _ *block.Block) { s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool { return bc.IsTxStillRelevant(t, txpool, true) @@ -295,6 +296,8 @@ func (s *Server) Shutdown() { } if s.notaryModule != nil { s.notaryModule.Stop() + } + if s.chain.P2PSigExtensionsEnabled() { s.notaryRequestPool.StopSubscriptions() } close(s.quit) @@ -449,13 +452,39 @@ func (s *Server) tryStartServices() { if s.oracle != nil { go s.oracle.Run() } + if s.chain.P2PSigExtensionsEnabled() { + s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. + } if s.notaryModule != nil { - s.notaryRequestPool.RunSubscriptions() go s.notaryModule.Run() } } } +// SubscribeForNotaryRequests adds given channel to a notary request event +// broadcasting, so when a new P2PNotaryRequest is received or an existing +// P2PNotaryRequest is removed from pool you'll receive it via this channel. +// Make sure it's read from regularly as not reading these events might affect +// other Server functions. +// Ensure that P2PSigExtensions are enabled before calling this method. +func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) { + if !s.chain.P2PSigExtensionsEnabled() { + panic("P2PSigExtensions are disabled") + } + s.notaryRequestPool.SubscribeForTransactions(ch) +} + +// UnsubscribeFromNotaryRequests unsubscribes given channel from notary request +// notifications, you can close it afterwards. Passing non-subscribed channel +// is a no-op. +// Ensure that P2PSigExtensions are enabled before calling this method. +func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) { + if !s.chain.P2PSigExtensionsEnabled() { + panic("P2PSigExtensions are disabled") + } + s.notaryRequestPool.UnsubscribeFromTransactions(ch) +} + // Peers returns the current list of peers connected to // the server. func (s *Server) Peers() map[Peer]bool { diff --git a/pkg/rpc/client/wsclient.go b/pkg/rpc/client/wsclient.go index e0b9c7a52..b45a35bc9 100644 --- a/pkg/rpc/client/wsclient.go +++ b/pkg/rpc/client/wsclient.go @@ -149,6 +149,8 @@ readloop: val = new(state.NotificationEvent) case response.ExecutionEventID: val = new(state.AppExecResult) + case response.NotaryRequestEventID: + val = new(response.NotaryRequestEvent) case response.MissedEventID: // No value. default: @@ -300,6 +302,18 @@ func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, err return c.performSubscription(params) } +// SubscribeForNotaryRequests adds subscription for notary request payloads +// addition or removal events to this instance of client. It can be filtered by +// request sender's hash, or main tx signer's hash, nil value puts no such +// restrictions. +func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) { + params := request.NewRawParams("notary_request_event") + if sender != nil { + params.Values = append(params.Values, request.TxFilter{Sender: sender, Signer: mainSigner}) + } + return c.performSubscription(params) +} + // Unsubscribe removes subscription for given event stream. func (c *WSClient) Unsubscribe(id string) error { return c.performUnsubscription(id) diff --git a/pkg/rpc/response/events.go b/pkg/rpc/response/events.go index 32d1e11ee..0c997ea42 100644 --- a/pkg/rpc/response/events.go +++ b/pkg/rpc/response/events.go @@ -3,11 +3,20 @@ package response import ( "encoding/json" "errors" + + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" + "github.com/nspcc-dev/neo-go/pkg/network/payload" ) type ( // EventID represents an event type happening on the chain. EventID byte + // NotaryRequestEvent represents P2PNotaryRequest event either added or removed + // from notary payload pool. + NotaryRequestEvent struct { + Type mempoolevent.Type `json:"type"` + NotaryRequest *payload.P2PNotaryRequest `json:"notaryrequest"` + } ) const ( @@ -22,6 +31,8 @@ const ( NotificationEventID // ExecutionEventID is used for `transaction_executed` events. ExecutionEventID + // NotaryRequestEventID is used for `notary_request_event` event. + NotaryRequestEventID // MissedEventID notifies user of missed events. MissedEventID EventID = 255 ) @@ -37,6 +48,8 @@ func (e EventID) String() string { return "notification_from_execution" case ExecutionEventID: return "transaction_executed" + case NotaryRequestEventID: + return "notary_request_event" case MissedEventID: return "event_missed" default: @@ -55,6 +68,8 @@ func GetEventIDFromString(s string) (EventID, error) { return NotificationEventID, nil case "transaction_executed": return ExecutionEventID, nil + case "notary_request_event": + return NotaryRequestEventID, nil case "event_missed": return MissedEventID, nil default: diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index 3a62b3e6d..b67201320 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -21,6 +21,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/fee" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/transaction" @@ -57,16 +58,18 @@ type ( https *http.Server shutdown chan struct{} - subsLock sync.RWMutex - subscribers map[*subscriber]bool - blockSubs int - executionSubs int - notificationSubs int - transactionSubs int - blockCh chan *block.Block - executionCh chan *state.AppExecResult - notificationCh chan *state.NotificationEvent - transactionCh chan *transaction.Transaction + subsLock sync.RWMutex + subscribers map[*subscriber]bool + blockSubs int + executionSubs int + notificationSubs int + transactionSubs int + notaryRequestSubs int + blockCh chan *block.Block + executionCh chan *state.AppExecResult + notificationCh chan *state.NotificationEvent + transactionCh chan *transaction.Transaction + notaryRequestCh chan mempoolevent.Event } ) @@ -174,10 +177,11 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S subscribers: make(map[*subscriber]bool), // These are NOT buffered to preserve original order of events. - blockCh: make(chan *block.Block), - executionCh: make(chan *state.AppExecResult), - notificationCh: make(chan *state.NotificationEvent), - transactionCh: make(chan *transaction.Transaction), + blockCh: make(chan *block.Block), + executionCh: make(chan *state.AppExecResult), + notificationCh: make(chan *state.NotificationEvent), + transactionCh: make(chan *transaction.Transaction), + notaryRequestCh: make(chan mempoolevent.Event), } } @@ -1448,6 +1452,9 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface if err != nil || event == response.MissedEventID { return nil, response.ErrInvalidParams } + if event == response.NotaryRequestEventID && !s.chain.P2PSigExtensionsEnabled() { + return nil, response.WrapErrorWithData(response.ErrInvalidParams, errors.New("P2PSigExtensions are disabled")) + } // Optional filter. var filter interface{} if p := reqParams.Value(1); p != nil { @@ -1468,6 +1475,10 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface if p.Type != request.ExecutionFilterT { return nil, response.ErrInvalidParams } + case response.NotaryRequestEventID: + if p.Type != request.TxFilterT { + return nil, response.ErrInvalidParams + } } filter = p.Value } @@ -1519,6 +1530,11 @@ func (s *Server) subscribeToChannel(event response.EventID) { s.chain.SubscribeForExecutions(s.executionCh) } s.executionSubs++ + case response.NotaryRequestEventID: + if s.notaryRequestSubs == 0 { + s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh) + } + s.notaryRequestSubs++ } } @@ -1565,6 +1581,11 @@ func (s *Server) unsubscribeFromChannel(event response.EventID) { if s.executionSubs == 0 { s.chain.UnsubscribeFromExecutions(s.executionCh) } + case response.NotaryRequestEventID: + s.notaryRequestSubs-- + if s.notaryRequestSubs == 0 { + s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) + } } } @@ -1605,6 +1626,12 @@ chloop: case tx := <-s.transactionCh: resp.Event = response.TransactionEventID resp.Payload[0] = tx + case e := <-s.notaryRequestCh: + resp.Event = response.NotaryRequestEventID + resp.Payload[0] = &response.NotaryRequestEvent{ + Type: e.Type, + NotaryRequest: e.Data.(*payload.P2PNotaryRequest), + } } s.subsLock.RLock() subloop: @@ -1657,6 +1684,9 @@ chloop: s.chain.UnsubscribeFromTransactions(s.transactionCh) s.chain.UnsubscribeFromNotifications(s.notificationCh) s.chain.UnsubscribeFromExecutions(s.executionCh) + if s.chain.P2PSigExtensionsEnabled() { + s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh) + } s.subsLock.Unlock() drainloop: for { @@ -1665,6 +1695,7 @@ drainloop: case <-s.executionCh: case <-s.notificationCh: case <-s.transactionCh: + case <-s.notaryRequestCh: default: break drainloop } @@ -1675,6 +1706,7 @@ drainloop: close(s.transactionCh) close(s.notificationCh) close(s.executionCh) + close(s.notaryRequestCh) } func (s *Server) blockHeightFromParam(param *request.Param) (int, *response.Error) { diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 454733a1a..e78c858aa 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -1107,42 +1107,7 @@ func TestSubmitNotaryRequest(t *testing.T) { }) t.Run("valid request", func(t *testing.T) { sender := testchain.PrivateKeyByID(0) // owner of the deposit in testchain - mainTx := &transaction.Transaction{ - Attributes: []transaction.Attribute{{Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 1}}}, - Script: []byte{byte(opcode.RET)}, - ValidUntilBlock: 123, - Signers: []transaction.Signer{{Account: util.Uint160{1, 5, 9}}}, - Scripts: []transaction.Witness{{ - InvocationScript: []byte{1, 4, 7}, - VerificationScript: []byte{3, 6, 9}, - }}, - } - fallbackTx := &transaction.Transaction{ - Script: []byte{byte(opcode.RET)}, - ValidUntilBlock: 123, - Attributes: []transaction.Attribute{ - {Type: transaction.NotValidBeforeT, Value: &transaction.NotValidBefore{Height: 123}}, - {Type: transaction.ConflictsT, Value: &transaction.Conflicts{Hash: mainTx.Hash()}}, - {Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 0}}, - }, - Signers: []transaction.Signer{{Account: chain.GetNotaryContractScriptHash()}, {Account: sender.GetScriptHash()}}, - Scripts: []transaction.Witness{ - {InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...), VerificationScript: []byte{}}, - }, - NetworkFee: 2_0000_0000, - } - fallbackTx.Scripts = append(fallbackTx.Scripts, transaction.Witness{ - InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), fallbackTx)...), - VerificationScript: sender.PublicKey().GetVerificationScript(), - }) - p := &payload.P2PNotaryRequest{ - MainTransaction: mainTx, - FallbackTransaction: fallbackTx, - } - p.Witness = transaction.Witness{ - InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), p)...), - VerificationScript: sender.PublicKey().GetVerificationScript(), - } + p := createValidNotaryRequest(chain, sender, 1) bytes, err := p.Bytes() require.NoError(t, err) str := fmt.Sprintf(`"%s"`, base64.StdEncoding.EncodeToString(bytes)) @@ -1150,6 +1115,50 @@ func TestSubmitNotaryRequest(t *testing.T) { }) } +// createValidNotaryRequest creates and signs P2PNotaryRequest payload which can +// pass verification. +func createValidNotaryRequest(chain *core.Blockchain, sender *keys.PrivateKey, nonce uint32) *payload.P2PNotaryRequest { + h := chain.BlockHeight() + mainTx := &transaction.Transaction{ + Nonce: nonce, + Attributes: []transaction.Attribute{{Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 1}}}, + Script: []byte{byte(opcode.RET)}, + ValidUntilBlock: h + 100, + Signers: []transaction.Signer{{Account: sender.GetScriptHash()}}, + Scripts: []transaction.Witness{{ + InvocationScript: []byte{1, 4, 7}, + VerificationScript: []byte{3, 6, 9}, + }}, + } + fallbackTx := &transaction.Transaction{ + Script: []byte{byte(opcode.RET)}, + ValidUntilBlock: h + 100, + Attributes: []transaction.Attribute{ + {Type: transaction.NotValidBeforeT, Value: &transaction.NotValidBefore{Height: h + 50}}, + {Type: transaction.ConflictsT, Value: &transaction.Conflicts{Hash: mainTx.Hash()}}, + {Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 0}}, + }, + Signers: []transaction.Signer{{Account: chain.GetNotaryContractScriptHash()}, {Account: sender.GetScriptHash()}}, + Scripts: []transaction.Witness{ + {InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...), VerificationScript: []byte{}}, + }, + NetworkFee: 2_0000_0000, + } + fallbackTx.Scripts = append(fallbackTx.Scripts, transaction.Witness{ + InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), fallbackTx)...), + VerificationScript: sender.PublicKey().GetVerificationScript(), + }) + p := &payload.P2PNotaryRequest{ + MainTransaction: mainTx, + FallbackTransaction: fallbackTx, + } + p.Witness = transaction.Witness{ + InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), p)...), + VerificationScript: sender.PublicKey().GetVerificationScript(), + } + return p +} + // testRPCProtocol runs a full set of tests using given callback to make actual // calls. Some tests change the chain state, thus we reinitialize the chain from // scratch here. diff --git a/pkg/rpc/server/subscription.go b/pkg/rpc/server/subscription.go index af06839cb..062706280 100644 --- a/pkg/rpc/server/subscription.go +++ b/pkg/rpc/server/subscription.go @@ -80,6 +80,21 @@ func (f *feed) Matches(r *response.Notification) bool { filt := f.filter.(request.ExecutionFilter) applog := r.Payload[0].(*state.AppExecResult) return applog.VMState.String() == filt.State + case response.NotaryRequestEventID: + filt := f.filter.(request.TxFilter) + req := r.Payload[0].(*response.NotaryRequestEvent) + senderOk := filt.Sender == nil || req.NotaryRequest.FallbackTransaction.Signers[1].Account == *filt.Sender + signerOK := true + if filt.Signer != nil { + signerOK = false + for _, signer := range req.NotaryRequest.MainTransaction.Signers { + if signer.Account.Equals(*filt.Signer) { + signerOK = true + break + } + } + } + return senderOk && signerOK } return false } diff --git a/pkg/rpc/server/subscription_test.go b/pkg/rpc/server/subscription_test.go index e2ca27ce4..a2c1bdef2 100644 --- a/pkg/rpc/server/subscription_test.go +++ b/pkg/rpc/server/subscription_test.go @@ -86,10 +86,12 @@ func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id st func TestSubscriptions(t *testing.T) { var subIDs = make([]string, 0) - var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed"} + var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"} chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + go rpcSrv.coreServer.Start(make(chan error)) + defer rpcSrv.coreServer.Shutdown() defer chain.Close() defer func() { _ = rpcSrv.Shutdown() }() @@ -133,6 +135,17 @@ func TestSubscriptions(t *testing.T) { require.Equal(t, response.BlockEventID, resp.Event) } + // We should manually add NotaryRequest to test notification. + sender := testchain.PrivateKeyByID(0) + err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, sender, 1)) + require.NoError(t, err) + for { + resp := getNotification(t, respMsgs) + if resp.Event == response.NotaryRequestEventID { + break + } + } + for _, id := range subIDs { callUnsubscribe(t, c, respMsgs, id) } @@ -277,6 +290,100 @@ func TestFilteredSubscriptions(t *testing.T) { } } +func TestFilteredNotaryRequestSubscriptions(t *testing.T) { + // We can't fit this into TestFilteredSubscriptions, because notary requests + // event doesn't depend on blocks events. + priv0 := testchain.PrivateKeyByID(0) + var goodSender = priv0.GetScriptHash() + + var cases = map[string]struct { + params string + check func(*testing.T, *response.Notification) + }{ + "matching sender": { + params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `"}]`, + check: func(t *testing.T, resp *response.Notification) { + rmap := resp.Payload[0].(map[string]interface{}) + require.Equal(t, response.NotaryRequestEventID, resp.Event) + require.Equal(t, "added", rmap["type"].(string)) + req := rmap["notaryrequest"].(map[string]interface{}) + fbTx := req["fallbacktx"].(map[string]interface{}) + sender := fbTx["signers"].([]interface{})[1].(map[string]interface{})["account"].(string) + require.Equal(t, "0x"+goodSender.StringLE(), sender) + }, + }, + "matching signer": { + params: `["notary_request_event", {"signer":"` + goodSender.StringLE() + `"}]`, + check: func(t *testing.T, resp *response.Notification) { + rmap := resp.Payload[0].(map[string]interface{}) + require.Equal(t, response.NotaryRequestEventID, resp.Event) + require.Equal(t, "added", rmap["type"].(string)) + req := rmap["notaryrequest"].(map[string]interface{}) + mainTx := req["maintx"].(map[string]interface{}) + signers := mainTx["signers"].([]interface{}) + signer0 := signers[0].(map[string]interface{}) + signer0acc := signer0["account"].(string) + require.Equal(t, "0x"+goodSender.StringLE(), signer0acc) + }, + }, + "matching sender and signer": { + params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `", "signer":"` + goodSender.StringLE() + `"}]`, + check: func(t *testing.T, resp *response.Notification) { + rmap := resp.Payload[0].(map[string]interface{}) + require.Equal(t, response.NotaryRequestEventID, resp.Event) + require.Equal(t, "added", rmap["type"].(string)) + req := rmap["notaryrequest"].(map[string]interface{}) + mainTx := req["maintx"].(map[string]interface{}) + fbTx := req["fallbacktx"].(map[string]interface{}) + sender := fbTx["signers"].([]interface{})[1].(map[string]interface{})["account"].(string) + require.Equal(t, "0x"+goodSender.StringLE(), sender) + signers := mainTx["signers"].([]interface{}) + signer0 := signers[0].(map[string]interface{}) + signer0acc := signer0["account"].(string) + require.Equal(t, "0x"+goodSender.StringLE(), signer0acc) + }, + }, + } + + chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) + go rpcSrv.coreServer.Start(make(chan error, 1)) + + defer chain.Close() + defer func() { _ = rpcSrv.Shutdown() }() + + // blocks are needed to make GAS deposit for priv0 + blocks := getTestBlocks(t) + for _, b := range blocks { + require.NoError(t, chain.AddBlock(b)) + } + + var nonce uint32 = 100 + for name, this := range cases { + t.Run(name, func(t *testing.T) { + subID := callSubscribe(t, c, respMsgs, this.params) + + err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, priv0, nonce)) + require.NoError(t, err) + nonce++ + + var resp = new(response.Notification) + select { + case body := <-respMsgs: + require.NoError(t, json.Unmarshal(body, resp)) + case <-time.After(time.Second): + t.Fatal("timeout waiting for event") + } + + require.Equal(t, response.NotaryRequestEventID, resp.Event) + this.check(t, resp) + + callUnsubscribe(t, c, respMsgs, subID) + }) + } + finishedFlag.CAS(false, true) + c.Close() +} + func TestFilteredBlockSubscriptions(t *testing.T) { // We can't fit this into TestFilteredSubscriptions, because it uses // blocks as EOF events to wait for. diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index a2f5d7eb5..8ba2f2486 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/mempool" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -48,7 +49,7 @@ type ( mp *mempool.Pool // requests channel - reqCh chan mempool.Event + reqCh chan mempoolevent.Event blocksCh chan *block.Block stopCh chan struct{} } @@ -109,7 +110,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu wallet: wallet, onTransaction: onTransaction, mp: mp, - reqCh: make(chan mempool.Event), + reqCh: make(chan mempoolevent.Event), blocksCh: make(chan *block.Block), stopCh: make(chan struct{}), }, nil @@ -129,9 +130,9 @@ func (n *Notary) Run() { case event := <-n.reqCh: if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { switch event.Type { - case mempool.TransactionAdded: + case mempoolevent.TransactionAdded: n.OnNewRequest(req) - case mempool.TransactionRemoved: + case mempoolevent.TransactionRemoved: n.OnRequestRemoval(req) } } diff --git a/pkg/vm/cli/cli_test.go b/pkg/vm/cli/cli_test.go index 9c28b8ce6..999858647 100644 --- a/pkg/vm/cli/cli_test.go +++ b/pkg/vm/cli/cli_test.go @@ -87,7 +87,7 @@ func (e *executor) runProg(t *testing.T, commands ...string) { }() select { case <-e.ch: - case <-time.After(time.Second): + case <-time.After(2 * time.Second): require.Fail(t, "command took too long time") } }