network: add notary request payload

This commit is contained in:
Anna Shaleva 2020-11-27 13:55:48 +03:00
parent 501c0c93c6
commit 0b5cf78468
24 changed files with 1396 additions and 232 deletions

View file

@ -16,6 +16,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
@ -59,11 +60,13 @@ type (
// stateRootInHeader specifies if block header contain state root.
stateRootInHeader bool
transport Transporter
discovery Discoverer
chain blockchainer.Blockchainer
bQueue *blockQueue
consensus consensus.Service
transport Transporter
discovery Discoverer
chain blockchainer.Blockchainer
bQueue *blockQueue
consensus consensus.Service
notaryRequestPool *mempool.Pool
NotaryFeer NotaryFeer
lock sync.RWMutex
peers map[Peer]bool
@ -124,6 +127,15 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
log: log,
transactions: make(chan *transaction.Transaction, 64),
}
if chain.P2PSigExtensionsEnabled() {
s.NotaryFeer = NewNotaryFeer(chain)
s.notaryRequestPool = mempool.New(chain.GetConfig().P2PNotaryRequestPayloadPoolSize, 1)
chain.RegisterPostBlock(func(bc blockchainer.Blockchainer, txpool *mempool.Pool, _ *block.Block) {
s.notaryRequestPool.RemoveStale(func(t *transaction.Transaction) bool {
return bc.IsTxStillRelevant(t, txpool, true)
}, s.NotaryFeer)
})
}
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
if !s.consensusStarted.Load() {
s.tryStartConsensus()
@ -188,7 +200,7 @@ func (s *Server) Start(errChan chan error) {
zap.Uint32("headerHeight", s.chain.HeaderHeight()))
s.tryStartConsensus()
s.initStaleTxMemPool()
s.initStaleMemPools()
go s.broadcastTxLoop()
go s.relayBlocksLoop()
@ -507,6 +519,9 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
cp := s.consensus.GetPayload(h)
return cp != nil
},
payload.P2PNotaryRequestType: func(h util.Uint256) bool {
return s.notaryRequestPool.ContainsKey(h)
},
}
if exists := typExists[inv.Type]; exists != nil {
for _, hash := range inv.Hashes {
@ -573,6 +588,12 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
if cp := s.consensus.GetPayload(hash); cp != nil {
msg = NewMessage(CMDConsensus, cp)
}
case payload.P2PNotaryRequestType:
if nrp, ok := s.notaryRequestPool.TryGetData(hash); ok { // already have checked P2PSigExtEnabled
msg = NewMessage(CMDP2PNotaryRequest, nrp.(*payload.P2PNotaryRequest))
} else {
notFound = append(notFound, hash)
}
}
if msg != nil {
pkt, err := msg.Bytes()
@ -687,11 +708,62 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
// in the pool.
if s.verifyAndPoolTX(tx) == RelaySucceed {
s.consensus.OnTransaction(tx)
s.broadcastTX(tx)
s.broadcastTX(tx, nil)
}
return nil
}
// handleP2PNotaryRequestCmd process received P2PNotaryRequest payload.
func (s *Server) handleP2PNotaryRequestCmd(r *payload.P2PNotaryRequest) error {
if !s.chain.P2PSigExtensionsEnabled() {
return errors.New("P2PNotaryRequestCMD was received, but P2PSignatureExtensions are disabled")
}
if s.verifyAndPoolNotaryRequest(r) == RelaySucceed {
s.broadcastP2PNotaryRequestPayload(nil, r)
}
return nil
}
// verifyAndPoolNotaryRequest verifies NotaryRequest payload and adds it to the payload mempool.
func (s *Server) verifyAndPoolNotaryRequest(r *payload.P2PNotaryRequest) RelayReason {
if err := s.chain.PoolTxWithData(r.FallbackTransaction, r, s.notaryRequestPool, s.NotaryFeer, verifyNotaryRequest); err != nil {
switch {
case errors.Is(err, core.ErrAlreadyExists):
return RelayAlreadyExists
case errors.Is(err, core.ErrOOM):
return RelayOutOfMemory
case errors.Is(err, core.ErrPolicy):
return RelayPolicyFail
default:
return RelayInvalid
}
}
return RelaySucceed
}
// verifyNotaryRequest is a function for state-dependant P2PNotaryRequest payload verification which is executed before ordinary blockchain's verification.
func verifyNotaryRequest(bc blockchainer.Blockchainer, _ *transaction.Transaction, data interface{}) error {
r := data.(*payload.P2PNotaryRequest)
payer := r.FallbackTransaction.Signers[1].Account
if err := bc.VerifyWitness(payer, r, &r.Witness, bc.GetPolicer().GetMaxVerificationGAS()); err != nil {
return fmt.Errorf("bad P2PNotaryRequest payload witness: %w", err)
}
if r.FallbackTransaction.Sender() != bc.GetNotaryContractScriptHash() {
return errors.New("P2PNotary contract should be a sender of the fallback transaction")
}
depositExpiration := bc.GetNotaryDepositExpiration(payer)
if r.FallbackTransaction.ValidUntilBlock >= depositExpiration {
return fmt.Errorf("fallback transaction is valid after deposit is unlocked: ValidUntilBlock is %d, deposit lock expires at %d", r.FallbackTransaction.ValidUntilBlock, depositExpiration)
}
return nil
}
func (s *Server) broadcastP2PNotaryRequestPayload(_ *transaction.Transaction, data interface{}) {
r := data.(payload.P2PNotaryRequest) // we can guarantee that cast is successful
msg := NewMessage(CMDInv, payload.NewInventory(payload.P2PNotaryRequestType, []util.Uint256{r.FallbackTransaction.Hash()}))
s.broadcastMessage(msg)
}
// handleAddrCmd will process received addresses.
func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error {
if !p.CanProcessAddr() {
@ -770,7 +842,7 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
if peer.Handshaked() {
if inv, ok := msg.Payload.(*payload.Inventory); ok {
if !inv.Type.Valid() || len(inv.Hashes) == 0 {
if !inv.Type.Valid(s.chain.P2PSigExtensionsEnabled()) || len(inv.Hashes) == 0 {
return errInvalidInvType
}
}
@ -808,6 +880,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
case CMDTX:
tx := msg.Payload.(*transaction.Transaction)
return s.handleTxCmd(tx)
case CMDP2PNotaryRequest:
r := msg.Payload.(*payload.P2PNotaryRequest)
return s.handleP2PNotaryRequestCmd(r)
case CMDPing:
ping := msg.Payload.(*payload.Ping)
return s.handlePing(peer, ping)
@ -943,13 +1018,13 @@ func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason {
func (s *Server) RelayTxn(t *transaction.Transaction) RelayReason {
ret := s.verifyAndPoolTX(t)
if ret == RelaySucceed {
s.broadcastTX(t)
s.broadcastTX(t, nil)
}
return ret
}
// broadcastTX broadcasts an inventory message about new transaction.
func (s *Server) broadcastTX(t *transaction.Transaction) {
func (s *Server) broadcastTX(t *transaction.Transaction, _ interface{}) {
select {
case s.transactions <- t:
case <-s.quit:
@ -964,8 +1039,8 @@ func (s *Server) broadcastTxHashes(hs []util.Uint256) {
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, Peer.IsFullNode)
}
// initStaleTxMemPool initializes mempool for stale tx processing.
func (s *Server) initStaleTxMemPool() {
// initStaleMemPools initializes mempools for stale tx/payload processing.
func (s *Server) initStaleMemPools() {
cfg := s.chain.GetConfig()
threshold := 5
if cfg.ValidatorsCount*2 > threshold {
@ -974,6 +1049,9 @@ func (s *Server) initStaleTxMemPool() {
mp := s.chain.GetMemPool()
mp.SetResendThreshold(uint32(threshold), s.broadcastTX)
if s.chain.P2PSigExtensionsEnabled() {
s.notaryRequestPool.SetResendThreshold(uint32(threshold), s.broadcastP2PNotaryRequestPayload)
}
}
// broadcastTxLoop is a loop for batching and sending