Merge pull request #1984 from nspcc-dev/notifications/notary

rpc: support NotaryRequest notifications for Notifications subsystem
This commit is contained in:
Roman Khimov 2021-06-02 23:31:55 +03:00 committed by GitHub
commit d4495ad7b3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 387 additions and 112 deletions

View file

@ -56,7 +56,7 @@ ApplicationConfiguration:
PingTimeout: 90 PingTimeout: 90
MaxPeers: 50 MaxPeers: 50
AttemptConnPeers: 5 AttemptConnPeers: 5
MinPeers: 1 MinPeers: 0
P2PNotary: P2PNotary:
Enabled: false Enabled: false
UnlockWallet: UnlockWallet:

View file

@ -8,6 +8,7 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic" "go.uber.org/atomic"
@ -75,9 +76,9 @@ type Pool struct {
subscriptionsEnabled bool subscriptionsEnabled bool
subscriptionsOn atomic.Bool subscriptionsOn atomic.Bool
stopCh chan struct{} stopCh chan struct{}
events chan Event events chan mempoolevent.Event
subCh chan chan<- Event // there are no other events in mempool except Event, so no need in generic subscribers type subCh chan chan<- mempoolevent.Event // there are no other events in mempool except Event, so no need in generic subscribers type
unsubCh chan chan<- Event unsubCh chan chan<- mempoolevent.Event
} }
func (p items) Len() int { return len(p) } func (p items) Len() int { return len(p) }
@ -259,8 +260,8 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e
} }
mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.events <- Event{ mp.events <- mempoolevent.Event{
Type: TransactionRemoved, Type: mempoolevent.TransactionRemoved,
Tx: unlucky.txn, Tx: unlucky.txn,
Data: unlucky.data, Data: unlucky.data,
} }
@ -287,8 +288,8 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e
mp.lock.Unlock() mp.lock.Unlock()
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.events <- Event{ mp.events <- mempoolevent.Event{
Type: TransactionAdded, Type: mempoolevent.TransactionAdded,
Tx: pItem.txn, Tx: pItem.txn,
Data: pItem.data, Data: pItem.data,
} }
@ -332,8 +333,8 @@ func (mp *Pool) removeInternal(hash util.Uint256, feer Feer) {
delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID) delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID)
} }
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.events <- Event{ mp.events <- mempoolevent.Event{
Type: TransactionRemoved, Type: mempoolevent.TransactionRemoved,
Tx: itm.txn, Tx: itm.txn,
Data: itm.data, Data: itm.data,
} }
@ -382,8 +383,8 @@ func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool, feer Feer)
delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID) delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID)
} }
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.events <- Event{ mp.events <- mempoolevent.Event{
Type: TransactionRemoved, Type: mempoolevent.TransactionRemoved,
Tx: itm.txn, Tx: itm.txn,
Data: itm.data, Data: itm.data,
} }
@ -428,9 +429,9 @@ func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool {
oracleResp: make(map[uint64]util.Uint256), oracleResp: make(map[uint64]util.Uint256),
subscriptionsEnabled: enableSubscriptions, subscriptionsEnabled: enableSubscriptions,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
events: make(chan Event), events: make(chan mempoolevent.Event),
subCh: make(chan chan<- Event), subCh: make(chan chan<- mempoolevent.Event),
unsubCh: make(chan chan<- Event), unsubCh: make(chan chan<- mempoolevent.Event),
} }
mp.subscriptionsOn.Store(false) mp.subscriptionsOn.Store(false)
return mp return mp

View file

