[#476] reputation: Make reputation report async

Add handler closure over worker pool
in the event package.
Add `addNewEpochAsyncNotificationHandler`
function that uses that closure. Pass
the reputation report handler to worker
pool via using that function.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2021-04-13 15:53:58 +03:00 committed by Alex Vanin
parent 7cd4e409eb
commit 72699b4c26
3 changed files with 43 additions and 8 deletions

View file

@ -167,10 +167,24 @@ func (c *cfg) localNodeInfoFromNetmap(nm *netmapSDK.Netmap) *netmapSDK.NodeInfo
return nil return nil
} }
// addNewEpochNotificationHandler adds handler that will be executed synchronously
func addNewEpochNotificationHandler(c *cfg, h event.Handler) { func addNewEpochNotificationHandler(c *cfg, h event.Handler) {
addNetmapNotificationHandler(c, newEpochNotification, h) addNetmapNotificationHandler(c, newEpochNotification, h)
} }
// addNewEpochAsyncNotificationHandler adds handler that will be executed asynchronously via netmap workerPool
func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) {
addNetmapNotificationHandler(
c,
newEpochNotification,
event.WorkerPoolHandler(
c.cfgNetmap.workerPool,
h,
c.log,
),
)
}
func goOffline(c *cfg) { func goOffline(c *cfg) {
err := c.cfgNetmap.wrapper.UpdatePeerState( err := c.cfgNetmap.wrapper.UpdatePeerState(
crypto.MarshalPublicKey(&c.key.PublicKey), crypto.MarshalPublicKey(&c.key.PublicKey),

View file

@ -309,15 +309,17 @@ func initReputationService(c *cfg) {
LocalTrustTarget: router, LocalTrustTarget: router,
}) })
addNewEpochNotificationHandler(c, func(ev event.Event) { addNewEpochAsyncNotificationHandler(
var reportPrm trustcontroller.ReportPrm c,
func(ev event.Event) {
var reportPrm trustcontroller.ReportPrm
// report collected values from previous epoch // report collected values from previous epoch
reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1) reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1)
// TODO: implement and use worker pool [neofs-node#440] c.cfgReputation.localTrustCtrl.Report(reportPrm)
go c.cfgReputation.localTrustCtrl.Report(reportPrm) },
}) )
v2reputationgrpc.RegisterReputationServiceServer(c.cfgGRPC.server, v2reputationgrpc.RegisterReputationServiceServer(c.cfgGRPC.server,
grpcreputation.New( grpcreputation.New(

View file

@ -1,6 +1,10 @@
package event package event
import "github.com/nspcc-dev/neo-go/pkg/util" import (
"github.com/nspcc-dev/neo-go/pkg/util"
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
"go.uber.org/zap"
)
type scriptHashValue struct { type scriptHashValue struct {
hash util.Uint160 hash util.Uint160
@ -34,3 +38,18 @@ func (s *typeValue) SetType(v Type) {
func (s typeValue) GetType() Type { func (s typeValue) GetType() Type {
return s.typ return s.typ
} }
// WorkerPoolHandler sets closure over worker pool w with passed handler h.
func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *zap.Logger) Handler {
return func(e Event) {
err := w.Submit(func() {
h(e)
})
if err != nil {
log.Warn("could not Submit handler to worker pool",
zap.String("error", err.Error()),
)
}
}
}