subscriptions: move NotificationEvent to state

1. It's not good for pkg/core to import anything from pkg/neorpc.
2. The type is closely tied to the state package, even though it's not stored
   in the DB
This commit is contained in:
Roman Khimov 2022-07-22 21:17:23 +03:00
parent 8e70cd3596
commit 4acd1688a1
11 changed files with 76 additions and 99 deletions

View file

@ -19,7 +19,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result/subscriptions"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
uatomic "go.uber.org/atomic"
@ -404,7 +403,7 @@ func (chain *FakeChain) SubscribeForExecutions(ch chan<- *state.AppExecResult) {
}
// SubscribeForNotifications implements the Blockchainer interface.
func (chain *FakeChain) SubscribeForNotifications(ch chan<- *subscriptions.NotificationEvent) {
func (chain *FakeChain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) {
panic("TODO")
}
@ -444,7 +443,7 @@ func (chain *FakeChain) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult
}
// UnsubscribeFromNotifications implements the Blockchainer interface.
func (chain *FakeChain) UnsubscribeFromNotifications(ch chan<- *subscriptions.NotificationEvent) {
func (chain *FakeChain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) {
panic("TODO")
}

View file

@ -33,7 +33,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result/subscriptions"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
@ -755,7 +754,7 @@ func (bc *Blockchain) notificationDispatcher() {
// expected, but maps are convenient for adding/deleting elements).
blockFeed = make(map[chan<- *block.Block]bool)
txFeed = make(map[chan<- *transaction.Transaction]bool)
notificationFeed = make(map[chan<- *subscriptions.NotificationEvent]bool)
notificationFeed = make(map[chan<- *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan<- *state.AppExecResult]bool)
)
for {
@ -768,7 +767,7 @@ func (bc *Blockchain) notificationDispatcher() {
blockFeed[ch] = true
case chan<- *transaction.Transaction:
txFeed[ch] = true
case chan<- *subscriptions.NotificationEvent:
case chan<- *state.ContainedNotificationEvent:
notificationFeed[ch] = true
case chan<- *state.AppExecResult:
executionFeed[ch] = true
@ -781,7 +780,7 @@ func (bc *Blockchain) notificationDispatcher() {
delete(blockFeed, ch)
case chan<- *transaction.Transaction:
delete(txFeed, ch)
case chan<- *subscriptions.NotificationEvent:
case chan<- *state.ContainedNotificationEvent:
delete(notificationFeed, ch)
case chan<- *state.AppExecResult:
delete(executionFeed, ch)
@ -801,7 +800,7 @@ func (bc *Blockchain) notificationDispatcher() {
}
for i := range aer.Events {
for ch := range notificationFeed {
ch <- &subscriptions.NotificationEvent{
ch <- &state.ContainedNotificationEvent{
Container: aer.Container,
NotificationEvent: aer.Events[i],
}
@ -821,7 +820,7 @@ func (bc *Blockchain) notificationDispatcher() {
if aer.VMState == vmstate.Halt {
for i := range aer.Events {
for ch := range notificationFeed {
ch <- &subscriptions.NotificationEvent{
ch <- &state.ContainedNotificationEvent{
Container: aer.Container,
NotificationEvent: aer.Events[i],
}
@ -842,7 +841,7 @@ func (bc *Blockchain) notificationDispatcher() {
}
for i := range aer.Events {
for ch := range notificationFeed {
ch <- &subscriptions.NotificationEvent{
ch <- &state.ContainedNotificationEvent{
Container: aer.Container,
NotificationEvent: aer.Events[i],
}
@ -1782,7 +1781,7 @@ func (bc *Blockchain) SubscribeForTransactions(ch chan<- *transaction.Transactio
// transactions use SubscribeForExecutions instead. Make sure this channel is
// read from regularly as not reading these events might affect other Blockchain
// functions.
func (bc *Blockchain) SubscribeForNotifications(ch chan<- *subscriptions.NotificationEvent) {
func (bc *Blockchain) SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent) {
bc.subCh <- ch
}
@ -1810,7 +1809,7 @@ func (bc *Blockchain) UnsubscribeFromTransactions(ch chan<- *transaction.Transac
// UnsubscribeFromNotifications unsubscribes given channel from new
// execution-generated notifications, you can close it afterwards. Passing
// non-subscribed channel is a no-op.
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *subscriptions.NotificationEvent) {
func (bc *Blockchain) UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent) {
bc.unsubCh <- ch
}

View file

@ -36,7 +36,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/neotest"
"github.com/nspcc-dev/neo-go/pkg/neotest/chain"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result/subscriptions"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -760,7 +759,7 @@ func TestBlockchain_Subscriptions(t *testing.T) {
const chBufSize = 16
blockCh := make(chan *block.Block, chBufSize)
txCh := make(chan *transaction.Transaction, chBufSize)
notificationCh := make(chan *subscriptions.NotificationEvent, chBufSize)
notificationCh := make(chan *state.ContainedNotificationEvent, chBufSize)
executionCh := make(chan *state.AppExecResult, chBufSize)
bc, acc := chain.NewSingle(t)

View file

@ -12,7 +12,6 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result/subscriptions"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
)
@ -70,14 +69,14 @@ type Blockchainer interface {
SetNotary(mod services.Notary)
SubscribeForBlocks(ch chan<- *block.Block)
SubscribeForExecutions(ch chan<- *state.AppExecResult)
SubscribeForNotifications(ch chan<- *subscriptions.NotificationEvent)
SubscribeForNotifications(ch chan<- *state.ContainedNotificationEvent)
SubscribeForTransactions(ch chan<- *transaction.Transaction)
VerifyTx(*transaction.Transaction) error
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
GetMemPool() *mempool.Pool
UnsubscribeFromBlocks(ch chan<- *block.Block)
UnsubscribeFromExecutions(ch chan<- *state.AppExecResult)
UnsubscribeFromNotifications(ch chan<- *subscriptions.NotificationEvent)
UnsubscribeFromNotifications(ch chan<- *state.ContainedNotificationEvent)
UnsubscribeFromTransactions(ch chan<- *transaction.Transaction)
// Policer.
GetBaseExecFee() int64

View file

@ -27,6 +27,13 @@ type AppExecResult struct {
Execution
}
// ContainedNotificationEvent represents a wrapper for a notification from script execution.
type ContainedNotificationEvent struct {
// Container hash is the hash of script container which is either a block or a transaction.
Container util.Uint256
NotificationEvent
}
// EncodeBinary implements the Serializable interface.
func (ne *NotificationEvent) EncodeBinary(w *io.BinWriter) {
ne.EncodeBinaryWithContext(w, stackitem.NewSerializationContext())
@ -276,3 +283,42 @@ func (e *Execution) UnmarshalJSON(data []byte) error {
e.FaultException = aux.FaultException
return nil
}
// containedNotificationEventAux is an auxiliary struct for JSON marshalling.
type containedNotificationEventAux struct {
Container util.Uint256 `json:"container"`
}
// MarshalJSON implements the json.Marshaler interface.
func (ne *ContainedNotificationEvent) MarshalJSON() ([]byte, error) {
h, err := json.Marshal(&containedNotificationEventAux{
Container: ne.Container,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal hash: %w", err)
}
exec, err := json.Marshal(ne.NotificationEvent)
if err != nil {
return nil, fmt.Errorf("failed to marshal execution: %w", err)
}
if h[len(h)-1] != '}' || exec[0] != '{' {
return nil, errors.New("can't merge internal jsons")
}
h[len(h)-1] = ','
h = append(h, exec[1:]...)
return h, nil
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (ne *ContainedNotificationEvent) UnmarshalJSON(data []byte) error {
aux := new(containedNotificationEventAux)
if err := json.Unmarshal(data, aux); err != nil {
return err
}
if err := json.Unmarshal(data, &ne.NotificationEvent); err != nil {
return err
}
ne.Container = aux.Container
return nil
}

View file

@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neo-go/internal/testserdes"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/nspcc-dev/neo-go/pkg/vm/vmstate"
"github.com/stretchr/testify/require"
@ -241,3 +242,14 @@ func TestMarshalUnmarshalJSONAppExecResult(t *testing.T) {
}
})
}
func TestContainedNotificationEvent_MarshalUnmarshalJSON(t *testing.T) {
testserdes.MarshalUnmarshalJSON(t, &ContainedNotificationEvent{
Container: util.Uint256{1, 2, 3},
NotificationEvent: NotificationEvent{
ScriptHash: util.Uint160{4, 5, 6},
Name: "alarm",
Item: stackitem.NewArray([]stackitem.Item{stackitem.NewByteArray([]byte("qwerty"))}),
},
}, new(ContainedNotificationEvent))
}

View file

@ -1,56 +0,0 @@
package subscriptions
import (
"encoding/json"
"errors"
"fmt"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/util"
)
// NotificationEvent represents a wrapper for a notification from script execution.
type NotificationEvent struct {
// Container hash is the hash of script container which is either a block or a transaction.
Container util.Uint256
state.NotificationEvent
}
// notificationEventAux is an auxiliary struct for JSON marshalling.
type notificationEventAux struct {
Container util.Uint256 `json:"container"`
}
// MarshalJSON implements the json.Marshaler interface.
func (ne *NotificationEvent) MarshalJSON() ([]byte, error) {
h, err := json.Marshal(&notificationEventAux{
Container: ne.Container,
})
if err != nil {
return nil, fmt.Errorf("failed to marshal hash: %w", err)
}
exec, err := json.Marshal(ne.NotificationEvent)
if err != nil {
return nil, fmt.Errorf("failed to marshal execution: %w", err)
}
if h[len(h)-1] != '}' || exec[0] != '{' {
return nil, errors.New("can't merge internal jsons")
}
h[len(h)-1] = ','
h = append(h, exec[1:]...)
return h, nil
}
// UnmarshalJSON implements the json.Unmarshaler interface.
func (ne *NotificationEvent) UnmarshalJSON(data []byte) error {
aux := new(notificationEventAux)
if err := json.Unmarshal(data, aux); err != nil {
return err
}
if err := json.Unmarshal(data, &ne.NotificationEvent); err != nil {
return err
}
ne.Container = aux.Container
return nil
}

View file

@ -1,21 +0,0 @@
package subscriptions
import (
"testing"
"github.com/nspcc-dev/neo-go/internal/testserdes"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
)
func TestNotificationEvent_MarshalUnmarshalJSON(t *testing.T) {
testserdes.MarshalUnmarshalJSON(t, &NotificationEvent{
Container: util.Uint256{1, 2, 3},
NotificationEvent: state.NotificationEvent{
ScriptHash: util.Uint160{4, 5, 6},
Name: "alarm",
Item: stackitem.NewArray([]stackitem.Item{stackitem.NewByteArray([]byte("qwerty"))}),
},
}, new(NotificationEvent))
}

View file

@ -52,7 +52,7 @@ type WSClient struct {
}
// Notification represents a server-generated notification for client subscriptions.
// Value can be one of block.Block, state.AppExecResult, subscriptions.NotificationEvent
// Value can be one of block.Block, state.AppExecResult, state.ContainedNotificationEvent
// transaction.Transaction or subscriptions.NotaryRequestEvent based on Type.
type Notification struct {
Type neorpc.EventID
@ -182,7 +182,7 @@ readloop:
case neorpc.TransactionEventID:
val = &transaction.Transaction{}
case neorpc.NotificationEventID:
val = new(subscriptions.NotificationEvent)
val = new(state.ContainedNotificationEvent)
case neorpc.ExecutionEventID:
val = new(state.AppExecResult)
case neorpc.NotaryRequestEventID:

View file

@ -87,7 +87,7 @@ type (
notaryRequestSubs int
blockCh chan *block.Block
executionCh chan *state.AppExecResult
notificationCh chan *subscriptions.NotificationEvent
notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event
}
@ -245,7 +245,7 @@ func New(chain blockchainer.Blockchainer, conf config.RPC, coreServer *network.S
// These are NOT buffered to preserve original order of events.
blockCh: make(chan *block.Block),
executionCh: make(chan *state.AppExecResult),
notificationCh: make(chan *subscriptions.NotificationEvent),
notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
}

View file

@ -72,7 +72,7 @@ func (f *feed) Matches(r *neorpc.Notification) bool {
return senderOK && signerOK
case neorpc.NotificationEventID:
filt := f.filter.(neorpc.NotificationFilter)
notification := r.Payload[0].(*subscriptions.NotificationEvent)
notification := r.Payload[0].(*state.ContainedNotificationEvent)
hashOk := filt.Contract == nil || notification.ScriptHash.Equals(*filt.Contract)
nameOk := filt.Name == nil || notification.Name == *filt.Name
return hashOk && nameOk