@ -1,25 +1,6 @@
package mempool package mempool
import ( import "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
)
// EventType represents mempool event type.
type EventType byte
const (
// TransactionAdded marks transaction addition mempool event.
TransactionAdded EventType = 0x01
// TransactionRemoved marks transaction removal mempool event.
TransactionRemoved EventType = 0x02
)
// Event represents one of mempool events: transaction was added or removed from mempool.
type Event struct {
Type EventType
Tx *transaction.Transaction
Data interface{}
}
// RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled. // RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled.
// You should manually free the resources by calling StopSubscriptions on mempool shutdown. // You should manually free the resources by calling StopSubscriptions on mempool shutdown.
@ -47,7 +28,7 @@ func (mp *Pool) StopSubscriptions() {
// SubscribeForTransactions adds given channel to new mempool event broadcasting, so when // SubscribeForTransactions adds given channel to new mempool event broadcasting, so when
// there is a new transactions added to mempool or an existing transaction removed from // there is a new transactions added to mempool or an existing transaction removed from
// mempool you'll receive it via this channel. // mempool you'll receive it via this channel.
func (mp *Pool) SubscribeForTransactions(ch chan<- Event) { func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) {
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.subCh <- ch mp.subCh <- ch
} }
@ -55,7 +36,7 @@ func (mp *Pool) SubscribeForTransactions(ch chan<- Event) {
// UnsubscribeFromTransactions unsubscribes given channel from new mempool notifications, // UnsubscribeFromTransactions unsubscribes given channel from new mempool notifications,
// you can close it afterwards. Passing non-subscribed channel is a no-op. // you can close it afterwards. Passing non-subscribed channel is a no-op.
func (mp *Pool) UnsubscribeFromTransactions(ch chan<- Event) { func (mp *Pool) UnsubscribeFromTransactions(ch chan<- mempoolevent.Event) {
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.unsubCh <- ch mp.unsubCh <- ch
} }
@ -67,7 +48,7 @@ func (mp *Pool) notificationDispatcher() {
// These are just sets of subscribers, though modelled as maps // These are just sets of subscribers, though modelled as maps
// for ease of management (not a lot of subscriptions is really // for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements). // expected, but maps are convenient for adding/deleting elements).
txFeed = make(map[chan<- Event]bool) txFeed = make(map[chan<- mempoolevent.Event]bool)
) )
for { for {
select { select {

View file

@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/opcode"
@ -25,8 +26,8 @@ func TestSubscriptions(t *testing.T) {
fs := &FeerStub{balance: 100} fs := &FeerStub{balance: 100}
mp := New(2, 0, true) mp := New(2, 0, true)
mp.RunSubscriptions() mp.RunSubscriptions()
subChan1 := make(chan Event, 3) subChan1 := make(chan mempoolevent.Event, 3)
subChan2 := make(chan Event, 3) subChan2 := make(chan mempoolevent.Event, 3)
mp.SubscribeForTransactions(subChan1) mp.SubscribeForTransactions(subChan1)
t.Cleanup(mp.StopSubscriptions) t.Cleanup(mp.StopSubscriptions)
@ -42,7 +43,7 @@ func TestSubscriptions(t *testing.T) {
require.NoError(t, mp.Add(txs[0], fs)) require.NoError(t, mp.Add(txs[0], fs))
require.Eventually(t, func() bool { return len(subChan1) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 }, time.Second, time.Millisecond*100)
event := <-subChan1 event := <-subChan1
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[0]}, event) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[0]}, event)
// severak subscribers // severak subscribers
mp.SubscribeForTransactions(subChan2) mp.SubscribeForTransactions(subChan2)
@ -50,28 +51,28 @@ func TestSubscriptions(t *testing.T) {
require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event1 := <-subChan1 event1 := <-subChan1
event2 := <-subChan2 event2 := <-subChan2
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[1]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[1]}, event1)
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[1]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[1]}, event2)
// reach capacity // reach capacity
require.NoError(t, mp.Add(txs[2], &FeerStub{})) require.NoError(t, mp.Add(txs[2], &FeerStub{}))
require.Eventually(t, func() bool { return len(subChan1) == 2 && len(subChan2) == 2 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 2 && len(subChan2) == 2 }, time.Second, time.Millisecond*100)
event1 = <-subChan1 event1 = <-subChan1
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[0]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[0]}, event1)
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[0]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[0]}, event2)
event1 = <-subChan1 event1 = <-subChan1
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[2]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[2]}, event1)
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[2]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[2]}, event2)
// remove tx // remove tx
mp.Remove(txs[1].Hash(), fs) mp.Remove(txs[1].Hash(), fs)
require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event1 = <-subChan1 event1 = <-subChan1
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[1]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[1]}, event1)
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[1]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[1]}, event2)
// remove stale // remove stale
mp.RemoveStale(func(tx *transaction.Transaction) bool { mp.RemoveStale(func(tx *transaction.Transaction) bool {
@ -80,8 +81,8 @@ func TestSubscriptions(t *testing.T) {
require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event1 = <-subChan1 event1 = <-subChan1
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[2]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[2]}, event1)
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[2]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[2]}, event2)
// unsubscribe // unsubscribe
mp.UnsubscribeFromTransactions(subChan1) mp.UnsubscribeFromTransactions(subChan1)
@ -89,6 +90,6 @@ func TestSubscriptions(t *testing.T) {
require.Eventually(t, func() bool { return len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, 0, len(subChan1)) require.Equal(t, 0, len(subChan1))
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[3]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[3]}, event2)
}) })
} }

