From dc26a09ec3a251469cc6e7eee92c75c42a8682a9 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 8 Sep 2021 18:20:02 +0300 Subject: [PATCH] [#812] pkg/innerring: Support notary notifications in reputation processor Signed-off-by: Alex Vanin --- pkg/innerring/innerring.go | 1 + .../processors/reputation/handlers.go | 8 +--- .../processors/reputation/process_put.go | 45 +++++++++++++------ .../processors/reputation/processor.go | 35 +++++++++++++-- 4 files changed, 65 insertions(+), 24 deletions(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 060d694a6..7ee8f4356 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -777,6 +777,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error NetMapSource: server.netmapClient, }, ), + NotaryDisabled: server.sideNotaryConfig.disabled, }) if err != nil { return nil, err diff --git a/pkg/innerring/processors/reputation/handlers.go b/pkg/innerring/processors/reputation/handlers.go index 26cfa4712..728176073 100644 --- a/pkg/innerring/processors/reputation/handlers.go +++ b/pkg/innerring/processors/reputation/handlers.go @@ -19,13 +19,7 @@ func (rp *Processor) handlePutReputation(ev event.Event) { // send event to the worker pool - err := rp.pool.Submit(func() { - rp.processPut( - put.Epoch(), - peerID, - put.Value(), - ) - }) + err := rp.pool.Submit(func() { rp.processPut(&put) }) if err != nil { // there system can be moved into controlled degradation stage rp.log.Warn("reputation worker pool drained", diff --git a/pkg/innerring/processors/reputation/process_put.go b/pkg/innerring/processors/reputation/process_put.go index 6a0fd28e7..576cbdab1 100644 --- a/pkg/innerring/processors/reputation/process_put.go +++ b/pkg/innerring/processors/reputation/process_put.go @@ -8,18 +8,23 @@ import ( apireputation "github.com/nspcc-dev/neofs-api-go/pkg/reputation" "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper" + reputationEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/reputation" "github.com/nspcc-dev/neofs-node/pkg/services/reputation" "go.uber.org/zap" ) var errWrongManager = errors.New("got manager that is incorrect for peer") -func (rp *Processor) processPut(epoch uint64, id apireputation.PeerID, value apireputation.GlobalTrust) { +func (rp *Processor) processPut(e *reputationEvent.Put) { if !rp.alphabetState.IsAlphabet() { rp.log.Info("non alphabet mode, ignore reputation put notification") return } + epoch := e.Epoch() + id := e.PeerID() + value := e.Value() + // check if epoch is valid currentEpoch := rp.epochState.EpochCounter() if epoch >= currentEpoch { @@ -49,18 +54,7 @@ func (rp *Processor) processPut(epoch uint64, id apireputation.PeerID, value api return } - args := wrapper.PutArgs{} - args.SetEpoch(epoch) - args.SetPeerID(id) - args.SetValue(value) - - err := rp.reputationWrp.Put(args) - if err != nil { - // FIXME: do not use `ToV2` method outside neofs-api-go library - rp.log.Warn("can't send approval tx for reputation value", - zap.String("peer_id", hex.EncodeToString(id.ToV2().GetPublicKey())), - zap.String("error", err.Error())) - } + rp.approvePutReputation(e) } func (rp *Processor) checkManagers(e uint64, mng apireputation.PeerID, peer apireputation.PeerID) error { @@ -78,3 +72,28 @@ func (rp *Processor) checkManagers(e uint64, mng apireputation.PeerID, peer apir return errWrongManager } + +func (rp *Processor) approvePutReputation(e *reputationEvent.Put) { + var ( + id = e.PeerID() + err error + ) + + if nr := e.NotaryRequest(); nr != nil { + // put event was received via Notary service + err = rp.reputationWrp.Morph().NotarySignAndInvokeTX(nr.MainTransaction) + } else { + args := wrapper.PutArgs{} + args.SetEpoch(e.Epoch()) + args.SetPeerID(id) + args.SetValue(e.Value()) + + err = rp.reputationWrp.Put(args) + } + if err != nil { + // FIXME: do not use `ToV2` method outside neofs-api-go library + rp.log.Warn("can't send approval tx for reputation value", + zap.String("peer_id", hex.EncodeToString(id.ToV2().GetPublicKey())), + zap.String("error", err.Error())) + } +} diff --git a/pkg/innerring/processors/reputation/processor.go b/pkg/innerring/processors/reputation/processor.go index 25ac2162e..c312bb598 100644 --- a/pkg/innerring/processors/reputation/processor.go +++ b/pkg/innerring/processors/reputation/processor.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/util" reputationWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" @@ -37,6 +38,8 @@ type ( reputationWrp *reputationWrapper.ClientWrapper mngBuilder common.ManagerBuilder + + notaryDisabled bool } // Params of the processor constructor. @@ -48,6 +51,7 @@ type ( AlphabetState AlphabetState ReputationWrapper *reputationWrapper.ClientWrapper ManagerBuilder common.ManagerBuilder + NotaryDisabled bool } ) @@ -85,11 +89,16 @@ func New(p *Params) (*Processor, error) { alphabetState: p.AlphabetState, reputationWrp: p.ReputationWrapper, mngBuilder: p.ManagerBuilder, + notaryDisabled: p.NotaryDisabled, }, nil } // ListenerNotificationParsers for the 'event.Listener' event producer. func (rp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo { + if !rp.notaryDisabled { + return nil + } + var parsers []event.NotificationParserInfo // put reputation event @@ -104,6 +113,10 @@ func (rp *Processor) ListenerNotificationParsers() []event.NotificationParserInf // ListenerNotificationHandlers for the 'event.Listener' event producer. func (rp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo { + if !rp.notaryDisabled { + return nil + } + var handlers []event.NotificationHandlerInfo // put reputation handler @@ -116,14 +129,28 @@ func (rp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerI return handlers } -// ListenerNotaryParsers for the 'event.Listener' event producer. +// ListenerNotaryParsers for the 'event.Listener' notary event producer. func (rp *Processor) ListenerNotaryParsers() []event.NotaryParserInfo { - return nil + var p event.NotaryParserInfo + + p.SetMempoolType(mempoolevent.TransactionAdded) + p.SetRequestType(reputationEvent.PutNotaryEvent) + p.SetScriptHash(rp.reputationContract) + p.SetParser(reputationEvent.ParsePutNotary) + + return []event.NotaryParserInfo{p} } -// ListenerNotaryHandlers for the 'event.Listener' event producer. +// ListenerNotaryHandlers for the 'event.Listener' notary event producer. func (rp *Processor) ListenerNotaryHandlers() []event.NotaryHandlerInfo { - return nil + var h event.NotaryHandlerInfo + + h.SetMempoolType(mempoolevent.TransactionAdded) + h.SetRequestType(reputationEvent.PutNotaryEvent) + h.SetScriptHash(rp.reputationContract) + h.SetHandler(rp.handlePutReputation) + + return []event.NotaryHandlerInfo{h} } // TimersHandlers for the 'Timers' event producer.