mempool: warn about failed subscription

This commit is contained in:
Anna Shaleva 2023-04-18 13:19:12 +03:00
parent 7306beca4d
commit 7096ddb586
7 changed files with 164 additions and 10 deletions

View file

@ -10,6 +10,7 @@ import (
"github.com/holiman/uint256" "github.com/holiman/uint256"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic" "go.uber.org/atomic"
) )
@ -289,7 +290,13 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e
updateMempoolMetrics(len(mp.verifiedTxes)) updateMempoolMetrics(len(mp.verifiedTxes))
mp.lock.Unlock() mp.lock.Unlock()
if pItem.data != nil {
r := pItem.data.(*payload.P2PNotaryRequest)
fmt.Printf("Mempool: notary request added:\n\tmain hash: %s\n\tfb hash: %s\n\tVUB: %d\n", r.MainTransaction.Hash(), r.FallbackTransaction.Hash(), r.MainTransaction.ValidUntilBlock)
}
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
fmt.Println("Mempool: event sent to events")
mp.events <- mempoolevent.Event{ mp.events <- mempoolevent.Event{
Type: mempoolevent.TransactionAdded, Type: mempoolevent.TransactionAdded,
Tx: pItem.txn, Tx: pItem.txn,

View file

@ -1,6 +1,10 @@
package mempool package mempool
import "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" import (
"errors"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
)
// RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled. // RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled.
// You should manually free the resources by calling StopSubscriptions on mempool shutdown. // You should manually free the resources by calling StopSubscriptions on mempool shutdown.
@ -29,10 +33,12 @@ func (mp *Pool) StopSubscriptions() {
// there is a new transactions added to the mempool or an existing transaction removed from // there is a new transactions added to the mempool or an existing transaction removed from
// the mempool, you'll receive it via this channel. Make sure you're not changing the received // the mempool, you'll receive it via this channel. Make sure you're not changing the received
// mempool events, as it may affect the functionality of other subscribers. // mempool events, as it may affect the functionality of other subscribers.
func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) { func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) error {
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.subCh <- ch mp.subCh <- ch
return nil
} }
return errors.New("mempool subscriptions are disabled")
} }
// UnsubscribeFromTransactions unsubscribes the given channel from new mempool notifications, // UnsubscribeFromTransactions unsubscribes the given channel from new mempool notifications,

View file

@ -28,7 +28,7 @@ func TestSubscriptions(t *testing.T) {
mp.RunSubscriptions() mp.RunSubscriptions()
subChan1 := make(chan mempoolevent.Event, 3) subChan1 := make(chan mempoolevent.Event, 3)
subChan2 := make(chan mempoolevent.Event, 3) subChan2 := make(chan mempoolevent.Event, 3)
mp.SubscribeForTransactions(subChan1) _ = mp.SubscribeForTransactions(subChan1)
t.Cleanup(mp.StopSubscriptions) t.Cleanup(mp.StopSubscriptions)
txs := make([]*transaction.Transaction, 4) txs := make([]*transaction.Transaction, 4)
@ -46,7 +46,7 @@ func TestSubscriptions(t *testing.T) {
require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[0]}, event) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[0]}, event)
// severak subscribers // severak subscribers
mp.SubscribeForTransactions(subChan2) _ = mp.SubscribeForTransactions(subChan2)
require.NoError(t, mp.Add(txs[1], fs)) require.NoError(t, mp.Add(txs[1], fs))
require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event1 := <-subChan1 event1 := <-subChan1

View file

