From 6848a816f99593012e594e71063cec6686d9a949 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 29 Jan 2021 10:42:40 +0300 Subject: [PATCH] [#355] innerring: Refactor block timer constructors This small refactoring adds `blocktimer.go` file with all timer related function and constructors. This way we can create all timers in one place (at the end of innerring.Server constructor). To do that we had to move timer reset into global server state so it can be accessed by netmap processor. Signed-off-by: Alex Vanin --- pkg/innerring/audit.go | 8 ++++ pkg/innerring/blocktimer.go | 61 +++++++++++++++++++++++++++++ pkg/innerring/innerring.go | 78 ++++++++++++++----------------------- pkg/innerring/state.go | 7 ++++ pkg/innerring/util.go | 20 ---------- 5 files changed, 106 insertions(+), 68 deletions(-) create mode 100644 pkg/innerring/blocktimer.go delete mode 100644 pkg/innerring/util.go diff --git a/pkg/innerring/audit.go b/pkg/innerring/audit.go index 9c26cd407..f32b0fe92 100644 --- a/pkg/innerring/audit.go +++ b/pkg/innerring/audit.go @@ -35,6 +35,8 @@ type auditSettlementDeps struct { balanceClient *balanceClient.Wrapper } +type auditSettlementCalculator audit.Calculator + type containerWrapper containerAPI.Container type nodeInfoWrapper struct { @@ -194,3 +196,9 @@ func (a auditSettlementDeps) Transfer(sender, recipient *owner.ID, amount *big.I log.Debug("transfer transaction for audit was successfully sent") } + +func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) { + (*audit.Calculator)(s).Calculate(&audit.CalculatePrm{ + Epoch: epoch, + }) +} diff --git a/pkg/innerring/blocktimer.go b/pkg/innerring/blocktimer.go new file mode 100644 index 000000000..456ad80d7 --- /dev/null +++ b/pkg/innerring/blocktimer.go @@ -0,0 +1,61 @@ +package innerring + +import ( + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap" + "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" +) + +type ( + epochTimerArgs struct { + nm *netmap.Processor // to handle new epoch tick + + epochDuration uint32 // in blocks + } + + emitTimerArgs struct { + ap *alphabet.Processor // to handle new emission tick + + emitDuration uint32 // in blocks + } +) + +func (s *Server) addBlockTimer(t *timers.BlockTimer) { + s.blockTimers = append(s.blockTimers, t) +} + +func (s *Server) startBlockTimers() error { + for i := range s.blockTimers { + if err := s.blockTimers[i].Reset(); err != nil { + return err + } + } + + return nil +} + +func (s *Server) tickTimers() { + for i := range s.blockTimers { + s.blockTimers[i].Tick() + } +} + +func newEpochTimer(args *epochTimerArgs) *timers.BlockTimer { + epochTimer := timers.NewBlockTimer( + timers.StaticBlockMeter(args.epochDuration), + func() { + args.nm.HandleNewEpochTick(timers.NewEpochTick{}) + }, + ) + + return epochTimer +} + +func newEmissionTimer(args *emitTimerArgs) *timers.BlockTimer { + return timers.NewBlockTimer( + timers.StaticBlockMeter(args.emitDuration), + func() { + args.ap.HandleGasEmission(timers.NewAlphabetEmitTick{}) + }, + ) +} diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index b0d7cba24..fe9a0fd7d 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -21,7 +21,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/morph/client" auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper" - balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager" @@ -54,7 +53,6 @@ type ( innerRingSize atomic.Int32 precision precision.Fixed8Converter auditClient *auditWrapper.ClientWrapper - balanceClient *balanceWrapper.Wrapper // internal variables key *ecdsa.PrivateKey @@ -251,6 +249,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") porPoolSize := cfg.GetInt("audit.por.pool_size") + // create audit processor dependencies auditTaskManager := audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithWorkerPool(auditPool), @@ -284,6 +283,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } + // create settlement processor dependencies auditCalcDeps := &auditSettlementDeps{ log: server.log, cnrSrc: cnrClient, @@ -305,6 +305,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error auditSettlement.WithLogger(server.log), ) + // create settlement processor settlementProcessor := settlement.New( settlement.Prm{ AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), @@ -312,31 +313,23 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error settlement.WithLogger(server.log), ) - var netmapProcessor *netmap.Processor - - server.epochTimer = timers.NewBlockTimer( - timers.StaticBlockMeter(cfg.GetUint32("timers.epoch")), - func() { - netmapProcessor.HandleNewEpochTick(timers.NewEpochTick{}) - }, - ) - - server.addBlockTimer(server.epochTimer) - // create netmap processor - netmapProcessor, err = netmap.New(&netmap.Params{ + netmapProcessor, err := netmap.New(&netmap.Params{ Log: log, PoolSize: cfg.GetInt("workers.netmap"), NetmapContract: server.contracts.netmap, - EpochTimer: (*blockTimerWrapper)(server.epochTimer), + EpochTimer: server, MorphClient: server.morphClient, EpochState: server, ActiveState: server, CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), - HandleAudit: auditProcessor.StartAuditHandler(), - - AuditSettlementsHandler: server.onlyActiveEventHandler(settlementProcessor.HandleAuditEvent), + HandleAudit: server.onlyActiveEventHandler( + auditProcessor.StartAuditHandler(), + ), + AuditSettlementsHandler: server.onlyActiveEventHandler( + settlementProcessor.HandleAuditEvent, + ), }) if err != nil { return nil, err @@ -409,17 +402,8 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } - var alphabetProcessor *alphabet.Processor - - server.addBlockTimer(timers.NewBlockTimer( - timers.StaticBlockMeter(cfg.GetUint32("timers.emit")), - func() { - alphabetProcessor.HandleGasEmission(timers.NewAlphabetEmitTick{}) - }, - )) - // create alphabet processor - alphabetProcessor, err = alphabet.New(&alphabet.Params{ + alphabetProcessor, err := alphabet.New(&alphabet.Params{ Log: log, PoolSize: cfg.GetInt("workers.alphabet"), AlphabetContracts: server.contracts.alphabet, @@ -439,6 +423,22 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error // todo: create vivid id component + // initialize epoch timers + server.epochTimer = newEpochTimer(&epochTimerArgs{ + nm: netmapProcessor, + epochDuration: cfg.GetUint32("timers.epoch"), + }) + + server.addBlockTimer(server.epochTimer) + + // initialize emission timer + emissionTimer := newEmissionTimer(&emitTimerArgs{ + ap: alphabetProcessor, + emitDuration: cfg.GetUint32("timers.emit"), + }) + + server.addBlockTimer(emissionTimer) + return server, nil } @@ -599,26 +599,8 @@ func (s *Server) initConfigFromBlockchain() error { return nil } -func (s *Server) addBlockTimer(t *timers.BlockTimer) { - s.blockTimers = append(s.blockTimers, t) -} - -func (s *Server) startBlockTimers() error { - for i := range s.blockTimers { - if err := s.blockTimers[i].Reset(); err != nil { - return err - } - } - - return nil -} - -func (s *Server) tickTimers() { - for i := range s.blockTimers { - s.blockTimers[i].Tick() - } -} - +// onlyActiveHandler wrapper around event handler that executes it +// only if inner ring node state is active. func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler { return func(ev event.Event) { if s.IsActive() { diff --git a/pkg/innerring/state.go b/pkg/innerring/state.go index d55b413a2..ac220c654 100644 --- a/pkg/innerring/state.go +++ b/pkg/innerring/state.go @@ -83,3 +83,10 @@ func (s *Server) WriteReport(r *audit.Report) error { return s.auditClient.PutAuditResult(res) } + +// ResetEpochTimer resets block timer that produces events to update epoch +// counter in netmap contract. Used to synchronize this even production +// based on block with notification of last epoch. +func (s *Server) ResetEpochTimer() error { + return s.epochTimer.Reset() +} diff --git a/pkg/innerring/util.go b/pkg/innerring/util.go deleted file mode 100644 index 17ea99c64..000000000 --- a/pkg/innerring/util.go +++ /dev/null @@ -1,20 +0,0 @@ -package innerring - -import ( - "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit" - "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" -) - -type blockTimerWrapper timers.BlockTimer - -func (t *blockTimerWrapper) ResetEpochTimer() error { - return (*timers.BlockTimer)(t).Reset() -} - -type auditSettlementCalculator audit.Calculator - -func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) { - (*audit.Calculator)(s).Calculate(&audit.CalculatePrm{ - Epoch: epoch, - }) -}