diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index d393f6e34..d01962ef0 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -10,6 +10,7 @@ import ( "github.com/holiman/uint256" "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/atomic" ) @@ -289,7 +290,13 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e updateMempoolMetrics(len(mp.verifiedTxes)) mp.lock.Unlock() + if pItem.data != nil { + r := pItem.data.(*payload.P2PNotaryRequest) + fmt.Printf("Mempool: notary request added:\n\tmain hash: %s\n\tfb hash: %s\n\tVUB: %d\n", r.MainTransaction.Hash(), r.FallbackTransaction.Hash(), r.MainTransaction.ValidUntilBlock) + } + if mp.subscriptionsOn.Load() { + fmt.Println("Mempool: event sent to events") mp.events <- mempoolevent.Event{ Type: mempoolevent.TransactionAdded, Tx: pItem.txn, diff --git a/pkg/core/mempool/subscriptions.go b/pkg/core/mempool/subscriptions.go index 0188c1994..174c35f84 100644 --- a/pkg/core/mempool/subscriptions.go +++ b/pkg/core/mempool/subscriptions.go @@ -1,6 +1,10 @@ package mempool -import "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" +import ( + "errors" + + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" +) // RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled. // You should manually free the resources by calling StopSubscriptions on mempool shutdown. @@ -29,10 +33,12 @@ func (mp *Pool) StopSubscriptions() { // there is a new transactions added to the mempool or an existing transaction removed from // the mempool, you'll receive it via this channel. Make sure you're not changing the received // mempool events, as it may affect the functionality of other subscribers. -func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) { +func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) error { if mp.subscriptionsOn.Load() { mp.subCh <- ch + return nil } + return errors.New("mempool subscriptions are disabled") } // UnsubscribeFromTransactions unsubscribes the given channel from new mempool notifications, diff --git a/pkg/core/mempool/subscriptions_test.go b/pkg/core/mempool/subscriptions_test.go index bbe1bc2a3..5ff402047 100644 --- a/pkg/core/mempool/subscriptions_test.go +++ b/pkg/core/mempool/subscriptions_test.go @@ -28,7 +28,7 @@ func TestSubscriptions(t *testing.T) { mp.RunSubscriptions() subChan1 := make(chan mempoolevent.Event, 3) subChan2 := make(chan mempoolevent.Event, 3) - mp.SubscribeForTransactions(subChan1) + _ = mp.SubscribeForTransactions(subChan1) t.Cleanup(mp.StopSubscriptions) txs := make([]*transaction.Transaction, 4) @@ -46,7 +46,7 @@ func TestSubscriptions(t *testing.T) { require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[0]}, event) // severak subscribers - mp.SubscribeForTransactions(subChan2) + _ = mp.SubscribeForTransactions(subChan2) require.NoError(t, mp.Add(txs[1], fs)) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) event1 := <-subChan1 diff --git a/pkg/network/server.go b/pkg/network/server.go index ca37d774f..405cbab5c 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -544,6 +544,7 @@ func (s *Server) tryStartServices() { if s.IsInSync() && s.syncReached.CAS(false, true) { s.log.Info("node reached synchronized state, starting services") if s.chain.P2PSigExtensionsEnabled() { + s.log.Info("starting notary request pool subscriptions") s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. } s.serviceLock.RLock() @@ -561,11 +562,11 @@ func (s *Server) tryStartServices() { // other Server functions. Make sure you're not changing the received mempool // events, as it may affect the functionality of Blockchain and other subscribers. // Ensure that P2PSigExtensions are enabled before calling this method. -func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) { +func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) error { if !s.chain.P2PSigExtensionsEnabled() { panic("P2PSigExtensions are disabled") } - s.notaryRequestPool.SubscribeForTransactions(ch) + return s.notaryRequestPool.SubscribeForTransactions(ch) } // UnsubscribeFromNotaryRequests unsubscribes the given channel from notary request diff --git a/pkg/services/notary/notary.go b/pkg/services/notary/notary.go index e3b6bea96..2cebca5e0 100644 --- a/pkg/services/notary/notary.go +++ b/pkg/services/notary/notary.go @@ -176,7 +176,12 @@ func (n *Notary) Start() { } n.Config.Log.Info("starting notary service") n.Config.Chain.SubscribeForBlocks(n.blocksCh) - n.mp.SubscribeForTransactions(n.reqCh) + // TODO: we need to be strictly sure that mempool subscriptions are properly enabled by + // the moment Notary service is starting + err := n.mp.SubscribeForTransactions(n.reqCh) + if err != nil { + n.Config.Log.Fatal("failed to subscribe for notary requests", zap.Error(err)) + } go n.newTxCallbackLoop() go n.mainLoop() } diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index 031dab5ee..5516ff3b1 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -3,6 +3,7 @@ package rpcsrv import ( "bytes" "context" + "crypto/elliptic" "encoding/base64" "encoding/hex" "encoding/json" @@ -52,6 +53,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/vm" "github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" @@ -61,6 +63,130 @@ import ( "github.com/stretchr/testify/require" ) +func getCosigners(t *testing.T, multisig *wallet.Account, acc *wallet.Account, rubles util.Uint160) []actor.SignerAccount { + ss := make([]actor.SignerAccount, 0, 3) + + m, pubsB, ok := vm.ParseMultiSigContract(multisig.Contract.Script) + require.True(t, ok) + pubs := make(keys.PublicKeys, len(pubsB)) + var err error + for i := range pubsB { + pubs[i], err = keys.NewPublicKeyFromBytes(pubsB[i], elliptic.P256()) + require.NoError(t, err) + } + fakeMultisig, err := notary.FakeMultisigAccount(m, pubs) + require.NoError(t, err) + + ss = append(ss, []actor.SignerAccount{ + { + // proxy contract + Signer: transaction.Signer{ + Account: rubles, + Scopes: transaction.None, + }, + Account: notary.FakeContractAccount(rubles), + }, + { + // multi signature (committee, in fact) + Signer: transaction.Signer{ + Account: multisig.ScriptHash(), + Scopes: transaction.Global, + }, + Account: fakeMultisig, // encrypted + }, + }...) + + // invoker signature + ss = append(ss, actor.SignerAccount{ + Signer: transaction.Signer{ + Account: hash.Hash160(acc.GetVerificationScript()), + Scopes: transaction.Global, + }, + Account: acc, + }) + return ss +} + +func TestNotarizator(t *testing.T) { + c, err := rpcclient.NewWS(context.Background(), "ws://localhost:20331/ws", rpcclient.Options{}) + require.NoError(t, err) + require.NoError(t, c.Init()) + + w, err := wallet.NewWalletFromFile("../../../.docker/wallets/wallet1.json") + require.NoError(t, err) + acc := w.Accounts[0] + require.NoError(t, acc.Decrypt("one", w.Scrypt)) + + rubles, err := util.Uint160DecodeStringLE(testContractHash) + require.NoError(t, err) + + multisig := w.Accounts[1] + cosigners := getCosigners(t, multisig, acc, rubles) + + nAct, err := notary.NewActor(c, cosigners, acc) + require.NoError(t, err) + + fmt.Println("actor created, starting subscriptions") + go subsLoop(t, rubles) + + time.Sleep(10 * time.Second) + + fmt.Println("starting notarizator") + mainH, fbHash, VUB, err := nAct.Notarize(nAct.MakeTunedCall(rubles, "putValue", nil, func(r *result.Invoke, t *transaction.Transaction) error { + if r.State != vmstate.Halt.String() { + return fmt.Errorf("FAULT putValue call: %s", r.FaultException) + } + + t.ValidUntilBlock = 64 + t.Nonce = 4 + + return nil + }, "notarykey", "notaryvalue")) + + fmt.Printf("Notary request sent:\n\tmain hash: %s\n\tfb hash: %s\n\tVUB: %d\n\terr: %v\n\n", mainH.StringLE(), fbHash.StringLE(), VUB, err) + + time.Sleep(20 * time.Second) +} + +func subsLoop(t *testing.T, rubles util.Uint160) { + c, err := rpcclient.NewWS(context.Background(), "ws://localhost:20331/ws", rpcclient.Options{}) + require.NoError(t, err) + require.NoError(t, c.Init()) + ch := c.Notifications + exitCh := make(chan struct{}) + go func() { + subsLoop: + for { + select { + case r, ok := <-ch: + if !ok { + fmt.Println("Subscriptions listener: !ok") + break subsLoop + } + switch r.Type { + case neorpc.NotaryRequestEventID: + val := r.Value.(*result.NotaryRequestEvent) + fmt.Printf("Notary request event:\n\ttype: %s\n\tmain hash: %s\n\tfb hash: %s\n\n", val.Type, val.NotaryRequest.MainTransaction.Hash().StringLE(), val.NotaryRequest.FallbackTransaction.Hash().StringLE()) + default: + fmt.Printf("Non-notary event: %s\n", r.Type) + } + } + } + + for range ch { + } + exitCh <- struct{}{} + }() + id, err := c.SubscribeForNotaryRequests(nil, &rubles) + require.NoError(t, err) + fmt.Println("subscribed for notary requests") + <-exitCh + close(ch) + close(exitCh) + require.NoError(t, c.Unsubscribe(id)) + fmt.Println("unsubscribed from notary requests") +} + func TestClient_NEP17(t *testing.T) { chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) defer chain.Close() diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index 6531ccaf8..e7fdcd384 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -2572,7 +2572,10 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{ return nil, neorpc.NewInternalServerError("server is shutting down") default: } - s.subscribeToChannel(event) + err = s.subscribeToChannel(event) + if err != nil { + return nil, neorpc.NewInternalServerError(fmt.Errorf("failed to subscribe: %w", err).Error()) + } s.subsCounterLock.Unlock() return strconv.FormatInt(int64(id), 10), nil } @@ -2580,7 +2583,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{ // subscribeToChannel subscribes RPC server to appropriate chain events if // it's not yet subscribed for them. It's supposed to be called with s.subsCounterLock // taken by the caller. -func (s *Server) subscribeToChannel(event neorpc.EventID) { +func (s *Server) subscribeToChannel(event neorpc.EventID) error { switch event { case neorpc.BlockEventID: if s.blockSubs == 0 { @@ -2604,10 +2607,16 @@ func (s *Server) subscribeToChannel(event neorpc.EventID) { s.executionSubs++ case neorpc.NotaryRequestEventID: if s.notaryRequestSubs == 0 { - s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh) + s.log.Info("rpc: subscribing for notary request pool") + err := s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh) + if err != nil { + s.log.Warn("failed to subscribe for notary requests", zap.Error(err)) + break + } } s.notaryRequestSubs++ } + return nil } // unsubscribe handles unsubscription requests from websocket clients.