@ -544,6 +544,7 @@ func (s *Server) tryStartServices() {
if s.IsInSync() && s.syncReached.CAS(false, true) { if s.IsInSync() && s.syncReached.CAS(false, true) {
s.log.Info("node reached synchronized state, starting services") s.log.Info("node reached synchronized state, starting services")
if s.chain.P2PSigExtensionsEnabled() { if s.chain.P2PSigExtensionsEnabled() {
s.log.Info("starting notary request pool subscriptions")
s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber. s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber.
} }
s.serviceLock.RLock() s.serviceLock.RLock()
@ -561,11 +562,11 @@ func (s *Server) tryStartServices() {
// other Server functions. Make sure you're not changing the received mempool // other Server functions. Make sure you're not changing the received mempool
// events, as it may affect the functionality of Blockchain and other subscribers. // events, as it may affect the functionality of Blockchain and other subscribers.
// Ensure that P2PSigExtensions are enabled before calling this method. // Ensure that P2PSigExtensions are enabled before calling this method.
func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) { func (s *Server) SubscribeForNotaryRequests(ch chan<- mempoolevent.Event) error {
if !s.chain.P2PSigExtensionsEnabled() { if !s.chain.P2PSigExtensionsEnabled() {
panic("P2PSigExtensions are disabled") panic("P2PSigExtensions are disabled")
} }
s.notaryRequestPool.SubscribeForTransactions(ch) return s.notaryRequestPool.SubscribeForTransactions(ch)
} }
// UnsubscribeFromNotaryRequests unsubscribes the given channel from notary request // UnsubscribeFromNotaryRequests unsubscribes the given channel from notary request

View file

@ -176,7 +176,12 @@ func (n *Notary) Start() {
} }
n.Config.Log.Info("starting notary service") n.Config.Log.Info("starting notary service")
n.Config.Chain.SubscribeForBlocks(n.blocksCh) n.Config.Chain.SubscribeForBlocks(n.blocksCh)
n.mp.SubscribeForTransactions(n.reqCh) // TODO: we need to be strictly sure that mempool subscriptions are properly enabled by
// the moment Notary service is starting
err := n.mp.SubscribeForTransactions(n.reqCh)
if err != nil {
n.Config.Log.Fatal("failed to subscribe for notary requests", zap.Error(err))
}
go n.newTxCallbackLoop() go n.newTxCallbackLoop()
go n.mainLoop() go n.mainLoop()
} }

View file

