[#355] innerring: Refactor block timer constructors

This small refactoring adds `blocktimer.go` file with
all timer related function and constructors. This way
we can create all timers in one place (at the end of
innerring.Server constructor).

To do that we had to move timer reset into global
server state so it can be accessed by netmap
processor.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-01-29 10:42:40 +03:00 committed by Alex Vanin
parent 402192c8c4
commit 6848a816f9
5 changed files with 106 additions and 68 deletions

View file

@ -35,6 +35,8 @@ type auditSettlementDeps struct {
balanceClient *balanceClient.Wrapper balanceClient *balanceClient.Wrapper
} }
type auditSettlementCalculator audit.Calculator
type containerWrapper containerAPI.Container type containerWrapper containerAPI.Container
type nodeInfoWrapper struct { type nodeInfoWrapper struct {
@ -194,3 +196,9 @@ func (a auditSettlementDeps) Transfer(sender, recipient *owner.ID, amount *big.I
log.Debug("transfer transaction for audit was successfully sent") log.Debug("transfer transaction for audit was successfully sent")
} }
func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) {
(*audit.Calculator)(s).Calculate(&audit.CalculatePrm{
Epoch: epoch,
})
}

View file

@ -0,0 +1,61 @@
package innerring
import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap"
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
)
type (
epochTimerArgs struct {
nm *netmap.Processor // to handle new epoch tick
epochDuration uint32 // in blocks
}
emitTimerArgs struct {
ap *alphabet.Processor // to handle new emission tick
emitDuration uint32 // in blocks
}
)
func (s *Server) addBlockTimer(t *timers.BlockTimer) {
s.blockTimers = append(s.blockTimers, t)
}
func (s *Server) startBlockTimers() error {
for i := range s.blockTimers {
if err := s.blockTimers[i].Reset(); err != nil {
return err
}
}
return nil
}
func (s *Server) tickTimers() {
for i := range s.blockTimers {
s.blockTimers[i].Tick()
}
}
func newEpochTimer(args *epochTimerArgs) *timers.BlockTimer {
epochTimer := timers.NewBlockTimer(
timers.StaticBlockMeter(args.epochDuration),
func() {
args.nm.HandleNewEpochTick(timers.NewEpochTick{})
},
)
return epochTimer
}
func newEmissionTimer(args *emitTimerArgs) *timers.BlockTimer {
return timers.NewBlockTimer(
timers.StaticBlockMeter(args.emitDuration),
func() {
args.ap.HandleGasEmission(timers.NewAlphabetEmitTick{})
},
)
}

View file

@ -21,7 +21,6 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
"github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client"
auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper" auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper"
balanceWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber" "github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager" audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager"
@ -54,7 +53,6 @@ type (
innerRingSize atomic.Int32 innerRingSize atomic.Int32
precision precision.Fixed8Converter precision precision.Fixed8Converter
auditClient *auditWrapper.ClientWrapper auditClient *auditWrapper.ClientWrapper
balanceClient *balanceWrapper.Wrapper
// internal variables // internal variables
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
@ -251,6 +249,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
porPoolSize := cfg.GetInt("audit.por.pool_size") porPoolSize := cfg.GetInt("audit.por.pool_size")
// create audit processor dependencies
auditTaskManager := audittask.New( auditTaskManager := audittask.New(
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
audittask.WithWorkerPool(auditPool), audittask.WithWorkerPool(auditPool),
@ -284,6 +283,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
return nil, err return nil, err
} }
// create settlement processor dependencies
auditCalcDeps := &auditSettlementDeps{ auditCalcDeps := &auditSettlementDeps{
log: server.log, log: server.log,
cnrSrc: cnrClient, cnrSrc: cnrClient,
@ -305,6 +305,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
auditSettlement.WithLogger(server.log), auditSettlement.WithLogger(server.log),
) )
// create settlement processor
settlementProcessor := settlement.New( settlementProcessor := settlement.New(
settlement.Prm{ settlement.Prm{
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
@ -312,31 +313,23 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
settlement.WithLogger(server.log), settlement.WithLogger(server.log),
) )
var netmapProcessor *netmap.Processor
server.epochTimer = timers.NewBlockTimer(
timers.StaticBlockMeter(cfg.GetUint32("timers.epoch")),
func() {
netmapProcessor.HandleNewEpochTick(timers.NewEpochTick{})
},
)
server.addBlockTimer(server.epochTimer)
// create netmap processor // create netmap processor
netmapProcessor, err = netmap.New(&netmap.Params{ netmapProcessor, err := netmap.New(&netmap.Params{
Log: log, Log: log,
PoolSize: cfg.GetInt("workers.netmap"), PoolSize: cfg.GetInt("workers.netmap"),
NetmapContract: server.contracts.netmap, NetmapContract: server.contracts.netmap,
EpochTimer: (*blockTimerWrapper)(server.epochTimer), EpochTimer: server,
MorphClient: server.morphClient, MorphClient: server.morphClient,
EpochState: server, EpochState: server,
ActiveState: server, ActiveState: server,
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
HandleAudit: auditProcessor.StartAuditHandler(), HandleAudit: server.onlyActiveEventHandler(
auditProcessor.StartAuditHandler(),
AuditSettlementsHandler: server.onlyActiveEventHandler(settlementProcessor.HandleAuditEvent), ),
AuditSettlementsHandler: server.onlyActiveEventHandler(
settlementProcessor.HandleAuditEvent,
),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -409,17 +402,8 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
return nil, err return nil, err
} }
var alphabetProcessor *alphabet.Processor
server.addBlockTimer(timers.NewBlockTimer(
timers.StaticBlockMeter(cfg.GetUint32("timers.emit")),
func() {
alphabetProcessor.HandleGasEmission(timers.NewAlphabetEmitTick{})
},
))
// create alphabet processor // create alphabet processor
alphabetProcessor, err = alphabet.New(&alphabet.Params{ alphabetProcessor, err := alphabet.New(&alphabet.Params{
Log: log, Log: log,
PoolSize: cfg.GetInt("workers.alphabet"), PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: server.contracts.alphabet, AlphabetContracts: server.contracts.alphabet,
@ -439,6 +423,22 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
// todo: create vivid id component // todo: create vivid id component
// initialize epoch timers
server.epochTimer = newEpochTimer(&epochTimerArgs{
nm: netmapProcessor,
epochDuration: cfg.GetUint32("timers.epoch"),
})
server.addBlockTimer(server.epochTimer)
// initialize emission timer
emissionTimer := newEmissionTimer(&emitTimerArgs{
ap: alphabetProcessor,
emitDuration: cfg.GetUint32("timers.emit"),
})
server.addBlockTimer(emissionTimer)
return server, nil return server, nil
} }
@ -599,26 +599,8 @@ func (s *Server) initConfigFromBlockchain() error {
return nil return nil
} }
func (s *Server) addBlockTimer(t *timers.BlockTimer) { // onlyActiveHandler wrapper around event handler that executes it
s.blockTimers = append(s.blockTimers, t) // only if inner ring node state is active.
}
func (s *Server) startBlockTimers() error {
for i := range s.blockTimers {
if err := s.blockTimers[i].Reset(); err != nil {
return err
}
}
return nil
}
func (s *Server) tickTimers() {
for i := range s.blockTimers {
s.blockTimers[i].Tick()
}
}
func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler { func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) { return func(ev event.Event) {
if s.IsActive() { if s.IsActive() {

View file

@ -83,3 +83,10 @@ func (s *Server) WriteReport(r *audit.Report) error {
return s.auditClient.PutAuditResult(res) return s.auditClient.PutAuditResult(res)
} }
// ResetEpochTimer resets block timer that produces events to update epoch
// counter in netmap contract. Used to synchronize this even production
// based on block with notification of last epoch.
func (s *Server) ResetEpochTimer() error {
return s.epochTimer.Reset()
}

View file

@ -1,20 +0,0 @@
package innerring
import (
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
)
type blockTimerWrapper timers.BlockTimer
func (t *blockTimerWrapper) ResetEpochTimer() error {
return (*timers.BlockTimer)(t).Reset()
}
type auditSettlementCalculator audit.Calculator
func (s *auditSettlementCalculator) ProcessAuditSettlements(epoch uint64) {
(*audit.Calculator)(s).Calculate(&audit.CalculatePrm{
Epoch: epoch,
})
}