diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 9df42c507..cfae16adb 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -30,4 +30,6 @@ type ApplicationConfiguration struct { Oracle OracleConfiguration `yaml:"Oracle"` P2PNotary P2PNotary `yaml:"P2PNotary"` StateRoot StateRoot `yaml:"StateRoot"` + // ExtensiblePoolSize is the maximum amount of the extensible payloads from a single sender. + ExtensiblePoolSize int `yaml:"ExtensiblePoolSize"` } diff --git a/pkg/network/extpool/pool.go b/pkg/network/extpool/pool.go index ef9bba218..82a1cf77f 100644 --- a/pkg/network/extpool/pool.go +++ b/pkg/network/extpool/pool.go @@ -1,6 +1,7 @@ package extpool import ( + "container/list" "errors" "sync" @@ -12,15 +13,24 @@ import ( // Pool represents pool of extensible payloads. type Pool struct { lock sync.RWMutex - verified map[util.Uint256]*payload.Extensible - chain blockchainer.Blockchainer + verified map[util.Uint256]*list.Element + senders map[util.Uint160]*list.List + // singleCap represents maximum number of payloads from the single sender. + singleCap int + chain blockchainer.Blockchainer } // New returns new payload pool using provided chain. -func New(bc blockchainer.Blockchainer) *Pool { +func New(bc blockchainer.Blockchainer, capacity int) *Pool { + if capacity <= 0 { + panic("invalid capacity") + } + return &Pool{ - verified: make(map[util.Uint256]*payload.Extensible), - chain: bc, + verified: make(map[util.Uint256]*list.Element), + senders: make(map[util.Uint160]*list.List), + singleCap: capacity, + chain: bc, } } @@ -44,7 +54,17 @@ func (p *Pool) Add(e *payload.Extensible) (bool, error) { if _, ok := p.verified[h]; ok { return false, nil } - p.verified[h] = e + + lst, ok := p.senders[e.Sender] + if ok && lst.Len() >= p.singleCap { + value := lst.Remove(lst.Front()) + delete(p.verified, value.(*payload.Extensible).Hash()) + } else if !ok { + lst = list.New() + p.senders[e.Sender] = lst + } + + p.verified[h] = lst.PushBack(e) return true, nil } @@ -72,7 +92,11 @@ func (p *Pool) Get(h util.Uint256) *payload.Extensible { p.lock.RLock() defer p.lock.RUnlock() - return p.verified[h] + elem, ok := p.verified[h] + if !ok { + return nil + } + return elem.Value.(*payload.Extensible) } const extensibleVerifyMaxGAS = 2000000 @@ -81,13 +105,27 @@ const extensibleVerifyMaxGAS = 2000000 func (p *Pool) RemoveStale(index uint32) { p.lock.Lock() defer p.lock.Unlock() - for h, e := range p.verified { - if e.ValidBlockEnd <= index || !p.chain.IsExtensibleAllowed(e.Sender) { - delete(p.verified, h) - continue + + for s, lst := range p.senders { + for elem := lst.Front(); elem != nil; { + e := elem.Value.(*payload.Extensible) + h := e.Hash() + old := elem + elem = elem.Next() + + if e.ValidBlockEnd <= index || !p.chain.IsExtensibleAllowed(e.Sender) { + delete(p.verified, h) + lst.Remove(old) + continue + } + if err := p.chain.VerifyWitness(e.Sender, e, &e.Witness, extensibleVerifyMaxGAS); err != nil { + delete(p.verified, h) + lst.Remove(old) + continue + } } - if err := p.chain.VerifyWitness(e.Sender, e, &e.Witness, extensibleVerifyMaxGAS); err != nil { - delete(p.verified, h) + if lst.Len() == 0 { + delete(p.senders, s) } } } diff --git a/pkg/network/extpool/pool_test.go b/pkg/network/extpool/pool_test.go index 873dc8d7b..0919fff49 100644 --- a/pkg/network/extpool/pool_test.go +++ b/pkg/network/extpool/pool_test.go @@ -16,7 +16,7 @@ func TestAddGet(t *testing.T) { bc := newTestChain() bc.height = 10 - p := New(bc) + p := New(bc, 100) t.Run("invalid witness", func(t *testing.T) { ep := &payload.Extensible{ValidBlockEnd: 100, Sender: util.Uint160{0x42}} p.testAdd(t, false, errVerification, ep) @@ -41,11 +41,52 @@ func TestAddGet(t *testing.T) { }) } +func TestCapacityLimit(t *testing.T) { + bc := newTestChain() + bc.height = 10 + + t.Run("invalid capacity", func(t *testing.T) { + require.Panics(t, func() { New(bc, 0) }) + }) + + p := New(bc, 3) + + first := &payload.Extensible{ValidBlockEnd: 11} + p.testAdd(t, true, nil, first) + + for _, height := range []uint32{12, 13} { + ep := &payload.Extensible{ValidBlockEnd: height} + p.testAdd(t, true, nil, ep) + } + + require.NotNil(t, p.Get(first.Hash())) + + ok, err := p.Add(&payload.Extensible{ValidBlockEnd: 14}) + require.True(t, ok) + require.NoError(t, err) + + require.Nil(t, p.Get(first.Hash())) +} + +// This test checks that sender count is updated +// when oldest payload is removed during `Add`. +func TestDecreaseSenderOnEvict(t *testing.T) { + bc := newTestChain() + bc.height = 10 + + p := New(bc, 2) + senders := []util.Uint160{{1}, {2}, {3}} + for i := uint32(11); i < 17; i++ { + ep := &payload.Extensible{Sender: senders[i%3], ValidBlockEnd: i} + p.testAdd(t, true, nil, ep) + } +} + func TestRemoveStale(t *testing.T) { bc := newTestChain() bc.height = 10 - p := New(bc) + p := New(bc, 100) eps := []*payload.Extensible{ {ValidBlockEnd: 11}, // small height {ValidBlockEnd: 12}, // good diff --git a/pkg/network/server.go b/pkg/network/server.go index b463147ae..e5f9f8e96 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -30,11 +30,12 @@ import ( const ( // peer numbers are arbitrary at the moment. - defaultMinPeers = 5 - defaultAttemptConnPeers = 20 - defaultMaxPeers = 100 - maxBlockBatch = 200 - minPoolCount = 30 + defaultMinPeers = 5 + defaultAttemptConnPeers = 20 + defaultMaxPeers = 100 + defaultExtensiblePoolSize = 20 + maxBlockBatch = 200 + minPoolCount = 30 ) var ( @@ -121,6 +122,12 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai return nil, errors.New("logger is a required parameter") } + if config.ExtensiblePoolSize <= 0 { + config.ExtensiblePoolSize = defaultExtensiblePoolSize + log.Info("ExtensiblePoolSize is not set or wrong, using default value", + zap.Int("ExtensiblePoolSize", config.ExtensiblePoolSize)) + } + s := &Server{ ServerConfig: config, chain: chain, @@ -132,7 +139,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai unregister: make(chan peerDrop), peers: make(map[Peer]bool), syncReached: atomic.NewBool(false), - extensiblePool: extpool.New(chain), + extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), log: log, transactions: make(chan *transaction.Transaction, 64), } diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index 374c38462..4c0f905a6 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -78,6 +78,9 @@ type ( // StateRootCfg is stateroot module configuration. StateRootCfg config.StateRoot + + // ExtensiblePoolSize is size of the pool for extensible payloads from a single sender. + ExtensiblePoolSize int } ) @@ -93,24 +96,25 @@ func NewServerConfig(cfg config.Config) ServerConfig { } return ServerConfig{ - UserAgent: cfg.GenerateUserAgent(), - Address: appConfig.Address, - AnnouncedPort: appConfig.AnnouncedNodePort, - Port: appConfig.NodePort, - Net: protoConfig.Magic, - Relay: appConfig.Relay, - Seeds: protoConfig.SeedList, - DialTimeout: appConfig.DialTimeout * time.Second, - ProtoTickInterval: appConfig.ProtoTickInterval * time.Second, - PingInterval: appConfig.PingInterval * time.Second, - PingTimeout: appConfig.PingTimeout * time.Second, - MaxPeers: appConfig.MaxPeers, - AttemptConnPeers: appConfig.AttemptConnPeers, - MinPeers: appConfig.MinPeers, - Wallet: wc, - TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second, - OracleCfg: appConfig.Oracle, - P2PNotaryCfg: appConfig.P2PNotary, - StateRootCfg: appConfig.StateRoot, + UserAgent: cfg.GenerateUserAgent(), + Address: appConfig.Address, + AnnouncedPort: appConfig.AnnouncedNodePort, + Port: appConfig.NodePort, + Net: protoConfig.Magic, + Relay: appConfig.Relay, + Seeds: protoConfig.SeedList, + DialTimeout: appConfig.DialTimeout * time.Second, + ProtoTickInterval: appConfig.ProtoTickInterval * time.Second, + PingInterval: appConfig.PingInterval * time.Second, + PingTimeout: appConfig.PingTimeout * time.Second, + MaxPeers: appConfig.MaxPeers, + AttemptConnPeers: appConfig.AttemptConnPeers, + MinPeers: appConfig.MinPeers, + Wallet: wc, + TimePerBlock: time.Duration(protoConfig.SecondsPerBlock) * time.Second, + OracleCfg: appConfig.Oracle, + P2PNotaryCfg: appConfig.P2PNotary, + StateRootCfg: appConfig.StateRoot, + ExtensiblePoolSize: appConfig.ExtensiblePoolSize, } }