From 72699b4c26e0ebbee5dcbf4ff05896497fa82c17 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 13 Apr 2021 15:53:58 +0300 Subject: [PATCH] [#476] reputation: Make reputation report async Add handler closure over worker pool in the event package. Add `addNewEpochAsyncNotificationHandler` function that uses that closure. Pass the reputation report handler to worker pool via using that function. Signed-off-by: Pavel Karpy --- cmd/neofs-node/netmap.go | 14 ++++++++++++++ cmd/neofs-node/reputation.go | 16 +++++++++------- pkg/morph/event/utils.go | 21 ++++++++++++++++++++- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index 4caf940fd..c1ff4713f 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -167,10 +167,24 @@ func (c *cfg) localNodeInfoFromNetmap(nm *netmapSDK.Netmap) *netmapSDK.NodeInfo return nil } +// addNewEpochNotificationHandler adds handler that will be executed synchronously func addNewEpochNotificationHandler(c *cfg, h event.Handler) { addNetmapNotificationHandler(c, newEpochNotification, h) } +// addNewEpochAsyncNotificationHandler adds handler that will be executed asynchronously via netmap workerPool +func addNewEpochAsyncNotificationHandler(c *cfg, h event.Handler) { + addNetmapNotificationHandler( + c, + newEpochNotification, + event.WorkerPoolHandler( + c.cfgNetmap.workerPool, + h, + c.log, + ), + ) +} + func goOffline(c *cfg) { err := c.cfgNetmap.wrapper.UpdatePeerState( crypto.MarshalPublicKey(&c.key.PublicKey), diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 714df7c66..24f1b91f6 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -309,15 +309,17 @@ func initReputationService(c *cfg) { LocalTrustTarget: router, }) - addNewEpochNotificationHandler(c, func(ev event.Event) { - var reportPrm trustcontroller.ReportPrm + addNewEpochAsyncNotificationHandler( + c, + func(ev event.Event) { + var reportPrm trustcontroller.ReportPrm - // report collected values from previous epoch - reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1) + // report collected values from previous epoch + reportPrm.SetEpoch(ev.(netmap.NewEpoch).EpochNumber() - 1) - // TODO: implement and use worker pool [neofs-node#440] - go c.cfgReputation.localTrustCtrl.Report(reportPrm) - }) + c.cfgReputation.localTrustCtrl.Report(reportPrm) + }, + ) v2reputationgrpc.RegisterReputationServiceServer(c.cfgGRPC.server, grpcreputation.New( diff --git a/pkg/morph/event/utils.go b/pkg/morph/event/utils.go index 77adcee50..db7aad262 100644 --- a/pkg/morph/event/utils.go +++ b/pkg/morph/event/utils.go @@ -1,6 +1,10 @@ package event -import "github.com/nspcc-dev/neo-go/pkg/util" +import ( + "github.com/nspcc-dev/neo-go/pkg/util" + util2 "github.com/nspcc-dev/neofs-node/pkg/util" + "go.uber.org/zap" +) type scriptHashValue struct { hash util.Uint160 @@ -34,3 +38,18 @@ func (s *typeValue) SetType(v Type) { func (s typeValue) GetType() Type { return s.typ } + +// WorkerPoolHandler sets closure over worker pool w with passed handler h. +func WorkerPoolHandler(w util2.WorkerPool, h Handler, log *zap.Logger) Handler { + return func(e Event) { + err := w.Submit(func() { + h(e) + }) + + if err != nil { + log.Warn("could not Submit handler to worker pool", + zap.String("error", err.Error()), + ) + } + } +}