@ -3,6 +3,7 @@ package rpcsrv
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/elliptic"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
@ -52,6 +53,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest" "github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger" "github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm"
"github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
@ -61,6 +63,130 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func getCosigners(t *testing.T, multisig *wallet.Account, acc *wallet.Account, rubles util.Uint160) []actor.SignerAccount {
ss := make([]actor.SignerAccount, 0, 3)
m, pubsB, ok := vm.ParseMultiSigContract(multisig.Contract.Script)
require.True(t, ok)
pubs := make(keys.PublicKeys, len(pubsB))
var err error
for i := range pubsB {
pubs[i], err = keys.NewPublicKeyFromBytes(pubsB[i], elliptic.P256())
require.NoError(t, err)
}
fakeMultisig, err := notary.FakeMultisigAccount(m, pubs)
require.NoError(t, err)
ss = append(ss, []actor.SignerAccount{
{
// proxy contract
Signer: transaction.Signer{
Account: rubles,
Scopes: transaction.None,
},
Account: notary.FakeContractAccount(rubles),
},
{
// multi signature (committee, in fact)
Signer: transaction.Signer{
Account: multisig.ScriptHash(),
Scopes: transaction.Global,
},
Account: fakeMultisig, // encrypted
},
}...)
// invoker signature
ss = append(ss, actor.SignerAccount{
Signer: transaction.Signer{
Account: hash.Hash160(acc.GetVerificationScript()),
Scopes: transaction.Global,
},
Account: acc,
})
return ss
}
func TestNotarizator(t *testing.T) {
c, err := rpcclient.NewWS(context.Background(), "ws://localhost:20331/ws", rpcclient.Options{})
require.NoError(t, err)
require.NoError(t, c.Init())
w, err := wallet.NewWalletFromFile("../../../.docker/wallets/wallet1.json")
require.NoError(t, err)
acc := w.Accounts[0]
require.NoError(t, acc.Decrypt("one", w.Scrypt))
rubles, err := util.Uint160DecodeStringLE(testContractHash)
require.NoError(t, err)
multisig := w.Accounts[1]
cosigners := getCosigners(t, multisig, acc, rubles)
nAct, err := notary.NewActor(c, cosigners, acc)
require.NoError(t, err)
fmt.Println("actor created, starting subscriptions")
go subsLoop(t, rubles)
time.Sleep(10 * time.Second)
fmt.Println("starting notarizator")
mainH, fbHash, VUB, err := nAct.Notarize(nAct.MakeTunedCall(rubles, "putValue", nil, func(r *result.Invoke, t *transaction.Transaction) error {
if r.State != vmstate.Halt.String() {
return fmt.Errorf("FAULT putValue call: %s", r.FaultException)
}
t.ValidUntilBlock = 64
t.Nonce = 4
return nil
}, "notarykey", "notaryvalue"))
fmt.Printf("Notary request sent:\n\tmain hash: %s\n\tfb hash: %s\n\tVUB: %d\n\terr: %v\n\n", mainH.StringLE(), fbHash.StringLE(), VUB, err)
time.Sleep(20 * time.Second)
}
func subsLoop(t *testing.T, rubles util.Uint160) {
c, err := rpcclient.NewWS(context.Background(), "ws://localhost:20331/ws", rpcclient.Options{})
require.NoError(t, err)
require.NoError(t, c.Init())
ch := c.Notifications
exitCh := make(chan struct{})
go func() {
subsLoop:
for {
select {
case r, ok := <-ch:
if !ok {
fmt.Println("Subscriptions listener: !ok")
break subsLoop
}
switch r.Type {
case neorpc.NotaryRequestEventID:
val := r.Value.(*result.NotaryRequestEvent)
fmt.Printf("Notary request event:\n\ttype: %s\n\tmain hash: %s\n\tfb hash: %s\n\n", val.Type, val.NotaryRequest.MainTransaction.Hash().StringLE(), val.NotaryRequest.FallbackTransaction.Hash().StringLE())
default:
fmt.Printf("Non-notary event: %s\n", r.Type)
}
}
}
for range ch {
}
exitCh <- struct{}{}
}()
id, err := c.SubscribeForNotaryRequests(nil, &rubles)
require.NoError(t, err)
fmt.Println("subscribed for notary requests")
<-exitCh
close(ch)
close(exitCh)
require.NoError(t, c.Unsubscribe(id))
fmt.Println("unsubscribed from notary requests")
}
func TestClient_NEP17(t *testing.T) { func TestClient_NEP17(t *testing.T) {
chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t) chain, rpcSrv, httpSrv := initServerWithInMemoryChain(t)
defer chain.Close() defer chain.Close()

View file

@ -2572,7 +2572,10 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
return nil, neorpc.NewInternalServerError("server is shutting down") return nil, neorpc.NewInternalServerError("server is shutting down")
default: default:
} }
s.subscribeToChannel(event) err = s.subscribeToChannel(event)
if err != nil {
return nil, neorpc.NewInternalServerError(fmt.Errorf("failed to subscribe: %w", err).Error())
}
s.subsCounterLock.Unlock() s.subsCounterLock.Unlock()
return strconv.FormatInt(int64(id), 10), nil return strconv.FormatInt(int64(id), 10), nil
} }
@ -2580,7 +2583,7 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (interface{
// subscribeToChannel subscribes RPC server to appropriate chain events if // subscribeToChannel subscribes RPC server to appropriate chain events if
// it's not yet subscribed for them. It's supposed to be called with s.subsCounterLock // it's not yet subscribed for them. It's supposed to be called with s.subsCounterLock
// taken by the caller. // taken by the caller.
func (s *Server) subscribeToChannel(event neorpc.EventID) { func (s *Server) subscribeToChannel(event neorpc.EventID) error {
switch event { switch event {
case neorpc.BlockEventID: case neorpc.BlockEventID:
if s.blockSubs == 0 { if s.blockSubs == 0 {
@ -2604,10 +2607,16 @@ func (s *Server) subscribeToChannel(event neorpc.EventID) {
s.executionSubs++ s.executionSubs++
case neorpc.NotaryRequestEventID: case neorpc.NotaryRequestEventID:
if s.notaryRequestSubs == 0 { if s.notaryRequestSubs == 0 {
s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh) s.log.Info("rpc: subscribing for notary request pool")
err := s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh)
if err != nil {
s.log.Warn("failed to subscribe for notary requests", zap.Error(err))
break
}
} }
s.notaryRequestSubs++ s.notaryRequestSubs++
} }
return nil
} }
// unsubscribe handles unsubscription requests from websocket clients. // unsubscribe handles unsubscription requests from websocket clients.