forked from TrueCloudLab/frostfs-node
[#324] ir: Measure GAS emission intervals in sidechain blocks
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
19bb94cc04
commit
d01b4e1a2d
6 changed files with 41 additions and 123 deletions
|
@ -75,7 +75,7 @@ func defaultConfiguration(cfg *viper.Viper) {
|
||||||
cfg.SetDefault("contracts.gas", "a6a6c15dcdc9b997dac448b6926522d22efeedfb")
|
cfg.SetDefault("contracts.gas", "a6a6c15dcdc9b997dac448b6926522d22efeedfb")
|
||||||
|
|
||||||
cfg.SetDefault("timers.epoch", "0")
|
cfg.SetDefault("timers.epoch", "0")
|
||||||
cfg.SetDefault("timers.emit", "30s")
|
cfg.SetDefault("timers.emit", "0")
|
||||||
|
|
||||||
cfg.SetDefault("workers.netmap", "10")
|
cfg.SetDefault("workers.netmap", "10")
|
||||||
cfg.SetDefault("workers.balance", "10")
|
cfg.SetDefault("workers.balance", "10")
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package innerring
|
package innerring
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,28 +26,16 @@ func connectListenerWithProcessor(l event.Listener, p ContractProcessor) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func connectTimerWithProcessor(t *timers.Timers, p ContractProcessor) error {
|
|
||||||
var err error
|
|
||||||
for _, parser := range p.TimersHandlers() {
|
|
||||||
err = t.RegisterHandler(parser)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// bindMorphProcessor connects both morph chain listener handlers and
|
// bindMorphProcessor connects both morph chain listener handlers and
|
||||||
// local timers handlers.
|
// local timers handlers.
|
||||||
func bindMorphProcessor(proc ContractProcessor, s *Server) error {
|
func bindMorphProcessor(proc ContractProcessor, s *Server) error {
|
||||||
connectListenerWithProcessor(s.morphListener, proc)
|
connectListenerWithProcessor(s.morphListener, proc)
|
||||||
return connectTimerWithProcessor(s.localTimers, proc)
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// bindMainnetProcessor connects both mainnet chain listener handlers and
|
// bindMainnetProcessor connects both mainnet chain listener handlers and
|
||||||
// local timers handlers.
|
// local timers handlers.
|
||||||
func bindMainnetProcessor(proc ContractProcessor, s *Server) error {
|
func bindMainnetProcessor(proc ContractProcessor, s *Server) error {
|
||||||
connectListenerWithProcessor(s.mainnetListener, proc)
|
connectListenerWithProcessor(s.mainnetListener, proc)
|
||||||
return connectTimerWithProcessor(s.localTimers, proc)
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ type (
|
||||||
// event producers
|
// event producers
|
||||||
morphListener event.Listener
|
morphListener event.Listener
|
||||||
mainnetListener event.Listener
|
mainnetListener event.Listener
|
||||||
localTimers *timers.Timers
|
blockTimers []*timers.BlockTimer
|
||||||
epochTimer *timers.BlockTimer
|
epochTimer *timers.BlockTimer
|
||||||
|
|
||||||
// global state
|
// global state
|
||||||
|
@ -103,8 +103,6 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) error {
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
s.localTimers.Start(ctx) // local timers start ticking
|
|
||||||
|
|
||||||
morphErr := make(chan error)
|
morphErr := make(chan error)
|
||||||
mainnnetErr := make(chan error)
|
mainnnetErr := make(chan error)
|
||||||
|
|
||||||
|
@ -128,11 +126,11 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) error {
|
||||||
zap.Uint32("index", b.Index),
|
zap.Uint32("index", b.Index),
|
||||||
)
|
)
|
||||||
|
|
||||||
s.epochTimer.Tick()
|
s.tickTimers()
|
||||||
})
|
})
|
||||||
|
|
||||||
if err := s.epochTimer.Reset(); err != nil {
|
if err := s.startBlockTimers(); err != nil {
|
||||||
return err
|
return errors.Wrap(err, "could not start block timers")
|
||||||
}
|
}
|
||||||
|
|
||||||
s.startWorkers(ctx)
|
s.startWorkers(ctx)
|
||||||
|
@ -175,12 +173,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
|
||||||
return nil, errors.Wrap(err, "ir: can't parse predefined validators list")
|
return nil, errors.Wrap(err, "ir: can't parse predefined validators list")
|
||||||
}
|
}
|
||||||
|
|
||||||
// create local timer instance
|
|
||||||
server.localTimers = timers.New(&timers.Params{
|
|
||||||
Log: log,
|
|
||||||
AlphabetDuration: cfg.GetDuration("timers.emit"),
|
|
||||||
})
|
|
||||||
|
|
||||||
morphChain := &chainParams{
|
morphChain := &chainParams{
|
||||||
log: log,
|
log: log,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
|
@ -281,6 +273,8 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
server.addBlockTimer(server.epochTimer)
|
||||||
|
|
||||||
// create netmap processor
|
// create netmap processor
|
||||||
netmapProcessor, err = netmap.New(&netmap.Params{
|
netmapProcessor, err = netmap.New(&netmap.Params{
|
||||||
Log: log,
|
Log: log,
|
||||||
|
@ -365,8 +359,17 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var alphabetProcessor *alphabet.Processor
|
||||||
|
|
||||||
|
server.addBlockTimer(timers.NewBlockTimer(
|
||||||
|
timers.StaticBlockMeter(cfg.GetUint32("timers.emit")),
|
||||||
|
func() {
|
||||||
|
alphabetProcessor.HandleGasEmission(timers.NewAlphabetEmitTick{})
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
// create alphabet processor
|
// create alphabet processor
|
||||||
alphabetProcessor, err := alphabet.New(&alphabet.Params{
|
alphabetProcessor, err = alphabet.New(&alphabet.Params{
|
||||||
Log: log,
|
Log: log,
|
||||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||||
AlphabetContracts: server.contracts.alphabet,
|
AlphabetContracts: server.contracts.alphabet,
|
||||||
|
@ -545,3 +548,23 @@ func (s *Server) initConfigFromBlockchain() error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) addBlockTimer(t *timers.BlockTimer) {
|
||||||
|
s.blockTimers = append(s.blockTimers, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) startBlockTimers() error {
|
||||||
|
for i := range s.blockTimers {
|
||||||
|
if err := s.blockTimers[i].Reset(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) tickTimers() {
|
||||||
|
for i := range s.blockTimers {
|
||||||
|
s.blockTimers[i].Tick()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (np *Processor) handleGasEmission(ev event.Event) {
|
func (np *Processor) HandleGasEmission(ev event.Event) {
|
||||||
_ = ev.(timers.NewAlphabetEmitTick)
|
_ = ev.(timers.NewAlphabetEmitTick)
|
||||||
np.log.Info("tick", zap.String("type", "alphabet gas emit"))
|
np.log.Info("tick", zap.String("type", "alphabet gas emit"))
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ package alphabet
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
|
@ -80,13 +79,6 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo {
|
||||||
|
|
||||||
// TimersHandlers for the 'Timers' event producer.
|
// TimersHandlers for the 'Timers' event producer.
|
||||||
func (np *Processor) TimersHandlers() []event.HandlerInfo {
|
func (np *Processor) TimersHandlers() []event.HandlerInfo {
|
||||||
var handlers []event.HandlerInfo
|
|
||||||
|
|
||||||
// new epoch handler
|
return nil
|
||||||
newEpoch := event.HandlerInfo{}
|
|
||||||
newEpoch.SetType(timers.AlphabetTimer)
|
|
||||||
newEpoch.SetHandler(np.handleGasEmission)
|
|
||||||
handlers = append(handlers, newEpoch)
|
|
||||||
|
|
||||||
return handlers
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,84 +0,0 @@
|
||||||
package timers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
|
||||||
"github.com/pkg/errors"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
localTimer struct {
|
|
||||||
duration time.Duration
|
|
||||||
timer *time.Timer
|
|
||||||
handler event.Handler
|
|
||||||
}
|
|
||||||
|
|
||||||
// Timers is a component for local inner ring timers to produce local events.
|
|
||||||
Timers struct {
|
|
||||||
log *zap.Logger
|
|
||||||
|
|
||||||
alphabet localTimer
|
|
||||||
}
|
|
||||||
|
|
||||||
// Params for timers instance constructor.
|
|
||||||
Params struct {
|
|
||||||
Log *zap.Logger
|
|
||||||
EpochDuration time.Duration
|
|
||||||
AlphabetDuration time.Duration
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// AlphabetTimer is a type for HandlerInfo structure.
|
|
||||||
AlphabetTimer = "AlphabetTimer"
|
|
||||||
)
|
|
||||||
|
|
||||||
// New creates instance of timers component.
|
|
||||||
func New(p *Params) *Timers {
|
|
||||||
return &Timers{
|
|
||||||
log: p.Log,
|
|
||||||
alphabet: localTimer{duration: p.AlphabetDuration},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start runs all available local timers.
|
|
||||||
func (t *Timers) Start(ctx context.Context) {
|
|
||||||
t.alphabet.timer = time.NewTimer(t.alphabet.duration)
|
|
||||||
go t.serve(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Timers) serve(ctx context.Context) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
t.log.Info("timers are getting stopped")
|
|
||||||
t.alphabet.timer.Stop()
|
|
||||||
|
|
||||||
return
|
|
||||||
case <-t.alphabet.timer.C:
|
|
||||||
// reset timer so it can tick once again
|
|
||||||
t.alphabet.timer.Reset(t.alphabet.duration)
|
|
||||||
// call handler, it should be always set
|
|
||||||
t.alphabet.handler(NewAlphabetEmitTick{})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterHandler of local timers events.
|
|
||||||
func (t *Timers) RegisterHandler(h event.HandlerInfo) error {
|
|
||||||
if h.Handler() == nil {
|
|
||||||
return errors.New("ir/timers: can't register nil handler")
|
|
||||||
}
|
|
||||||
|
|
||||||
switch h.GetType() {
|
|
||||||
case AlphabetTimer:
|
|
||||||
t.alphabet.handler = h.Handler()
|
|
||||||
default:
|
|
||||||
return errors.New("ir/timers: unknown handler type")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
Loading…
Reference in a new issue