From 09e4479d447237865efef081421dfab3930567c5 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 31 Mar 2021 18:38:01 +0300 Subject: [PATCH] [#452] innerring: Add reputation processor Signed-off-by: Alex Vanin --- .../processors/reputation/handlers.go | 31 +++++ .../processors/reputation/process_put.go | 30 +++++ .../processors/reputation/processor.go | 113 ++++++++++++++++++ 3 files changed, 174 insertions(+) create mode 100644 pkg/innerring/processors/reputation/handlers.go create mode 100644 pkg/innerring/processors/reputation/process_put.go create mode 100644 pkg/innerring/processors/reputation/processor.go diff --git a/pkg/innerring/processors/reputation/handlers.go b/pkg/innerring/processors/reputation/handlers.go new file mode 100644 index 000000000..28b541254 --- /dev/null +++ b/pkg/innerring/processors/reputation/handlers.go @@ -0,0 +1,31 @@ +package reputation + +import ( + "encoding/hex" + + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + reputationEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/reputation" + "go.uber.org/zap" +) + +func (rp *Processor) handlePutReputation(ev event.Event) { + put := ev.(reputationEvent.Put) + rp.log.Info("notification", + zap.String("type", "reputation put"), + zap.String("peer_id", hex.EncodeToString(put.PeerID().Bytes()))) + + // send event to the worker pool + + err := rp.pool.Submit(func() { + rp.processPut( + put.Epoch(), + put.PeerID(), + put.Value(), + ) + }) + if err != nil { + // there system can be moved into controlled degradation stage + rp.log.Warn("reputation worker pool drained", + zap.Int("capacity", rp.pool.Cap())) + } +} diff --git a/pkg/innerring/processors/reputation/process_put.go b/pkg/innerring/processors/reputation/process_put.go new file mode 100644 index 000000000..64426d0e0 --- /dev/null +++ b/pkg/innerring/processors/reputation/process_put.go @@ -0,0 +1,30 @@ +package reputation + +import ( + "encoding/hex" + + "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + "go.uber.org/zap" +) + +func (rp *Processor) processPut(epoch uint64, id reputation.PeerID, value []byte) { + if !rp.alphabetState.IsAlphabet() { + rp.log.Info("non alphabet mode, ignore reputation put notification") + return + } + + // todo: do sanity checks of value and epoch + + args := wrapper.PutArgs{} + args.SetEpoch(epoch) + args.SetPeerID(id) + args.SetValue(value) + + err := rp.reputationWrp.PutViaNotary(args) + if err != nil { + rp.log.Warn("can't send approval tx for reputation value", + zap.String("peer_id", hex.EncodeToString(id.Bytes())), + zap.String("error", err.Error())) + } +} diff --git a/pkg/innerring/processors/reputation/processor.go b/pkg/innerring/processors/reputation/processor.go new file mode 100644 index 000000000..561b221c7 --- /dev/null +++ b/pkg/innerring/processors/reputation/processor.go @@ -0,0 +1,113 @@ +package reputation + +import ( + "github.com/nspcc-dev/neo-go/pkg/util" + reputationWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + reputationEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/reputation" + "github.com/panjf2000/ants/v2" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type ( + // EpochState is a callback interface for inner ring global state. + EpochState interface { + EpochCounter() uint64 + } + + // AlphabetState is a callback interface for inner ring global state. + AlphabetState interface { + IsAlphabet() bool + } + + // Processor of events produced by reputation contract. + Processor struct { + log *zap.Logger + pool *ants.Pool + + reputationContract util.Uint160 + + epochState EpochState + alphabetState AlphabetState + + reputationWrp *reputationWrapper.ClientWrapper + } + + // Params of the processor constructor. + Params struct { + Log *zap.Logger + PoolSize int + ReputationContract util.Uint160 + EpochState EpochState + AlphabetState AlphabetState + ReputationWrapper *reputationWrapper.ClientWrapper + } +) + +const ( + putReputationNotification = "reputationPut" +) + +// New creates reputation contract processor instance. +func New(p *Params) (*Processor, error) { + switch { + case p.Log == nil: + return nil, errors.New("ir/reputation: logger is not set") + case p.EpochState == nil: + return nil, errors.New("ir/reputation: global state is not set") + case p.AlphabetState == nil: + return nil, errors.New("ir/reputation: global state is not set") + case p.ReputationWrapper == nil: + return nil, errors.New("ir/reputation: reputation contract wrapper is not set") + } + + p.Log.Debug("reputation worker pool", zap.Int("size", p.PoolSize)) + + pool, err := ants.NewPool(p.PoolSize, ants.WithNonblocking(true)) + if err != nil { + return nil, errors.Wrap(err, "ir/reputation: can't create worker pool") + } + + return &Processor{ + log: p.Log, + pool: pool, + reputationContract: p.ReputationContract, + epochState: p.EpochState, + alphabetState: p.AlphabetState, + reputationWrp: p.ReputationWrapper, + }, nil +} + +// ListenerParsers for the 'event.Listener' event producer. +func (rp *Processor) ListenerParsers() []event.ParserInfo { + var parsers []event.ParserInfo + + // put reputation event + put := event.ParserInfo{} + put.SetType(putReputationNotification) + put.SetScriptHash(rp.reputationContract) + put.SetParser(reputationEvent.ParsePut) + parsers = append(parsers, put) + + return parsers +} + +// ListenerHandlers for the 'event.Listener' event producer. +func (rp *Processor) ListenerHandlers() []event.HandlerInfo { + var handlers []event.HandlerInfo + + // put reputation handler + put := event.HandlerInfo{} + put.SetType(putReputationNotification) + put.SetScriptHash(rp.reputationContract) + put.SetHandler(rp.handlePutReputation) + handlers = append(handlers, put) + + return handlers +} + +// TimersHandlers for the 'Timers' event producer. +func (rp *Processor) TimersHandlers() []event.HandlerInfo { + return nil +}