forked from TrueCloudLab/frostfs-node
[#812] pkg/innerring: Support notary notifications in reputation processor
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
a7f6a3df78
commit
dc26a09ec3
4 changed files with 65 additions and 24 deletions
|
@ -777,6 +777,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
|
||||||
NetMapSource: server.netmapClient,
|
NetMapSource: server.netmapClient,
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
|
NotaryDisabled: server.sideNotaryConfig.disabled,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -19,13 +19,7 @@ func (rp *Processor) handlePutReputation(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := rp.pool.Submit(func() {
|
err := rp.pool.Submit(func() { rp.processPut(&put) })
|
||||||
rp.processPut(
|
|
||||||
put.Epoch(),
|
|
||||||
peerID,
|
|
||||||
put.Value(),
|
|
||||||
)
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
rp.log.Warn("reputation worker pool drained",
|
rp.log.Warn("reputation worker pool drained",
|
||||||
|
|
|
@ -8,18 +8,23 @@ import (
|
||||||
|
|
||||||
apireputation "github.com/nspcc-dev/neofs-api-go/pkg/reputation"
|
apireputation "github.com/nspcc-dev/neofs-api-go/pkg/reputation"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper"
|
||||||
|
reputationEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/reputation"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errWrongManager = errors.New("got manager that is incorrect for peer")
|
var errWrongManager = errors.New("got manager that is incorrect for peer")
|
||||||
|
|
||||||
func (rp *Processor) processPut(epoch uint64, id apireputation.PeerID, value apireputation.GlobalTrust) {
|
func (rp *Processor) processPut(e *reputationEvent.Put) {
|
||||||
if !rp.alphabetState.IsAlphabet() {
|
if !rp.alphabetState.IsAlphabet() {
|
||||||
rp.log.Info("non alphabet mode, ignore reputation put notification")
|
rp.log.Info("non alphabet mode, ignore reputation put notification")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
epoch := e.Epoch()
|
||||||
|
id := e.PeerID()
|
||||||
|
value := e.Value()
|
||||||
|
|
||||||
// check if epoch is valid
|
// check if epoch is valid
|
||||||
currentEpoch := rp.epochState.EpochCounter()
|
currentEpoch := rp.epochState.EpochCounter()
|
||||||
if epoch >= currentEpoch {
|
if epoch >= currentEpoch {
|
||||||
|
@ -49,18 +54,7 @@ func (rp *Processor) processPut(epoch uint64, id apireputation.PeerID, value api
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
args := wrapper.PutArgs{}
|
rp.approvePutReputation(e)
|
||||||
args.SetEpoch(epoch)
|
|
||||||
args.SetPeerID(id)
|
|
||||||
args.SetValue(value)
|
|
||||||
|
|
||||||
err := rp.reputationWrp.Put(args)
|
|
||||||
if err != nil {
|
|
||||||
// FIXME: do not use `ToV2` method outside neofs-api-go library
|
|
||||||
rp.log.Warn("can't send approval tx for reputation value",
|
|
||||||
zap.String("peer_id", hex.EncodeToString(id.ToV2().GetPublicKey())),
|
|
||||||
zap.String("error", err.Error()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rp *Processor) checkManagers(e uint64, mng apireputation.PeerID, peer apireputation.PeerID) error {
|
func (rp *Processor) checkManagers(e uint64, mng apireputation.PeerID, peer apireputation.PeerID) error {
|
||||||
|
@ -78,3 +72,28 @@ func (rp *Processor) checkManagers(e uint64, mng apireputation.PeerID, peer apir
|
||||||
|
|
||||||
return errWrongManager
|
return errWrongManager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rp *Processor) approvePutReputation(e *reputationEvent.Put) {
|
||||||
|
var (
|
||||||
|
id = e.PeerID()
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
if nr := e.NotaryRequest(); nr != nil {
|
||||||
|
// put event was received via Notary service
|
||||||
|
err = rp.reputationWrp.Morph().NotarySignAndInvokeTX(nr.MainTransaction)
|
||||||
|
} else {
|
||||||
|
args := wrapper.PutArgs{}
|
||||||
|
args.SetEpoch(e.Epoch())
|
||||||
|
args.SetPeerID(id)
|
||||||
|
args.SetValue(e.Value())
|
||||||
|
|
||||||
|
err = rp.reputationWrp.Put(args)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// FIXME: do not use `ToV2` method outside neofs-api-go library
|
||||||
|
rp.log.Warn("can't send approval tx for reputation value",
|
||||||
|
zap.String("peer_id", hex.EncodeToString(id.ToV2().GetPublicKey())),
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
reputationWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper"
|
reputationWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
|
@ -37,6 +38,8 @@ type (
|
||||||
reputationWrp *reputationWrapper.ClientWrapper
|
reputationWrp *reputationWrapper.ClientWrapper
|
||||||
|
|
||||||
mngBuilder common.ManagerBuilder
|
mngBuilder common.ManagerBuilder
|
||||||
|
|
||||||
|
notaryDisabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Params of the processor constructor.
|
// Params of the processor constructor.
|
||||||
|
@ -48,6 +51,7 @@ type (
|
||||||
AlphabetState AlphabetState
|
AlphabetState AlphabetState
|
||||||
ReputationWrapper *reputationWrapper.ClientWrapper
|
ReputationWrapper *reputationWrapper.ClientWrapper
|
||||||
ManagerBuilder common.ManagerBuilder
|
ManagerBuilder common.ManagerBuilder
|
||||||
|
NotaryDisabled bool
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -85,11 +89,16 @@ func New(p *Params) (*Processor, error) {
|
||||||
alphabetState: p.AlphabetState,
|
alphabetState: p.AlphabetState,
|
||||||
reputationWrp: p.ReputationWrapper,
|
reputationWrp: p.ReputationWrapper,
|
||||||
mngBuilder: p.ManagerBuilder,
|
mngBuilder: p.ManagerBuilder,
|
||||||
|
notaryDisabled: p.NotaryDisabled,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenerNotificationParsers for the 'event.Listener' event producer.
|
// ListenerNotificationParsers for the 'event.Listener' event producer.
|
||||||
func (rp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
|
func (rp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
|
||||||
|
if !rp.notaryDisabled {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var parsers []event.NotificationParserInfo
|
var parsers []event.NotificationParserInfo
|
||||||
|
|
||||||
// put reputation event
|
// put reputation event
|
||||||
|
@ -104,6 +113,10 @@ func (rp *Processor) ListenerNotificationParsers() []event.NotificationParserInf
|
||||||
|
|
||||||
// ListenerNotificationHandlers for the 'event.Listener' event producer.
|
// ListenerNotificationHandlers for the 'event.Listener' event producer.
|
||||||
func (rp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
|
func (rp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
|
||||||
|
if !rp.notaryDisabled {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var handlers []event.NotificationHandlerInfo
|
var handlers []event.NotificationHandlerInfo
|
||||||
|
|
||||||
// put reputation handler
|
// put reputation handler
|
||||||
|
@ -116,14 +129,28 @@ func (rp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerI
|
||||||
return handlers
|
return handlers
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenerNotaryParsers for the 'event.Listener' event producer.
|
// ListenerNotaryParsers for the 'event.Listener' notary event producer.
|
||||||
func (rp *Processor) ListenerNotaryParsers() []event.NotaryParserInfo {
|
func (rp *Processor) ListenerNotaryParsers() []event.NotaryParserInfo {
|
||||||
return nil
|
var p event.NotaryParserInfo
|
||||||
|
|
||||||
|
p.SetMempoolType(mempoolevent.TransactionAdded)
|
||||||
|
p.SetRequestType(reputationEvent.PutNotaryEvent)
|
||||||
|
p.SetScriptHash(rp.reputationContract)
|
||||||
|
p.SetParser(reputationEvent.ParsePutNotary)
|
||||||
|
|
||||||
|
return []event.NotaryParserInfo{p}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListenerNotaryHandlers for the 'event.Listener' event producer.
|
// ListenerNotaryHandlers for the 'event.Listener' notary event producer.
|
||||||
func (rp *Processor) ListenerNotaryHandlers() []event.NotaryHandlerInfo {
|
func (rp *Processor) ListenerNotaryHandlers() []event.NotaryHandlerInfo {
|
||||||
return nil
|
var h event.NotaryHandlerInfo
|
||||||
|
|
||||||
|
h.SetMempoolType(mempoolevent.TransactionAdded)
|
||||||
|
h.SetRequestType(reputationEvent.PutNotaryEvent)
|
||||||
|
h.SetScriptHash(rp.reputationContract)
|
||||||
|
h.SetHandler(rp.handlePutReputation)
|
||||||
|
|
||||||
|
return []event.NotaryHandlerInfo{h}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TimersHandlers for the 'Timers' event producer.
|
// TimersHandlers for the 'Timers' event producer.
|
||||||
|
|
Loading…
Reference in a new issue