From d4bd726c2511b3223a5f0097eb3155a01d912ed8 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 28 Jan 2021 22:51:41 +0300 Subject: [PATCH] [#326] ir: Make netmap processor to generate audit settlement events Pass handler of audit settlement event to netmap event processor. Generate AuditEvent in during new epoch processing. Signed-off-by: Leonard Lyubich --- pkg/innerring/innerring.go | 106 ++++++++---------- .../processors/netmap/process_epoch.go | 2 + pkg/innerring/processors/netmap/processor.go | 8 ++ pkg/innerring/util.go | 9 ++ 4 files changed, 65 insertions(+), 60 deletions(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 5b011e22..ef401beb 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -16,13 +16,13 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement" auditSettlement "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/morph/client" auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper" balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" - netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager" util2 "github.com/nspcc-dev/neofs-node/pkg/util" @@ -63,8 +63,6 @@ type ( predefinedValidators []keys.PublicKey workers []func(context.Context) - - auditSettlement *auditSettlement.Calculator } contracts struct { @@ -227,6 +225,21 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } + cnrClient, err := invoke.NewNoFeeContainerClient(server.morphClient, server.contracts.container) + if err != nil { + return nil, err + } + + nmClient, err := invoke.NewNoFeeNetmapClient(server.morphClient, server.contracts.netmap) + if err != nil { + return nil, err + } + + balClient, err := invoke.NewBalanceClient(server.morphClient, server.contracts.balance) + if err != nil { + return nil, err + } + clientCache := newClientCache(&clientCacheParams{ Log: log, Key: server.key, @@ -271,6 +284,34 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } + auditCalcDeps := &auditSettlementDeps{ + log: server.log, + cnrSrc: cnrClient, + auditClient: server.auditClient, + nmSrc: nmClient, + clientCache: clientCache, + balanceClient: balClient, + } + + auditSettlementCalc := auditSettlement.NewCalculator( + &auditSettlement.CalculatorPrm{ + ResultStorage: auditCalcDeps, + ContainerStorage: auditCalcDeps, + PlacementCalculator: auditCalcDeps, + SGStorage: auditCalcDeps, + AccountStorage: auditCalcDeps, + Exchanger: auditCalcDeps, + }, + auditSettlement.WithLogger(server.log), + ) + + settlementProcessor := settlement.New( + settlement.Prm{ + AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), + }, + settlement.WithLogger(server.log), + ) + var netmapProcessor *netmap.Processor server.epochTimer = timers.NewBlockTimer( @@ -294,6 +335,8 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), HandleAudit: auditProcessor.StartAuditHandler(), + + AuditSettlementsHandler: server.onlyActiveEventHandler(settlementProcessor.HandleAuditEvent), }) if err != nil { return nil, err @@ -394,48 +437,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } - cnrClient, err := invoke.NewNoFeeContainerClient(server.morphClient, server.contracts.container) - if err != nil { - return nil, err - } - - nmClient, err := invoke.NewNoFeeNetmapClient(server.morphClient, server.contracts.netmap) - if err != nil { - return nil, err - } - - balClient, err := invoke.NewBalanceClient(server.morphClient, server.contracts.balance) - if err != nil { - return nil, err - } - - auditCalcDeps := &auditSettlementDeps{ - log: server.log, - cnrSrc: cnrClient, - auditClient: server.auditClient, - nmSrc: nmClient, - clientCache: clientCache, - balanceClient: balClient, - } - - server.auditSettlement = auditSettlement.NewCalculator( - &auditSettlement.CalculatorPrm{ - ResultStorage: auditCalcDeps, - ContainerStorage: auditCalcDeps, - PlacementCalculator: auditCalcDeps, - SGStorage: auditCalcDeps, - AccountStorage: auditCalcDeps, - Exchanger: auditCalcDeps, - }, - auditSettlement.WithLogger(server.log), - ) - - server.subscribeNewEpoch(func(e netmapEvent.NewEpoch) { - server.auditSettlement.Calculate(&auditSettlement.CalculatePrm{ - Epoch: e.EpochNumber(), - }) - }) - // todo: create vivid id component return server, nil @@ -618,21 +619,6 @@ func (s *Server) tickTimers() { } } -func (s *Server) subscribeNewEpoch(f func(netmapEvent.NewEpoch)) { - hi := event.HandlerInfo{} - - // TODO: replace and share - const newEpochNotification = "NewEpoch" - - hi.SetType(event.TypeFromString(newEpochNotification)) - hi.SetScriptHash(s.contracts.netmap) - hi.SetHandler(s.onlyActiveEventHandler(func(ev event.Event) { - f(ev.(netmapEvent.NewEpoch)) - })) - - s.morphListener.RegisterHandler(hi) -} - func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler { return func(ev event.Event) { if s.IsActive() { diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index 8d69fd90..64ac439c 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -3,6 +3,7 @@ package netmap import ( "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement" "go.uber.org/zap" ) @@ -27,6 +28,7 @@ func (np *Processor) processNewEpoch(epoch uint64) { np.netmapSnapshot.update(snapshot, epoch) np.handleCleanupTick(netmapCleanupTick{epoch: epoch}) np.handleNewAudit(audit.NewAuditStartEvent(epoch)) + np.handleAuditSettlements(settlement.NewAuditEvent(epoch)) } // Process new epoch tick by invoking new epoch method in network map contract. diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index 1877257f..e9450400 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -41,6 +41,8 @@ type ( netmapSnapshot cleanupTable handleNewAudit event.Handler + + handleAuditSettlements event.Handler } // Params of the processor constructor. @@ -55,6 +57,8 @@ type ( CleanupEnabled bool CleanupThreshold uint64 // in epochs HandleAudit event.Handler + + AuditSettlementsHandler event.Handler } ) @@ -79,6 +83,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/netmap: global state is not set") case p.HandleAudit == nil: return nil, errors.New("ir/netmap: audit handler is not set") + case p.AuditSettlementsHandler == nil: + return nil, errors.New("ir/netmap: audit settlement handler is not set") } p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize)) @@ -98,6 +104,8 @@ func New(p *Params) (*Processor, error) { morphClient: p.MorphClient, netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold), handleNewAudit: p.HandleAudit, + + handleAuditSettlements: p.AuditSettlementsHandler, }, nil } diff --git a/pkg/innerring/util.go b/pkg/innerring/util.go index 028c0f1b..17ea99c6 100644 --- a/pkg/innerring/util.go +++ b/pkg/innerring/util.go @@ -1,6 +1,7 @@ package innerring import ( + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" ) @@ -9,3 +10,11 @@ type blockTimerWrapper timers.BlockTimer func (t *blockTimerWrapper) ResetEpochTimer() error { return (*timers.BlockTimer)(t).Reset() } + +type auditSettlementCalculator audit.Calculator + +func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) { + (*audit.Calculator)(s).Calculate(&audit.CalculatePrm{ + Epoch: epoch, + }) +}