View file

@ -0,0 +1,70 @@
package mempoolevent
import (
"encoding/json"
"errors"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
)
// Type represents mempool event type.
type Type byte
const (
// TransactionAdded marks transaction addition mempool event.
TransactionAdded Type = 0x01
// TransactionRemoved marks transaction removal mempool event.
TransactionRemoved Type = 0x02
)
// Event represents one of mempool events: transaction was added or removed from mempool.
type Event struct {
Type Type
Tx *transaction.Transaction
Data interface{}
}
// String is a Stringer implementation.
func (e Type) String() string {
switch e {
case TransactionAdded:
return "added"
case TransactionRemoved:
return "removed"
default:
return "unknown"
}
}
// GetEventTypeFromString converts input string into an Type if it's possible.
func GetEventTypeFromString(s string) (Type, error) {
switch s {
case "added":
return TransactionAdded, nil
case "removed":
return TransactionRemoved, nil
default:
return 0, errors.New("invalid event type name")
}
}
// MarshalJSON implements json.Marshaler interface.
func (e Type) MarshalJSON() ([]byte, error) {
return json.Marshal(e.String())
}
// UnmarshalJSON implements json.Unmarshaler interface.
func (e *Type) UnmarshalJSON(b []byte) error {
var s string
err := json.Unmarshal(b, &s)
if err != nil {
return err
}
id, err := GetEventTypeFromString(s)
if err != nil {
return err
}
*e = id
return nil
}

View file

