forked from TrueCloudLab/frostfs-node
[#770] pkg/morph/event: Add notary notifications support
Add handlers and parsers functionality for listener. Separate notification and notary events by files. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
8f2924d6cf
commit
c042f6a429
6 changed files with 376 additions and 29 deletions
|
@ -28,3 +28,22 @@ func (s *NotificationHandlerInfo) SetHandler(v Handler) {
|
|||
func (s NotificationHandlerInfo) Handler() Handler {
|
||||
return s.h
|
||||
}
|
||||
|
||||
// NotaryHandlerInfo is a structure that groups
|
||||
// the parameters of the handler of particular
|
||||
// notary event.
|
||||
type NotaryHandlerInfo struct {
|
||||
notaryRequestTypes
|
||||
|
||||
h Handler
|
||||
}
|
||||
|
||||
// SetHandler is an event handler setter.
|
||||
func (nhi *NotaryHandlerInfo) SetHandler(v Handler) {
|
||||
nhi.h = v
|
||||
}
|
||||
|
||||
// Handler returns an event handler.
|
||||
func (nhi NotaryHandlerInfo) Handler() Handler {
|
||||
return nhi.h
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||
"github.com/nspcc-dev/neo-go/pkg/rpc/response"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
|
||||
|
@ -43,6 +44,31 @@ type Listener interface {
|
|||
// Must ignore nil handlers.
|
||||
RegisterNotificationHandler(NotificationHandlerInfo)
|
||||
|
||||
// EnableNotarySupport enables notary request listening. Passed hash is
|
||||
// notary mainTX signer. In practise, it means that listener will subscribe
|
||||
// for only notary requests that are going to be paid with passed hash.
|
||||
//
|
||||
// Must not be called after Listen or ListenWithError.
|
||||
EnableNotarySupport(util.Uint160, client.AlphabetKeys, BlockCounter)
|
||||
|
||||
// SetNotaryParser must set the parser of particular notary request event.
|
||||
//
|
||||
// Parser of each event must be set once. All parsers must be set before Listen call.
|
||||
//
|
||||
// Must ignore nil parsers and all calls after listener has been started.
|
||||
//
|
||||
// Has no effect if EnableNotarySupport was not called before Listen or ListenWithError.
|
||||
SetNotaryParser(NotaryParserInfo)
|
||||
|
||||
// RegisterNotaryHandler must register the event handler for particular notification event of contract.
|
||||
//
|
||||
// The specified handler must be called after each capture and parsing of the event.
|
||||
//
|
||||
// Must ignore nil handlers.
|
||||
//
|
||||
// Has no effect if EnableNotarySupport was not called before Listen or ListenWithError.
|
||||
RegisterNotaryHandler(NotaryHandlerInfo)
|
||||
|
||||
// RegisterBlockHandler must register chain block handler.
|
||||
//
|
||||
// The specified handler must be called after each capture and parsing of the new block from chain.
|
||||
|
@ -72,6 +98,12 @@ type listener struct {
|
|||
notificationParsers map[scriptHashWithType]NotificationParser
|
||||
notificationHandlers map[scriptHashWithType][]Handler
|
||||
|
||||
listenNotary bool
|
||||
notaryEventsPreparator NotaryPreparator
|
||||
notaryParsers map[notaryRequestTypes]NotaryParser
|
||||
notaryHandlers map[notaryRequestTypes]Handler
|
||||
notaryMainTXSigner util.Uint160 // filter for notary subscription
|
||||
|
||||
log *zap.Logger
|
||||
|
||||
subscriber subscriber.Subscriber
|
||||
|
@ -119,7 +151,7 @@ func (l listener) ListenWithError(ctx context.Context, intError chan<- error) {
|
|||
})
|
||||
}
|
||||
|
||||
func (l listener) listen(ctx context.Context, intError chan<- error) error {
|
||||
func (l *listener) listen(ctx context.Context, intError chan<- error) error {
|
||||
// create the list of listening contract hashes
|
||||
hashes := make([]util.Uint160, 0)
|
||||
|
||||
|
@ -154,10 +186,15 @@ func (l listener) listen(ctx context.Context, intError chan<- error) error {
|
|||
}
|
||||
|
||||
func (l listener) listenLoop(ctx context.Context, chEvent <-chan *state.NotificationEvent, intErr chan<- error) {
|
||||
var blockChan <-chan *block.Block
|
||||
var (
|
||||
blockChan <-chan *block.Block
|
||||
|
||||
notaryChan <-chan *response.NotaryRequestEvent
|
||||
|
||||
err error
|
||||
)
|
||||
|
||||
if len(l.blockHandlers) > 0 {
|
||||
var err error
|
||||
if blockChan, err = l.subscriber.BlockNotifications(); err != nil {
|
||||
if intErr != nil {
|
||||
intErr <- fmt.Errorf("could not open block notifications channel: %w", err)
|
||||
|
@ -173,6 +210,20 @@ func (l listener) listenLoop(ctx context.Context, chEvent <-chan *state.Notifica
|
|||
blockChan = make(chan *block.Block)
|
||||
}
|
||||
|
||||
if l.listenNotary {
|
||||
if notaryChan, err = l.subscriber.SubscribeForNotaryRequests(l.notaryMainTXSigner); err != nil {
|
||||
if intErr != nil {
|
||||
intErr <- fmt.Errorf("could not open notary notifications channel: %w", err)
|
||||
} else {
|
||||
l.log.Debug("could not open notary notifications channel",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
|
@ -183,7 +234,7 @@ loop:
|
|||
break loop
|
||||
case notifyEvent, ok := <-chEvent:
|
||||
if !ok {
|
||||
l.log.Warn("stop event listener by channel")
|
||||
l.log.Warn("stop event listener by notification channel")
|
||||
if intErr != nil {
|
||||
intErr <- errors.New("event subscriber connection has been terminated")
|
||||
}
|
||||
|
@ -195,6 +246,20 @@ loop:
|
|||
}
|
||||
|
||||
l.parseAndHandleNotification(notifyEvent)
|
||||
case notaryEvent, ok := <-notaryChan:
|
||||
if !ok {
|
||||
l.log.Warn("stop event listener by notary channel")
|
||||
if intErr != nil {
|
||||
intErr <- errors.New("notary event subscriber connection has been terminated")
|
||||
}
|
||||
|
||||
break loop
|
||||
} else if notaryEvent == nil {
|
||||
l.log.Warn("nil notary event was caught")
|
||||
continue loop
|
||||
}
|
||||
|
||||
l.parseAndHandleNotary(notaryEvent)
|
||||
case b, ok := <-blockChan:
|
||||
if !ok {
|
||||
l.log.Warn("stop event listener by block channel")
|
||||
|
@ -284,17 +349,78 @@ func (l listener) parseAndHandleNotification(notifyEvent *state.NotificationEven
|
|||
}
|
||||
}
|
||||
|
||||
func (l listener) parseAndHandleNotary(nr *response.NotaryRequestEvent) {
|
||||
// prepare the notary event
|
||||
notaryEvent, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.Is(err, ErrTXAlreadyHandled):
|
||||
case errors.Is(err, ErrMainTXExpired):
|
||||
l.log.Warn("skip expired main TX notary event",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
default:
|
||||
l.log.Warn("could not prepare and validate notary event",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
notaryKey := notaryRequestTypes{}
|
||||
notaryKey.SetMempoolType(nr.Type)
|
||||
notaryKey.SetRequestType(notaryEvent.Type())
|
||||
notaryKey.SetScriptHash(notaryEvent.ScriptHash())
|
||||
|
||||
// get notary parser
|
||||
l.mtx.RLock()
|
||||
parser, ok := l.notaryParsers[notaryKey]
|
||||
l.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
l.log.Debug("notary parser not set")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// parse the notary event
|
||||
event, err := parser(notaryEvent)
|
||||
if err != nil {
|
||||
l.log.Warn("could not parse notary event",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// handle the event
|
||||
l.mtx.RLock()
|
||||
handler, ok := l.notaryHandlers[notaryKey]
|
||||
l.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
l.log.Info("notary handlers for parsed notification event were not registered",
|
||||
zap.Any("event", event),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
handler(event)
|
||||
}
|
||||
|
||||
// SetNotificationParser sets the parser of particular contract event.
|
||||
//
|
||||
// Ignores nil and already set parsers.
|
||||
// Ignores the parser if listener is started.
|
||||
func (l listener) SetNotificationParser(p NotificationParserInfo) {
|
||||
func (l listener) SetNotificationParser(pi NotificationParserInfo) {
|
||||
log := l.log.With(
|
||||
zap.String("script hash LE", p.ScriptHash().StringLE()),
|
||||
zap.Stringer("event type", p.getType()),
|
||||
zap.String("script hash LE", pi.ScriptHash().StringLE()),
|
||||
zap.Stringer("event type", pi.getType()),
|
||||
)
|
||||
|
||||
parser := p.parser()
|
||||
parser := pi.parser()
|
||||
if parser == nil {
|
||||
log.Info("ignore nil event parser")
|
||||
return
|
||||
|
@ -310,8 +436,8 @@ func (l listener) SetNotificationParser(p NotificationParserInfo) {
|
|||
}
|
||||
|
||||
// add event parser
|
||||
if _, ok := l.notificationParsers[p.scriptHashWithType]; !ok {
|
||||
l.notificationParsers[p.scriptHashWithType] = p.parser()
|
||||
if _, ok := l.notificationParsers[pi.scriptHashWithType]; !ok {
|
||||
l.notificationParsers[pi.scriptHashWithType] = pi.parser()
|
||||
}
|
||||
|
||||
log.Info("registered new event parser")
|
||||
|
@ -321,13 +447,13 @@ func (l listener) SetNotificationParser(p NotificationParserInfo) {
|
|||
//
|
||||
// Ignores nil handlers.
|
||||
// Ignores handlers of event without parser.
|
||||
func (l listener) RegisterNotificationHandler(p NotificationHandlerInfo) {
|
||||
func (l listener) RegisterNotificationHandler(hi NotificationHandlerInfo) {
|
||||
log := l.log.With(
|
||||
zap.String("script hash LE", p.ScriptHash().StringLE()),
|
||||
zap.Stringer("event type", p.GetType()),
|
||||
zap.String("script hash LE", hi.ScriptHash().StringLE()),
|
||||
zap.Stringer("event type", hi.GetType()),
|
||||
)
|
||||
|
||||
handler := p.Handler()
|
||||
handler := hi.Handler()
|
||||
if handler == nil {
|
||||
log.Warn("ignore nil event handler")
|
||||
return
|
||||
|
@ -335,7 +461,7 @@ func (l listener) RegisterNotificationHandler(p NotificationHandlerInfo) {
|
|||
|
||||
// check if parser was set
|
||||
l.mtx.RLock()
|
||||
_, ok := l.notificationParsers[p.scriptHashWithType]
|
||||
_, ok := l.notificationParsers[hi.scriptHashWithType]
|
||||
l.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
|
@ -345,15 +471,113 @@ func (l listener) RegisterNotificationHandler(p NotificationHandlerInfo) {
|
|||
|
||||
// add event handler
|
||||
l.mtx.Lock()
|
||||
l.notificationHandlers[p.scriptHashWithType] = append(
|
||||
l.notificationHandlers[p.scriptHashWithType],
|
||||
p.Handler(),
|
||||
l.notificationHandlers[hi.scriptHashWithType] = append(
|
||||
l.notificationHandlers[hi.scriptHashWithType],
|
||||
hi.Handler(),
|
||||
)
|
||||
l.mtx.Unlock()
|
||||
|
||||
log.Info("registered new event handler")
|
||||
}
|
||||
|
||||
// EnableNotarySupport enables notary request listening. Passed hash is
|
||||
// notary mainTX signer. In practise, it means that listener will subscribe
|
||||
// for only notary requests that are going to be paid with passed hash.
|
||||
//
|
||||
// Must not be called after Listen or ListenWithError.
|
||||
func (l *listener) EnableNotarySupport(mainTXSigner util.Uint160, alphaKeys client.AlphabetKeys, bc BlockCounter) {
|
||||
l.mtx.Lock()
|
||||
defer l.mtx.Unlock()
|
||||
|
||||
l.listenNotary = true
|
||||
l.notaryMainTXSigner = mainTXSigner
|
||||
l.notaryHandlers = make(map[notaryRequestTypes]Handler)
|
||||
l.notaryParsers = make(map[notaryRequestTypes]NotaryParser)
|
||||
l.notaryEventsPreparator = notaryPreparator(
|
||||
PreparatorPrm{
|
||||
AlphaKeys: alphaKeys,
|
||||
BlockCounter: bc,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// SetNotaryParser sets the parser of particular notary request event.
|
||||
//
|
||||
// Ignores nil and already set parsers.
|
||||
// Ignores the parser if listener is started.
|
||||
func (l listener) SetNotaryParser(pi NotaryParserInfo) {
|
||||
if !l.listenNotary {
|
||||
return
|
||||
}
|
||||
|
||||
log := l.log.With(
|
||||
zap.Stringer("mempool_type", pi.GetMempoolType()),
|
||||
zap.Stringer("script_hash", pi.ScriptHash()),
|
||||
zap.Stringer("notary_type", pi.RequestType()),
|
||||
)
|
||||
|
||||
parser := pi.parser()
|
||||
if parser == nil {
|
||||
log.Info("ignore nil notary event parser")
|
||||
return
|
||||
}
|
||||
|
||||
l.mtx.Lock()
|
||||
defer l.mtx.Unlock()
|
||||
|
||||
// check if the listener was started
|
||||
if l.started {
|
||||
log.Warn("listener has been already started, ignore notary parser")
|
||||
return
|
||||
}
|
||||
|
||||
// add event parser
|
||||
if _, ok := l.notaryParsers[pi.notaryRequestTypes]; !ok {
|
||||
l.notaryParsers[pi.notaryRequestTypes] = pi.parser()
|
||||
}
|
||||
|
||||
log.Info("registered new event parser")
|
||||
}
|
||||
|
||||
// RegisterNotaryHandler registers the handler for particular notification notary request event.
|
||||
//
|
||||
// Ignores nil handlers.
|
||||
// Ignores handlers of event without parser.
|
||||
func (l listener) RegisterNotaryHandler(hi NotaryHandlerInfo) {
|
||||
if !l.listenNotary {
|
||||
return
|
||||
}
|
||||
|
||||
log := l.log.With(
|
||||
zap.Stringer("mempool type", hi.GetMempoolType()),
|
||||
zap.Stringer("script_hash", hi.ScriptHash()),
|
||||
zap.Stringer("notary type", hi.RequestType()),
|
||||
)
|
||||
|
||||
handler := hi.Handler()
|
||||
if handler == nil {
|
||||
log.Warn("ignore nil notary event handler")
|
||||
return
|
||||
}
|
||||
|
||||
// check if parser was set
|
||||
l.mtx.RLock()
|
||||
_, ok := l.notaryParsers[hi.notaryRequestTypes]
|
||||
l.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
log.Warn("ignore handler of notary event w/o parser")
|
||||
return
|
||||
}
|
||||
|
||||
// add notary event handler
|
||||
l.mtx.Lock()
|
||||
l.notaryHandlers[hi.notaryRequestTypes] = hi.Handler()
|
||||
l.mtx.Unlock()
|
||||
|
||||
log.Info("registered new event handler")
|
||||
}
|
||||
|
||||
// Stop closes subscription channel with remote neo node.
|
||||
func (l listener) Stop() {
|
||||
l.subscriber.Close()
|
||||
|
|
41
pkg/morph/event/notary.go
Normal file
41
pkg/morph/event/notary.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
)
|
||||
|
||||
// NotaryType is a notary event enumeration type.
|
||||
type NotaryType string
|
||||
|
||||
// NotaryEvent is an interface that is
|
||||
// provided by Neo:Morph notary event
|
||||
// structures.
|
||||
type NotaryEvent interface {
|
||||
ScriptHash() util.Uint160
|
||||
Type() NotaryType
|
||||
Params() []Op
|
||||
|
||||
Raw() *payload.P2PNotaryRequest
|
||||
}
|
||||
|
||||
// Equal compares two NotaryType values and
|
||||
// returns true if they are equal.
|
||||
func (t NotaryType) Equal(t2 NotaryType) bool {
|
||||
return string(t) == string(t2)
|
||||
}
|
||||
|
||||
// String returns casted to string NotaryType.
|
||||
func (t NotaryType) String() string {
|
||||
return string(t)
|
||||
}
|
||||
|
||||
// NotaryTypeFromBytes converts bytes slice to NotaryType.
|
||||
func NotaryTypeFromBytes(data []byte) NotaryType {
|
||||
return NotaryType(data)
|
||||
}
|
||||
|
||||
// NotaryTypeFromString converts string to NotaryType.
|
||||
func NotaryTypeFromString(str string) NotaryType {
|
||||
return NotaryType(str)
|
||||
}
|
|
@ -3,6 +3,7 @@ package event
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||
)
|
||||
|
||||
|
@ -19,20 +20,31 @@ type NotificationParserInfo struct {
|
|||
p NotificationParser
|
||||
}
|
||||
|
||||
type wrongPrmNumber struct {
|
||||
exp, act int
|
||||
// NotaryPreparator constructs NotaryEvent
|
||||
// from the NotaryRequest event.
|
||||
type NotaryPreparator interface {
|
||||
Prepare(*payload.P2PNotaryRequest) (NotaryEvent, error)
|
||||
}
|
||||
|
||||
// WrongNumberOfParameters returns an error about wrong number of smart contract parameters.
|
||||
func WrongNumberOfParameters(exp, act int) error {
|
||||
return &wrongPrmNumber{
|
||||
exp: exp,
|
||||
act: act,
|
||||
}
|
||||
// NotaryParser is a function that constructs Event
|
||||
// from the NotaryEvent event.
|
||||
type NotaryParser func(NotaryEvent) (Event, error)
|
||||
|
||||
// NotaryParserInfo is a structure that groups
|
||||
// the parameters of particular notary request
|
||||
// event parser.
|
||||
type NotaryParserInfo struct {
|
||||
notaryRequestTypes
|
||||
|
||||
p NotaryParser
|
||||
}
|
||||
|
||||
func (s wrongPrmNumber) Error() string {
|
||||
return fmt.Errorf("wrong parameter count: expected %d, has %d", s.exp, s.act).Error()
|
||||
func (n *NotaryParserInfo) parser() NotaryParser {
|
||||
return n.p
|
||||
}
|
||||
|
||||
func (n *NotaryParserInfo) SetParser(p NotaryParser) {
|
||||
n.p = p
|
||||
}
|
||||
|
||||
// SetParser is an event parser setter.
|
||||
|
@ -52,3 +64,19 @@ func (s *NotificationParserInfo) SetType(v Type) {
|
|||
func (s NotificationParserInfo) getType() Type {
|
||||
return s.typ
|
||||
}
|
||||
|
||||
type wrongPrmNumber struct {
|
||||
exp, act int
|
||||
}
|
||||
|
||||
// WrongNumberOfParameters returns an error about wrong number of smart contract parameters.
|
||||
func WrongNumberOfParameters(exp, act int) error {
|
||||
return &wrongPrmNumber{
|
||||
exp: exp,
|
||||
act: act,
|
||||
}
|
||||
}
|
||||
|
||||
func (s wrongPrmNumber) Error() string {
|
||||
return fmt.Errorf("wrong parameter count: expected %d, has %d", s.exp, s.act).Error()
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
"go.uber.org/zap"
|
||||
|
@ -19,12 +20,46 @@ type scriptHashWithType struct {
|
|||
typeValue
|
||||
}
|
||||
|
||||
type notaryRequestTypes struct {
|
||||
notaryRequestMempoolType
|
||||
notaryRequestType
|
||||
scriptHashValue
|
||||
}
|
||||
|
||||
type notaryRequestMempoolType struct {
|
||||
mempoolTyp mempoolevent.Type
|
||||
}
|
||||
|
||||
type notaryRequestType struct {
|
||||
notaryType NotaryType
|
||||
}
|
||||
|
||||
// GetMempoolType is a notary request mempool type getter.
|
||||
func (n notaryRequestMempoolType) GetMempoolType() mempoolevent.Type {
|
||||
return n.mempoolTyp
|
||||
}
|
||||
|
||||
// SetMempoolType is a notary request mempool type setter.
|
||||
func (n *notaryRequestMempoolType) SetMempoolType(typ mempoolevent.Type) {
|
||||
n.mempoolTyp = typ
|
||||
}
|
||||
|
||||
// RequestType is a notary request type getter.
|
||||
func (n notaryRequestType) RequestType() NotaryType {
|
||||
return n.notaryType
|
||||
}
|
||||
|
||||
// SetRequestType is a notary request type setter.
|
||||
func (n *notaryRequestType) SetRequestType(typ NotaryType) {
|
||||
n.notaryType = typ
|
||||
}
|
||||
|
||||
// SetScriptHash is a script hash setter.
|
||||
func (s *scriptHashValue) SetScriptHash(v util.Uint160) {
|
||||
s.hash = v
|
||||
}
|
||||
|
||||
// ScriptHash is script hash getter.
|
||||
// ScriptHash is a script hash getter.
|
||||
func (s scriptHashValue) ScriptHash() util.Uint160 {
|
||||
return s.hash
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue