diff --git a/pkg/innerring/audit.go b/pkg/innerring/audit.go index 9c26cd40..f32b0fe9 100644 --- a/pkg/innerring/audit.go +++ b/pkg/innerring/audit.go @@ -35,6 +35,8 @@ type auditSettlementDeps struct { balanceClient *balanceClient.Wrapper } +type auditSettlementCalculator audit.Calculator + type containerWrapper containerAPI.Container type nodeInfoWrapper struct { @@ -194,3 +196,9 @@ func (a auditSettlementDeps) Transfer(sender, recipient *owner.ID, amount *big.I log.Debug("transfer transaction for audit was successfully sent") } + +func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) { + (*audit.Calculator)(s).Calculate(&audit.CalculatePrm{ + Epoch: epoch, + }) +} diff --git a/pkg/innerring/blocktimer.go b/pkg/innerring/blocktimer.go new file mode 100644 index 00000000..456ad80d --- /dev/null +++ b/pkg/innerring/blocktimer.go @@ -0,0 +1,61 @@ +package innerring + +import ( + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap" + "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" +) + +type ( + epochTimerArgs struct { + nm *netmap.Processor // to handle new epoch tick + + epochDuration uint32 // in blocks + } + + emitTimerArgs struct { + ap *alphabet.Processor // to handle new emission tick + + emitDuration uint32 // in blocks + } +) + +func (s *Server) addBlockTimer(t *timers.BlockTimer) { + s.blockTimers = append(s.blockTimers, t) +} + +func (s *Server) startBlockTimers() error { + for i := range s.blockTimers { + if err := s.blockTimers[i].Reset(); err != nil { + return err + } + } + + return nil +} + +func (s *Server) tickTimers() { + for i := range s.blockTimers { + s.blockTimers[i].Tick() + } +} + +func newEpochTimer(args *epochTimerArgs) *timers.BlockTimer { + epochTimer := timers.NewBlockTimer( + timers.StaticBlockMeter(args.epochDuration), + func() { + args.nm.HandleNewEpochTick(timers.NewEpochTick{}) + }, + ) + + return epochTimer +} + +func newEmissionTimer(args *emitTimerArgs) *timers.BlockTimer { + return timers.NewBlockTimer( + timers.StaticBlockMeter(args.emitDuration), + func() { + args.ap.HandleGasEmission(timers.NewAlphabetEmitTick{}) + }, + ) +} diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index b0d7cba2..fe9a0fd7 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -21,7 +21,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/morph/client" auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper" - balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager" @@ -54,7 +53,6 @@ type ( innerRingSize atomic.Int32 precision precision.Fixed8Converter auditClient *auditWrapper.ClientWrapper - balanceClient *balanceWrapper.Wrapper // internal variables key *ecdsa.PrivateKey @@ -251,6 +249,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") porPoolSize := cfg.GetInt("audit.por.pool_size") + // create audit processor dependencies auditTaskManager := audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithWorkerPool(auditPool), @@ -284,6 +283,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } + // create settlement processor dependencies auditCalcDeps := &auditSettlementDeps{ log: server.log, cnrSrc: cnrClient, @@ -305,6 +305,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error auditSettlement.WithLogger(server.log), ) + // create settlement processor settlementProcessor := settlement.New( settlement.Prm{ AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), @@ -312,31 +313,23 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error settlement.WithLogger(server.log), ) - var netmapProcessor *netmap.Processor - - server.epochTimer = timers.NewBlockTimer( - timers.StaticBlockMeter(cfg.GetUint32("timers.epoch")), - func() { - netmapProcessor.HandleNewEpochTick(timers.NewEpochTick{}) - }, - ) - - server.addBlockTimer(server.epochTimer) - // create netmap processor - netmapProcessor, err = netmap.New(&netmap.Params{ + netmapProcessor, err := netmap.New(&netmap.Params{ Log: log, PoolSize: cfg.GetInt("workers.netmap"), NetmapContract: server.contracts.netmap, - EpochTimer: (*blockTimerWrapper)(server.epochTimer), + EpochTimer: server, MorphClient: server.morphClient, EpochState: server, ActiveState: server, CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), - HandleAudit: auditProcessor.StartAuditHandler(), - - AuditSettlementsHandler: server.onlyActiveEventHandler(settlementProcessor.HandleAuditEvent), + HandleAudit: server.onlyActiveEventHandler( + auditProcessor.StartAuditHandler(), + ), + AuditSettlementsHandler: server.onlyActiveEventHandler( + settlementProcessor.HandleAuditEvent, + ), }) if err != nil { return nil, err @@ -409,17 +402,8 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } - var alphabetProcessor *alphabet.Processor - - server.addBlockTimer(timers.NewBlockTimer( - timers.StaticBlockMeter(cfg.GetUint32("timers.emit")), - func() { - alphabetProcessor.HandleGasEmission(timers.NewAlphabetEmitTick{}) - }, - )) - // create alphabet processor - alphabetProcessor, err = alphabet.New(&alphabet.Params{ + alphabetProcessor, err := alphabet.New(&alphabet.Params{ Log: log, PoolSize: cfg.GetInt("workers.alphabet"), AlphabetContracts: server.contracts.alphabet, @@ -439,6 +423,22 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error // todo: create vivid id component + // initialize epoch timers + server.epochTimer = newEpochTimer(&epochTimerArgs{ + nm: netmapProcessor, + epochDuration: cfg.GetUint32("timers.epoch"), + }) + + server.addBlockTimer(server.epochTimer) + + // initialize emission timer + emissionTimer := newEmissionTimer(&emitTimerArgs{ + ap: alphabetProcessor, + emitDuration: cfg.GetUint32("timers.emit"), + }) + + server.addBlockTimer(emissionTimer) + return server, nil } @@ -599,26 +599,8 @@ func (s *Server) initConfigFromBlockchain() error { return nil } -func (s *Server) addBlockTimer(t *timers.BlockTimer) { - s.blockTimers = append(s.blockTimers, t) -} - -func (s *Server) startBlockTimers() error { - for i := range s.blockTimers { - if err := s.blockTimers[i].Reset(); err != nil { - return err - } - } - - return nil -} - -func (s *Server) tickTimers() { - for i := range s.blockTimers { - s.blockTimers[i].Tick() - } -} - +// onlyActiveHandler wrapper around event handler that executes it +// only if inner ring node state is active. func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler { return func(ev event.Event) { if s.IsActive() { diff --git a/pkg/innerring/state.go b/pkg/innerring/state.go index d55b413a..ac220c65 100644 --- a/pkg/innerring/state.go +++ b/pkg/innerring/state.go @@ -83,3 +83,10 @@ func (s *Server) WriteReport(r *audit.Report) error { return s.auditClient.PutAuditResult(res) } + +// ResetEpochTimer resets block timer that produces events to update epoch +// counter in netmap contract. Used to synchronize this even production +// based on block with notification of last epoch. +func (s *Server) ResetEpochTimer() error { + return s.epochTimer.Reset() +} diff --git a/pkg/innerring/util.go b/pkg/innerring/util.go deleted file mode 100644 index 17ea99c6..00000000 --- a/pkg/innerring/util.go +++ /dev/null @@ -1,20 +0,0 @@ -package innerring - -import ( - "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit" - "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" -) - -type blockTimerWrapper timers.BlockTimer - -func (t *blockTimerWrapper) ResetEpochTimer() error { - return (*timers.BlockTimer)(t).Reset() -} - -type auditSettlementCalculator audit.Calculator - -func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) { - (*audit.Calculator)(s).Calculate(&audit.CalculatePrm{ - Epoch: epoch, - }) -}