core: move mempool.Event to a separate package

And write a marshaller for EventType, it'll be reused by the Notification
subsystem.
This commit is contained in:
Anna Shaleva 2021-05-28 14:47:33 +03:00
parent ed6652bf1f
commit b8e96ac82b
5 changed files with 109 additions and 55 deletions

View file

@ -8,6 +8,7 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/atomic" "go.uber.org/atomic"
@ -75,9 +76,9 @@ type Pool struct {
subscriptionsEnabled bool subscriptionsEnabled bool
subscriptionsOn atomic.Bool subscriptionsOn atomic.Bool
stopCh chan struct{} stopCh chan struct{}
events chan Event events chan mempoolevent.Event
subCh chan chan<- Event // there are no other events in mempool except Event, so no need in generic subscribers type subCh chan chan<- mempoolevent.Event // there are no other events in mempool except Event, so no need in generic subscribers type
unsubCh chan chan<- Event unsubCh chan chan<- mempoolevent.Event
} }
func (p items) Len() int { return len(p) } func (p items) Len() int { return len(p) }
@ -259,8 +260,8 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e
} }
mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.events <- Event{ mp.events <- mempoolevent.Event{
Type: TransactionRemoved, Type: mempoolevent.TransactionRemoved,
Tx: unlucky.txn, Tx: unlucky.txn,
Data: unlucky.data, Data: unlucky.data,
} }
@ -287,8 +288,8 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer, data ...interface{}) e
mp.lock.Unlock() mp.lock.Unlock()
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.events <- Event{ mp.events <- mempoolevent.Event{
Type: TransactionAdded, Type: mempoolevent.TransactionAdded,
Tx: pItem.txn, Tx: pItem.txn,
Data: pItem.data, Data: pItem.data,
} }
@ -332,8 +333,8 @@ func (mp *Pool) removeInternal(hash util.Uint256, feer Feer) {
delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID) delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID)
} }
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.events <- Event{ mp.events <- mempoolevent.Event{
Type: TransactionRemoved, Type: mempoolevent.TransactionRemoved,
Tx: itm.txn, Tx: itm.txn,
Data: itm.data, Data: itm.data,
} }
@ -382,8 +383,8 @@ func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool, feer Feer)
delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID) delete(mp.oracleResp, attrs[0].Value.(*transaction.OracleResponse).ID)
} }
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.events <- Event{ mp.events <- mempoolevent.Event{
Type: TransactionRemoved, Type: mempoolevent.TransactionRemoved,
Tx: itm.txn, Tx: itm.txn,
Data: itm.data, Data: itm.data,
} }
@ -428,9 +429,9 @@ func New(capacity int, payerIndex int, enableSubscriptions bool) *Pool {
oracleResp: make(map[uint64]util.Uint256), oracleResp: make(map[uint64]util.Uint256),
subscriptionsEnabled: enableSubscriptions, subscriptionsEnabled: enableSubscriptions,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
events: make(chan Event), events: make(chan mempoolevent.Event),
subCh: make(chan chan<- Event), subCh: make(chan chan<- mempoolevent.Event),
unsubCh: make(chan chan<- Event), unsubCh: make(chan chan<- mempoolevent.Event),
} }
mp.subscriptionsOn.Store(false) mp.subscriptionsOn.Store(false)
return mp return mp

View file