@ -13,8 +13,8 @@ import (
// P2PNotaryRequest contains main and fallback transactions for the Notary service. // P2PNotaryRequest contains main and fallback transactions for the Notary service.
type P2PNotaryRequest struct { type P2PNotaryRequest struct {
MainTransaction *transaction.Transaction MainTransaction *transaction.Transaction `json:"maintx"`
FallbackTransaction *transaction.Transaction FallbackTransaction *transaction.Transaction `json:"fallbacktx"`
Witness transaction.Witness Witness transaction.Witness

View file

@ -16,6 +16,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/extpool"
@ -143,7 +144,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
} }
if chain.P2PSigExtensionsEnabled() { if chain.P2PSigExtensionsEnabled() {
s.notaryFeer = NewNotaryFeer(chain) s.notaryFeer = NewNotaryFeer(chain)
s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1, config.P2PNotaryCfg.Enabled) s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1, true)
chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, txpool *mempool.Pool, _ *block.Block) { chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, txpool *mempool.Pool, _ *block.Block) {
s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool { s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool {
return bc.IsTxStillRelevant(t, txpool, true) return bc.IsTxStillRelevant(t, txpool, true)
@ -295,6 +296,8 @@ func (s *Server) Shutdown() {
} }
if s.notaryModule != nil { if s.notaryModule != nil {
s.notaryModule.Stop() s.notaryModule.Stop()
}
if s.chain.P2PSigExtensionsEnabled() {
s.notaryRequestPool.StopSubscriptions() s.notaryRequestPool.StopSubscriptions()
} }
close(s.quit) close(s.quit)
@ -449,13 +452,39 @@ func (s *Server) tryStartServices() {
if s.oracle != nil { if s.oracle != nil {
go s.oracle.Run() go s.oracle.Run()
} }
if s.chain.P2PSigExtensionsEnabled() {
s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber.
}
if s.notaryModule != nil { if s.notaryModule != nil {
s.notaryRequestPool.RunSubscriptions()
go s.notaryModule.Run() go s.notaryModule.Run()
} }
} }
} }
// SubscribeForNotaryRequests adds given channel to a notary request event
// broadcasting, so when a new P2PNotaryRequest is received or an existing
// P2PNotaryRequest is removed from pool you'll receive it via this channel.
// Make sure it's read from regularly as not reading these events might affect
// other Server functions.
// Ensure that P2PSigExtensions are enabled before calling this method.
func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) {
if !s.chain.P2PSigExtensionsEnabled() {
panic("P2PSigExtensions are disabled")
}
s.notaryRequestPool.SubscribeForTransactions(ch)
}
// UnsubscribeFromNotaryRequests unsubscribes given channel from notary request
// notifications, you can close it afterwards. Passing non-subscribed channel
// is a no-op.
// Ensure that P2PSigExtensions are enabled before calling this method.
func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) {
if !s.chain.P2PSigExtensionsEnabled() {
panic("P2PSigExtensions are disabled")
}
s.notaryRequestPool.UnsubscribeFromTransactions(ch)
}
// Peers returns the current list of peers connected to // Peers returns the current list of peers connected to
// the server. // the server.
func (s *Server) Peers() map[Peer]bool { func (s *Server) Peers() map[Peer]bool {

View file

@ -149,6 +149,8 @@ readloop:
val = new(state.NotificationEvent) val = new(state.NotificationEvent)
case response.ExecutionEventID: case response.ExecutionEventID:
val = new(state.AppExecResult) val = new(state.AppExecResult)
case response.NotaryRequestEventID:
val = new(response.NotaryRequestEvent)
case response.MissedEventID: case response.MissedEventID:
// No value. // No value.
default: default:
@ -300,6 +302,18 @@ func (c *WSClient) SubscribeForTransactionExecutions(state *string) (string, err
return c.performSubscription(params) return c.performSubscription(params)
} }
// SubscribeForNotaryRequests adds subscription for notary request payloads
// addition or removal events to this instance of client. It can be filtered by
// request sender's hash, or main tx signer's hash, nil value puts no such
// restrictions.
func (c *WSClient) SubscribeForNotaryRequests(sender *util.Uint160, mainSigner *util.Uint160) (string, error) {
params := request.NewRawParams("notary_request_event")
if sender != nil {
params.Values = append(params.Values, request.TxFilter{Sender: sender, Signer: mainSigner})
}
return c.performSubscription(params)
}
// Unsubscribe removes subscription for given event stream. // Unsubscribe removes subscription for given event stream.
func (c *WSClient) Unsubscribe(id string) error { func (c *WSClient) Unsubscribe(id string) error {
return c.performUnsubscription(id) return c.performUnsubscription(id)

View file

@ -3,11 +3,20 @@ package response
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
) )
type ( type (
// EventID represents an event type happening on the chain. // EventID represents an event type happening on the chain.
EventID byte EventID byte
// NotaryRequestEvent represents P2PNotaryRequest event either added or removed
// from notary payload pool.
NotaryRequestEvent struct {
Type mempoolevent.Type `json:"type"`
NotaryRequest *payload.P2PNotaryRequest `json:"notaryrequest"`
}
) )
const ( const (
@ -22,6 +31,8 @@ const (
NotificationEventID NotificationEventID
// ExecutionEventID is used for `transaction_executed` events. // ExecutionEventID is used for `transaction_executed` events.
ExecutionEventID ExecutionEventID
// NotaryRequestEventID is used for `notary_request_event` event.
NotaryRequestEventID
// MissedEventID notifies user of missed events. // MissedEventID notifies user of missed events.
MissedEventID EventID = 255 MissedEventID EventID = 255
) )
@ -37,6 +48,8 @@ func (e EventID) String() string {
return "notification_from_execution" return "notification_from_execution"
case ExecutionEventID: case ExecutionEventID:
return "transaction_executed" return "transaction_executed"
case NotaryRequestEventID:
return "notary_request_event"
case MissedEventID: case MissedEventID:
return "event_missed" return "event_missed"
default: default:
@ -55,6 +68,8 @@ func GetEventIDFromString(s string) (EventID, error) {
return NotificationEventID, nil return NotificationEventID, nil
case "transaction_executed": case "transaction_executed":
return ExecutionEventID, nil return ExecutionEventID, nil
case "notary_request_event":
return NotaryRequestEventID, nil
case "event_missed": case "event_missed":
return MissedEventID, nil return MissedEventID, nil
default: default:

View file

@ -21,6 +21,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/fee" "github.com/nspcc-dev/neo-go/pkg/core/fee"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
@ -57,16 +58,18 @@ type (
https *http.Server https *http.Server
shutdown chan struct{} shutdown chan struct{}
subsLock sync.RWMutex subsLock sync.RWMutex
subscribers map[*subscriber]bool subscribers map[*subscriber]bool
blockSubs int blockSubs int
executionSubs int executionSubs int
notificationSubs int notificationSubs int
transactionSubs int transactionSubs int
blockCh chan *block.Block notaryRequestSubs int
executionCh chan *state.AppExecResult blockCh chan *block.Block
notificationCh chan *state.NotificationEvent executionCh chan *state.AppExecResult
transactionCh chan *transaction.Transaction notificationCh chan *state.NotificationEvent
transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event
} }
) )
@ -174,10 +177,11 @@ func New(chain blockchainer.Blockchainer, conf rpc.Config, coreServer *network.S
subscribers: make(map[*subscriber]bool), subscribers: make(map[*subscriber]bool),
// These are NOT buffered to preserve original order of events. // These are NOT buffered to preserve original order of events.
blockCh: make(chan *block.Block), blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult), executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *state.NotificationEvent), notificationCh: make(chan *state.NotificationEvent),
transactionCh: make(chan *transaction.Transaction), transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
} }
} }
@ -1448,6 +1452,9 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface
if err != nil || event == response.MissedEventID { if err != nil || event == response.MissedEventID {
return nil, response.ErrInvalidParams return nil, response.ErrInvalidParams
} }
if event == response.NotaryRequestEventID && !s.chain.P2PSigExtensionsEnabled() {
return nil, response.WrapErrorWithData(response.ErrInvalidParams, errors.New("P2PSigExtensions are disabled"))
}
// Optional filter. // Optional filter.
var filter interface{} var filter interface{}
if p := reqParams.Value(1); p != nil { if p := reqParams.Value(1); p != nil {
@ -1468,6 +1475,10 @@ func (s *Server) subscribe(reqParams request.Params, sub *subscriber) (interface
if p.Type != request.ExecutionFilterT { if p.Type != request.ExecutionFilterT {
return nil, response.ErrInvalidParams return nil, response.ErrInvalidParams
} }
case response.NotaryRequestEventID:
if p.Type != request.TxFilterT {
return nil, response.ErrInvalidParams
}
} }
filter = p.Value filter = p.Value
} }
@ -1519,6 +1530,11 @@ func (s *Server) subscribeToChannel(event response.EventID) {
s.chain.SubscribeForExecutions(s.executionCh) s.chain.SubscribeForExecutions(s.executionCh)
} }
s.executionSubs++ s.executionSubs++
case response.NotaryRequestEventID:
if s.notaryRequestSubs == 0 {
s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh)
}
s.notaryRequestSubs++
} }
} }
@ -1565,6 +1581,11 @@ func (s *Server) unsubscribeFromChannel(event response.EventID) {
if s.executionSubs == 0 { if s.executionSubs == 0 {
s.chain.UnsubscribeFromExecutions(s.executionCh) s.chain.UnsubscribeFromExecutions(s.executionCh)
} }
case response.NotaryRequestEventID:
s.notaryRequestSubs--
if s.notaryRequestSubs == 0 {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
}
} }
} }
@ -1605,6 +1626,12 @@ chloop:
case tx := <-s.transactionCh: case tx := <-s.transactionCh:
resp.Event = response.TransactionEventID resp.Event = response.TransactionEventID
resp.Payload[0] = tx resp.Payload[0] = tx
case e := <-s.notaryRequestCh:
resp.Event = response.NotaryRequestEventID
resp.Payload[0] = &response.NotaryRequestEvent{
Type: e.Type,
NotaryRequest: e.Data.(*payload.P2PNotaryRequest),
}
} }
s.subsLock.RLock() s.subsLock.RLock()
subloop: subloop:
@ -1657,6 +1684,9 @@ chloop:
s.chain.UnsubscribeFromTransactions(s.transactionCh) s.chain.UnsubscribeFromTransactions(s.transactionCh)
s.chain.UnsubscribeFromNotifications(s.notificationCh) s.chain.UnsubscribeFromNotifications(s.notificationCh)
s.chain.UnsubscribeFromExecutions(s.executionCh) s.chain.UnsubscribeFromExecutions(s.executionCh)
if s.chain.P2PSigExtensionsEnabled() {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
}
s.subsLock.Unlock() s.subsLock.Unlock()
drainloop: drainloop:
for { for {
@ -1665,6 +1695,7 @@ drainloop:
case <-s.executionCh: case <-s.executionCh:
case <-s.notificationCh: case <-s.notificationCh:
case <-s.transactionCh: case <-s.transactionCh:
case <-s.notaryRequestCh:
default: default:
break drainloop break drainloop
} }
@ -1675,6 +1706,7 @@ drainloop:
close(s.transactionCh) close(s.transactionCh)
close(s.notificationCh) close(s.notificationCh)
close(s.executionCh) close(s.executionCh)
close(s.notaryRequestCh)
} }
func (s *Server) blockHeightFromParam(param *request.Param) (int, *response.Error) { func (s *Server) blockHeightFromParam(param *request.Param) (int, *response.Error) {

View file

@ -1107,42 +1107,7 @@ func TestSubmitNotaryRequest(t *testing.T) {
}) })
t.Run("valid request", func(t *testing.T) { t.Run("valid request", func(t *testing.T) {
sender := testchain.PrivateKeyByID(0) // owner of the deposit in testchain sender := testchain.PrivateKeyByID(0) // owner of the deposit in testchain
mainTx := &transaction.Transaction{ p := createValidNotaryRequest(chain, sender, 1)
Attributes: []transaction.Attribute{{Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 1}}},
Script: []byte{byte(opcode.RET)},
ValidUntilBlock: 123,
Signers: []transaction.Signer{{Account: util.Uint160{1, 5, 9}}},
Scripts: []transaction.Witness{{
InvocationScript: []byte{1, 4, 7},
VerificationScript: []byte{3, 6, 9},
}},
}
fallbackTx := &transaction.Transaction{
Script: []byte{byte(opcode.RET)},
ValidUntilBlock: 123,
Attributes: []transaction.Attribute{
{Type: transaction.NotValidBeforeT, Value: &transaction.NotValidBefore{Height: 123}},
{Type: transaction.ConflictsT, Value: &transaction.Conflicts{Hash: mainTx.Hash()}},
{Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 0}},
},
Signers: []transaction.Signer{{Account: chain.GetNotaryContractScriptHash()}, {Account: sender.GetScriptHash()}},
Scripts: []transaction.Witness{
{InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...), VerificationScript: []byte{}},
},
NetworkFee: 2_0000_0000,
}
fallbackTx.Scripts = append(fallbackTx.Scripts, transaction.Witness{
InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), fallbackTx)...),
VerificationScript: sender.PublicKey().GetVerificationScript(),
})
p := &payload.P2PNotaryRequest{
MainTransaction: mainTx,
FallbackTransaction: fallbackTx,
}
p.Witness = transaction.Witness{
InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), p)...),
VerificationScript: sender.PublicKey().GetVerificationScript(),
}
bytes, err := p.Bytes() bytes, err := p.Bytes()
require.NoError(t, err) require.NoError(t, err)
str := fmt.Sprintf(`"%s"`, base64.StdEncoding.EncodeToString(bytes)) str := fmt.Sprintf(`"%s"`, base64.StdEncoding.EncodeToString(bytes))
@ -1150,6 +1115,50 @@ func TestSubmitNotaryRequest(t *testing.T) {
}) })
} }
// createValidNotaryRequest creates and signs P2PNotaryRequest payload which can
// pass verification.
func createValidNotaryRequest(chain *core.Blockchain, sender *keys.PrivateKey, nonce uint32) *payload.P2PNotaryRequest {
h := chain.BlockHeight()
mainTx := &transaction.Transaction{
Nonce: nonce,
Attributes: []transaction.Attribute{{Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 1}}},
Script: []byte{byte(opcode.RET)},
ValidUntilBlock: h + 100,
Signers: []transaction.Signer{{Account: sender.GetScriptHash()}},
Scripts: []transaction.Witness{{
InvocationScript: []byte{1, 4, 7},
VerificationScript: []byte{3, 6, 9},
}},
}
fallbackTx := &transaction.Transaction{
Script: []byte{byte(opcode.RET)},
ValidUntilBlock: h + 100,
Attributes: []transaction.Attribute{
{Type: transaction.NotValidBeforeT, Value: &transaction.NotValidBefore{Height: h + 50}},
{Type: transaction.ConflictsT, Value: &transaction.Conflicts{Hash: mainTx.Hash()}},
{Type: transaction.NotaryAssistedT, Value: &transaction.NotaryAssisted{NKeys: 0}},
},
Signers: []transaction.Signer{{Account: chain.GetNotaryContractScriptHash()}, {Account: sender.GetScriptHash()}},
Scripts: []transaction.Witness{
{InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, make([]byte, 64)...), VerificationScript: []byte{}},
},
NetworkFee: 2_0000_0000,
}
fallbackTx.Scripts = append(fallbackTx.Scripts, transaction.Witness{
InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), fallbackTx)...),
VerificationScript: sender.PublicKey().GetVerificationScript(),
})
p := &payload.P2PNotaryRequest{
MainTransaction: mainTx,
FallbackTransaction: fallbackTx,
}
p.Witness = transaction.Witness{
InvocationScript: append([]byte{byte(opcode.PUSHDATA1), 64}, sender.SignHashable(uint32(testchain.Network()), p)...),
VerificationScript: sender.PublicKey().GetVerificationScript(),
}
return p
}
// testRPCProtocol runs a full set of tests using given callback to make actual // testRPCProtocol runs a full set of tests using given callback to make actual
// calls. Some tests change the chain state, thus we reinitialize the chain from // calls. Some tests change the chain state, thus we reinitialize the chain from
// scratch here. // scratch here.

