network: limit message number from the same sender

This commit is contained in:
Evgeniy Stratonikov 2021-05-04 17:54:16 +03:00
parent 35cdf0447c
commit 275a5c9daa
5 changed files with 132 additions and 40 deletions

View file

@ -30,4 +30,6 @@ type ApplicationConfiguration struct {
Oracle OracleConfiguration `yaml:"Oracle"`
P2PNotary P2PNotary `yaml:"P2PNotary"`
StateRoot StateRoot `yaml:"StateRoot"`
// ExtensiblePoolSize is the maximum amount of the extensible payloads from a single sender.
ExtensiblePoolSize int `yaml:"ExtensiblePoolSize"`
}

View file

@ -1,6 +1,7 @@
package extpool
import (
"container/list"
"errors"
"sync"
@ -12,15 +13,24 @@ import (
// Pool represents pool of extensible payloads.
type Pool struct {
lock sync.RWMutex
verified map[util.Uint256]*payload.Extensible
chain blockchainer.Blockchainer
verified map[util.Uint256]*list.Element
senders map[util.Uint160]*list.List
// singleCap represents maximum number of payloads from the single sender.
singleCap int
chain blockchainer.Blockchainer
}
// New returns new payload pool using provided chain.
func New(bc blockchainer.Blockchainer) *Pool {
func New(bc blockchainer.Blockchainer, capacity int) *Pool {
if capacity <= 0 {
panic("invalid capacity")
}
return &Pool{
verified: make(map[util.Uint256]*payload.Extensible),
chain: bc,
verified: make(map[util.Uint256]*list.Element),
senders: make(map[util.Uint160]*list.List),
singleCap: capacity,
chain: bc,
}
}
@ -44,7 +54,17 @@ func (p *Pool) Add(e *payload.Extensible) (bool, error) {
if _, ok := p.verified[h]; ok {
return false, nil
}
p.verified[h] = e
lst, ok := p.senders[e.Sender]
if ok && lst.Len() >= p.singleCap {
value := lst.Remove(lst.Front())
delete(p.verified, value.(*payload.Extensible).Hash())
} else if !ok {
lst = list.New()
p.senders[e.Sender] = lst
}
p.verified[h] = lst.PushBack(e)
return true, nil
}
@ -72,7 +92,11 @@ func (p *Pool) Get(h util.Uint256) *payload.Extensible {
p.lock.RLock()
defer p.lock.RUnlock()
return p.verified[h]
elem, ok := p.verified[h]
if !ok {
return nil
}
return elem.Value.(*payload.Extensible)
}
const extensibleVerifyMaxGAS = 2000000
@ -81,13 +105,27 @@ const extensibleVerifyMaxGAS = 2000000
func (p *Pool) RemoveStale(index uint32) {
p.lock.Lock()
defer p.lock.Unlock()
for h, e := range p.verified {
if e.ValidBlockEnd <= index || !p.chain.IsExtensibleAllowed(e.Sender) {
delete(p.verified, h)
continue
for s, lst := range p.senders {
for elem := lst.Front(); elem != nil; {
e := elem.Value.(*payload.Extensible)
h := e.Hash()
old := elem
elem = elem.Next()
if e.ValidBlockEnd <= index || !p.chain.IsExtensibleAllowed(e.Sender) {
delete(p.verified, h)
lst.Remove(old)
continue
}
if err := p.chain.VerifyWitness(e.Sender, e, &e.Witness, extensibleVerifyMaxGAS); err != nil {
delete(p.verified, h)
lst.Remove(old)
continue
}
}
if err := p.chain.VerifyWitness(e.Sender, e, &e.Witness, extensibleVerifyMaxGAS); err != nil {
delete(p.verified, h)
if lst.Len() == 0 {
delete(p.senders, s)
}
}
}

View file

@ -16,7 +16,7 @@ func TestAddGet(t *testing.T) {
bc := newTestChain()
bc.height = 10
p := New(bc)
p := New(bc, 100)
t.Run("invalid witness", func(t *testing.T) {
ep := &payload.Extensible{ValidBlockEnd: 100, Sender: util.Uint160{0x42}}
p.testAdd(t, false, errVerification, ep)
@ -41,11 +41,52 @@ func TestAddGet(t *testing.T) {
})
}
func TestCapacityLimit(t *testing.T) {
bc := newTestChain()
bc.height = 10
t.Run("invalid capacity", func(t *testing.T) {
require.Panics(t, func() { New(bc, 0) })
})
p := New(bc, 3)
first := &payload.Extensible{ValidBlockEnd: 11}
p.testAdd(t, true, nil, first)
for _, height := range []uint32{12, 13} {
ep := &payload.Extensible{ValidBlockEnd: height}
p.testAdd(t, true, nil, ep)
}
require.NotNil(t, p.Get(first.Hash()))
ok, err := p.Add(&payload.Extensible{ValidBlockEnd: 14})
require.True(t, ok)
require.NoError(t, err)
require.Nil(t, p.Get(first.Hash()))
}
// This test checks that sender count is updated
// when oldest payload is removed during `Add`.
func TestDecreaseSenderOnEvict(t *testing.T) {
bc := newTestChain()
bc.height = 10
p := New(bc, 2)
senders := []util.Uint160{{1}, {2}, {3}}
for i := uint32(11); i < 17; i++ {
ep := &payload.Extensible{Sender: senders[i%3], ValidBlockEnd: i}
p.testAdd(t, true, nil, ep)
}
}
func TestRemoveStale(t *testing.T) {
bc := newTestChain()
bc.height = 10
p := New(bc)
p := New(bc, 100)
eps := []*payload.Extensible{
{ValidBlockEnd: 11}, // small height
{ValidBlockEnd: 12}, // good

View file

@ -30,11 +30,12 @@ import (
const (
// peer numbers are arbitrary at the moment.
defaultMinPeers = 5
defaultAttemptConnPeers = 20
defaultMaxPeers = 100
maxBlockBatch = 200
minPoolCount = 30
defaultMinPeers = 5
defaultAttemptConnPeers = 20
defaultMaxPeers = 100
defaultExtensiblePoolSize = 20
maxBlockBatch = 200
minPoolCount = 30
)
var (
@ -121,6 +122,12 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
return nil, errors.New("logger is a required parameter")
}
if config.ExtensiblePoolSize <= 0 {
config.ExtensiblePoolSize = defaultExtensiblePoolSize
log.Info("ExtensiblePoolSize is not set or wrong, using default value",
zap.Int("ExtensiblePoolSize", config.ExtensiblePoolSize))
}
s := &Server{
ServerConfig: config,
chain: chain,
@ -132,7 +139,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
unregister: make(chan peerDrop),
peers: make(map[Peer]bool),
syncReached: atomic.NewBool(false),
extensiblePool: extpool.New(chain),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log,
transactions: make(chan *transaction.Transaction, 64),
}

View file

@ -78,6 +78,9 @@ type (
// StateRootCfg is stateroot module configuration.
StateRootCfg config.StateRoot
// ExtensiblePoolSize is size of the pool for extensible payloads from a single sender.
ExtensiblePoolSize int
}
)
@ -93,24 +96,25 @@ func NewServerConfig(cfg config.Config) ServerConfig {
}
return ServerConfig{
UserAgent: cfg.GenerateUserAgent(),
Address: appConfig.Address,
AnnouncedPort: appConfig.AnnouncedNodePort,
Port: appConfig.NodePort,
Net: protoConfig.Magic,
Relay: appConfig.Relay,
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.DialTimeout * time.Second,
ProtoTickInterval: appConfig.ProtoTickInterval * time.Second,
PingInterval: appConfig.PingInterval * time.Second,
PingTimeout: appConfig.PingTimeout * time.Second,
MaxPeers: appConfig.MaxPeers,
AttemptConnPeers: appConfig.AttemptConnPeers,
MinPeers: appConfig.MinPeers,
Wallet: wc,
TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second,
OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary,
StateRootCfg: appConfig.StateRoot,
UserAgent: cfg.GenerateUserAgent(),
Address: appConfig.Address,
AnnouncedPort: appConfig.AnnouncedNodePort,
Port: appConfig.NodePort,
Net: protoConfig.Magic,
Relay: appConfig.Relay,
Seeds: protoConfig.SeedList,
DialTimeout: appConfig.DialTimeout * time.Second,
ProtoTickInterval: appConfig.ProtoTickInterval * time.Second,
PingInterval: appConfig.PingInterval * time.Second,
PingTimeout: appConfig.PingTimeout * time.Second,
MaxPeers: appConfig.MaxPeers,
AttemptConnPeers: appConfig.AttemptConnPeers,
MinPeers: appConfig.MinPeers,
Wallet: wc,
TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second,
OracleCfg: appConfig.Oracle,
P2PNotaryCfg: appConfig.P2PNotary,
StateRootCfg: appConfig.StateRoot,
ExtensiblePoolSize: appConfig.ExtensiblePoolSize,
}
}