@ -1,25 +1,6 @@
package mempool package mempool
import ( import "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
)
// EventType represents mempool event type.
type EventType byte
const (
// TransactionAdded marks transaction addition mempool event.
TransactionAdded EventType = 0x01
// TransactionRemoved marks transaction removal mempool event.
TransactionRemoved EventType = 0x02
)
// Event represents one of mempool events: transaction was added or removed from mempool.
type Event struct {
Type EventType
Tx *transaction.Transaction
Data interface{}
}
// RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled. // RunSubscriptions runs subscriptions goroutine if mempool subscriptions are enabled.
// You should manually free the resources by calling StopSubscriptions on mempool shutdown. // You should manually free the resources by calling StopSubscriptions on mempool shutdown.
@ -47,7 +28,7 @@ func (mp *Pool) StopSubscriptions() {
// SubscribeForTransactions adds given channel to new mempool event broadcasting, so when // SubscribeForTransactions adds given channel to new mempool event broadcasting, so when
// there is a new transactions added to mempool or an existing transaction removed from // there is a new transactions added to mempool or an existing transaction removed from
// mempool you'll receive it via this channel. // mempool you'll receive it via this channel.
func (mp *Pool) SubscribeForTransactions(ch chan<- Event) { func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) {
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.subCh <- ch mp.subCh <- ch
} }
@ -55,7 +36,7 @@ func (mp *Pool) SubscribeForTransactions(ch chan<- Event) {
// UnsubscribeFromTransactions unsubscribes given channel from new mempool notifications, // UnsubscribeFromTransactions unsubscribes given channel from new mempool notifications,
// you can close it afterwards. Passing non-subscribed channel is a no-op. // you can close it afterwards. Passing non-subscribed channel is a no-op.
func (mp *Pool) UnsubscribeFromTransactions(ch chan<- Event) { func (mp *Pool) UnsubscribeFromTransactions(ch chan<- mempoolevent.Event) {
if mp.subscriptionsOn.Load() { if mp.subscriptionsOn.Load() {
mp.unsubCh <- ch mp.unsubCh <- ch
} }
@ -67,7 +48,7 @@ func (mp *Pool) notificationDispatcher() {
// These are just sets of subscribers, though modelled as maps // These are just sets of subscribers, though modelled as maps
// for ease of management (not a lot of subscriptions is really // for ease of management (not a lot of subscriptions is really
// expected, but maps are convenient for adding/deleting elements). // expected, but maps are convenient for adding/deleting elements).
txFeed = make(map[chan<- Event]bool) txFeed = make(map[chan<- mempoolevent.Event]bool)
) )
for { for {
select { select {

View file

@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/opcode"
@ -25,8 +26,8 @@ func TestSubscriptions(t *testing.T) {
fs := &FeerStub{balance: 100} fs := &FeerStub{balance: 100}
mp := New(2, 0, true) mp := New(2, 0, true)
mp.RunSubscriptions() mp.RunSubscriptions()
subChan1 := make(chan Event, 3) subChan1 := make(chan mempoolevent.Event, 3)
subChan2 := make(chan Event, 3) subChan2 := make(chan mempoolevent.Event, 3)
mp.SubscribeForTransactions(subChan1) mp.SubscribeForTransactions(subChan1)
t.Cleanup(mp.StopSubscriptions) t.Cleanup(mp.StopSubscriptions)
@ -42,7 +43,7 @@ func TestSubscriptions(t *testing.T) {
require.NoError(t, mp.Add(txs[0], fs)) require.NoError(t, mp.Add(txs[0], fs))
require.Eventually(t, func() bool { return len(subChan1) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 }, time.Second, time.Millisecond*100)
event := <-subChan1 event := <-subChan1
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[0]}, event) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[0]}, event)
// severak subscribers // severak subscribers
mp.SubscribeForTransactions(subChan2) mp.SubscribeForTransactions(subChan2)
@ -50,28 +51,28 @@ func TestSubscriptions(t *testing.T) {
require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event1 := <-subChan1 event1 := <-subChan1
event2 := <-subChan2 event2 := <-subChan2
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[1]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[1]}, event1)
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[1]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[1]}, event2)
// reach capacity // reach capacity
require.NoError(t, mp.Add(txs[2], &FeerStub{})) require.NoError(t, mp.Add(txs[2], &FeerStub{}))
require.Eventually(t, func() bool { return len(subChan1) == 2 && len(subChan2) == 2 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 2 && len(subChan2) == 2 }, time.Second, time.Millisecond*100)
event1 = <-subChan1 event1 = <-subChan1
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[0]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[0]}, event1)
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[0]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[0]}, event2)
event1 = <-subChan1 event1 = <-subChan1
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[2]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[2]}, event1)
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[2]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[2]}, event2)
// remove tx // remove tx
mp.Remove(txs[1].Hash(), fs) mp.Remove(txs[1].Hash(), fs)
require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event1 = <-subChan1 event1 = <-subChan1
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[1]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[1]}, event1)
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[1]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[1]}, event2)
// remove stale // remove stale
mp.RemoveStale(func(tx *transaction.Transaction) bool { mp.RemoveStale(func(tx *transaction.Transaction) bool {
@ -80,8 +81,8 @@ func TestSubscriptions(t *testing.T) {
require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan1) == 1 && len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event1 = <-subChan1 event1 = <-subChan1
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[2]}, event1) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[2]}, event1)
require.Equal(t, Event{Type: TransactionRemoved, Tx: txs[2]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionRemoved, Tx: txs[2]}, event2)
// unsubscribe // unsubscribe
mp.UnsubscribeFromTransactions(subChan1) mp.UnsubscribeFromTransactions(subChan1)
@ -89,6 +90,6 @@ func TestSubscriptions(t *testing.T) {
require.Eventually(t, func() bool { return len(subChan2) == 1 }, time.Second, time.Millisecond*100) require.Eventually(t, func() bool { return len(subChan2) == 1 }, time.Second, time.Millisecond*100)
event2 = <-subChan2 event2 = <-subChan2
require.Equal(t, 0, len(subChan1)) require.Equal(t, 0, len(subChan1))
require.Equal(t, Event{Type: TransactionAdded, Tx: txs[3]}, event2) require.Equal(t, mempoolevent.Event{Type: mempoolevent.TransactionAdded, Tx: txs[3]}, event2)
}) })
} }