View file

@ -80,6 +80,21 @@ func (f *feed) Matches(r *response.Notification) bool {
filt := f.filter.(request.ExecutionFilter) filt := f.filter.(request.ExecutionFilter)
applog := r.Payload[0].(*state.AppExecResult) applog := r.Payload[0].(*state.AppExecResult)
return applog.VMState.String() == filt.State return applog.VMState.String() == filt.State
case response.NotaryRequestEventID:
filt := f.filter.(request.TxFilter)
req := r.Payload[0].(*response.NotaryRequestEvent)
senderOk := filt.Sender == nil || req.NotaryRequest.FallbackTransaction.Signers[1].Account == *filt.Sender
signerOK := true
if filt.Signer != nil {
signerOK = false
for _, signer := range req.NotaryRequest.MainTransaction.Signers {
if signer.Account.Equals(*filt.Signer) {
signerOK = true
break
}
}
}
return senderOk && signerOK
} }
return false return false
} }

View file

@ -86,10 +86,12 @@ func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id st
func TestSubscriptions(t *testing.T) { func TestSubscriptions(t *testing.T) {
var subIDs = make([]string, 0) var subIDs = make([]string, 0)
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed"} var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"}
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t) chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
go rpcSrv.coreServer.Start(make(chan error))
defer rpcSrv.coreServer.Shutdown()
defer chain.Close() defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }() defer func() { _ = rpcSrv.Shutdown() }()
@ -133,6 +135,17 @@ func TestSubscriptions(t *testing.T) {
require.Equal(t, response.BlockEventID, resp.Event) require.Equal(t, response.BlockEventID, resp.Event)
} }
// We should manually add NotaryRequest to test notification.
sender := testchain.PrivateKeyByID(0)
err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, sender, 1))
require.NoError(t, err)
for {
resp := getNotification(t, respMsgs)
if resp.Event == response.NotaryRequestEventID {
break
}
}
for _, id := range subIDs { for _, id := range subIDs {
callUnsubscribe(t, c, respMsgs, id) callUnsubscribe(t, c, respMsgs, id)
} }
@ -277,6 +290,100 @@ func TestFilteredSubscriptions(t *testing.T) {
} }
} }
func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
// We can't fit this into TestFilteredSubscriptions, because notary requests
// event doesn't depend on blocks events.
priv0 := testchain.PrivateKeyByID(0)
var goodSender = priv0.GetScriptHash()
var cases = map[string]struct {
params string
check func(*testing.T, *response.Notification)
}{
"matching sender": {
params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.NotaryRequestEventID, resp.Event)
require.Equal(t, "added", rmap["type"].(string))
req := rmap["notaryrequest"].(map[string]interface{})
fbTx := req["fallbacktx"].(map[string]interface{})
sender := fbTx["signers"].([]interface{})[1].(map[string]interface{})["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), sender)
},
},
"matching signer": {
params: `["notary_request_event", {"signer":"` + goodSender.StringLE() + `"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.NotaryRequestEventID, resp.Event)
require.Equal(t, "added", rmap["type"].(string))
req := rmap["notaryrequest"].(map[string]interface{})
mainTx := req["maintx"].(map[string]interface{})
signers := mainTx["signers"].([]interface{})
signer0 := signers[0].(map[string]interface{})
signer0acc := signer0["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
},
},
"matching sender and signer": {
params: `["notary_request_event", {"sender":"` + goodSender.StringLE() + `", "signer":"` + goodSender.StringLE() + `"}]`,
check: func(t *testing.T, resp *response.Notification) {
rmap := resp.Payload[0].(map[string]interface{})
require.Equal(t, response.NotaryRequestEventID, resp.Event)
require.Equal(t, "added", rmap["type"].(string))
req := rmap["notaryrequest"].(map[string]interface{})
mainTx := req["maintx"].(map[string]interface{})
fbTx := req["fallbacktx"].(map[string]interface{})
sender := fbTx["signers"].([]interface{})[1].(map[string]interface{})["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), sender)
signers := mainTx["signers"].([]interface{})
signer0 := signers[0].(map[string]interface{})
signer0acc := signer0["account"].(string)
require.Equal(t, "0x"+goodSender.StringLE(), signer0acc)
},
},
}
chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
go rpcSrv.coreServer.Start(make(chan error, 1))
defer chain.Close()
defer func() { _ = rpcSrv.Shutdown() }()
// blocks are needed to make GAS deposit for priv0
blocks := getTestBlocks(t)
for _, b := range blocks {
require.NoError(t, chain.AddBlock(b))
}
var nonce uint32 = 100
for name, this := range cases {
t.Run(name, func(t *testing.T) {
subID := callSubscribe(t, c, respMsgs, this.params)
err := rpcSrv.coreServer.RelayP2PNotaryRequest(createValidNotaryRequest(chain, priv0, nonce))
require.NoError(t, err)
nonce++
var resp = new(response.Notification)
select {
case body := <-respMsgs:
require.NoError(t, json.Unmarshal(body, resp))
case <-time.After(time.Second):
t.Fatal("timeout waiting for event")
}
require.Equal(t, response.NotaryRequestEventID, resp.Event)
this.check(t, resp)
callUnsubscribe(t, c, respMsgs, subID)
})
}
finishedFlag.CAS(false, true)
c.Close()
}
func TestFilteredBlockSubscriptions(t *testing.T) { func TestFilteredBlockSubscriptions(t *testing.T) {
// We can't fit this into TestFilteredSubscriptions, because it uses // We can't fit this into TestFilteredSubscriptions, because it uses
// blocks as EOF events to wait for. // blocks as EOF events to wait for.

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -48,7 +49,7 @@ type (
mp *mempool.Pool mp *mempool.Pool
// requests channel // requests channel
reqCh chan mempool.Event reqCh chan mempoolevent.Event
blocksCh chan *block.Block blocksCh chan *block.Block
stopCh chan struct{} stopCh chan struct{}
} }
@ -109,7 +110,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
wallet: wallet, wallet: wallet,
onTransaction: onTransaction, onTransaction: onTransaction,
mp: mp, mp: mp,
reqCh: make(chan mempool.Event), reqCh: make(chan mempoolevent.Event),
blocksCh: make(chan *block.Block), blocksCh: make(chan *block.Block),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
}, nil }, nil
@ -129,9 +130,9 @@ func (n *Notary) Run() {
case event := <-n.reqCh: case event := <-n.reqCh:
if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { if req, ok := event.Data.(*payload.P2PNotaryRequest); ok {
switch event.Type { switch event.Type {
case mempool.TransactionAdded: case mempoolevent.TransactionAdded:
n.OnNewRequest(req) n.OnNewRequest(req)
case mempool.TransactionRemoved: case mempoolevent.TransactionRemoved:
n.OnRequestRemoval(req) n.OnRequestRemoval(req)
} }
} }

View file

@ -87,7 +87,7 @@ func (e *executor) runProg(t *testing.T, commands ...string) {
}() }()
select { select {
case <-e.ch: case <-e.ch:
case <-time.After(time.Second): case <-time.After(2 * time.Second):
require.Fail(t, "command took too long time") require.Fail(t, "command took too long time")
} }
} }