View file

@ -0,0 +1,70 @@
package mempoolevent
import (
"encoding/json"
"errors"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
)
// Type represents mempool event type.
type Type byte
const (
// TransactionAdded marks transaction addition mempool event.
TransactionAdded Type = 0x01
// TransactionRemoved marks transaction removal mempool event.
TransactionRemoved Type = 0x02
)
// Event represents one of mempool events: transaction was added or removed from mempool.
type Event struct {
Type Type
Tx *transaction.Transaction
Data interface{}
}
// String is a Stringer implementation.
func (e Type) String() string {
switch e {
case TransactionAdded:
return "added"
case TransactionRemoved:
return "removed"
default:
return "unknown"
}
}
// GetEventTypeFromString converts input string into an Type if it's possible.
func GetEventTypeFromString(s string) (Type, error) {
switch s {
case "added":
return TransactionAdded, nil
case "removed":
return TransactionRemoved, nil
default:
return 0, errors.New("invalid event type name")
}
}
// MarshalJSON implements json.Marshaler interface.
func (e Type) MarshalJSON() ([]byte, error) {
return json.Marshal(e.String())
}
// UnmarshalJSON implements json.Unmarshaler interface.
func (e *Type) UnmarshalJSON(b []byte) error {
var s string
err := json.Unmarshal(b, &s)
if err != nil {
return err
}
id, err := GetEventTypeFromString(s)
if err != nil {
return err
}
*e = id
return nil
}

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -48,7 +49,7 @@ type (
mp *mempool.Pool mp *mempool.Pool
// requests channel // requests channel
reqCh chan mempool.Event reqCh chan mempoolevent.Event
blocksCh chan *block.Block blocksCh chan *block.Block
stopCh chan struct{} stopCh chan struct{}
} }
@ -109,7 +110,7 @@ func NewNotary(cfg Config, net netmode.Magic, mp *mempool.Pool, onTransaction fu
wallet: wallet, wallet: wallet,
onTransaction: onTransaction, onTransaction: onTransaction,
mp: mp, mp: mp,
reqCh: make(chan mempool.Event), reqCh: make(chan mempoolevent.Event),
blocksCh: make(chan *block.Block), blocksCh: make(chan *block.Block),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
}, nil }, nil
@ -129,9 +130,9 @@ func (n *Notary) Run() {
case event := <-n.reqCh: case event := <-n.reqCh:
if req, ok := event.Data.(*payload.P2PNotaryRequest); ok { if req, ok := event.Data.(*payload.P2PNotaryRequest); ok {
switch event.Type { switch event.Type {
case mempool.TransactionAdded: case mempoolevent.TransactionAdded:
n.OnNewRequest(req) n.OnNewRequest(req)
case mempool.TransactionRemoved: case mempoolevent.TransactionRemoved:
n.OnRequestRemoval(req) n.OnRequestRemoval(req)
} }
} }