From 19bb94cc049912cf3dc9150901fc5c33e8e8c632 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 22 Jan 2021 18:01:44 +0300 Subject: [PATCH] [#324] ir: Measure epochs in sidechain blocks Signed-off-by: Leonard Lyubich --- cmd/neofs-ir/defaults.go | 2 +- pkg/innerring/innerring.go | 28 +++++++++++++++++-- pkg/innerring/processors/netmap/handlers.go | 2 +- .../processors/netmap/process_epoch.go | 7 ++++- pkg/innerring/processors/netmap/processor.go | 13 ++------- pkg/innerring/timers/epoch.go | 5 ---- pkg/innerring/timers/timers.go | 13 --------- pkg/innerring/util.go | 11 ++++++++ 8 files changed, 46 insertions(+), 35 deletions(-) create mode 100644 pkg/innerring/util.go diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index a1b1b755..8e681e41 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -74,7 +74,7 @@ func defaultConfiguration(cfg *viper.Viper) { // gas native contract in LE cfg.SetDefault("contracts.gas", "a6a6c15dcdc9b997dac448b6926522d22efeedfb") - cfg.SetDefault("timers.epoch", "5s") + cfg.SetDefault("timers.epoch", "0") cfg.SetDefault("timers.emit", "30s") cfg.SetDefault("workers.netmap", "10") diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 16bd1285..cabb82a3 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/util" @@ -40,6 +41,7 @@ type ( morphListener event.Listener mainnetListener event.Listener localTimers *timers.Timers + epochTimer *timers.BlockTimer // global state morphClient *client.Client @@ -121,6 +123,18 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) error { go s.morphListener.ListenWithError(ctx, morphErr) // listen for neo:morph events go s.mainnetListener.ListenWithError(ctx, mainnnetErr) // listen for neo:mainnet events + s.morphListener.RegisterBlockHandler(func(b *block.Block) { + s.log.Info("new block", + zap.Uint32("index", b.Index), + ) + + s.epochTimer.Tick() + }) + + if err := s.epochTimer.Reset(); err != nil { + return err + } + s.startWorkers(ctx) return nil @@ -164,7 +178,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error // create local timer instance server.localTimers = timers.New(&timers.Params{ Log: log, - EpochDuration: cfg.GetDuration("timers.epoch"), AlphabetDuration: cfg.GetDuration("timers.emit"), }) @@ -259,12 +272,21 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error return nil, err } + var netmapProcessor *netmap.Processor + + server.epochTimer = timers.NewBlockTimer( + timers.StaticBlockMeter(cfg.GetUint32("timers.epoch")), + func() { + netmapProcessor.HandleNewEpochTick(timers.NewEpochTick{}) + }, + ) + // create netmap processor - netmapProcessor, err := netmap.New(&netmap.Params{ + netmapProcessor, err = netmap.New(&netmap.Params{ Log: log, PoolSize: cfg.GetInt("workers.netmap"), NetmapContract: server.contracts.netmap, - EpochTimer: server.localTimers, + EpochTimer: (*blockTimerWrapper)(server.epochTimer), MorphClient: server.morphClient, EpochState: server, ActiveState: server, diff --git a/pkg/innerring/processors/netmap/handlers.go b/pkg/innerring/processors/netmap/handlers.go index 2e11891d..23046a8b 100644 --- a/pkg/innerring/processors/netmap/handlers.go +++ b/pkg/innerring/processors/netmap/handlers.go @@ -9,7 +9,7 @@ import ( "go.uber.org/zap" ) -func (np *Processor) handleNewEpochTick(ev event.Event) { +func (np *Processor) HandleNewEpochTick(ev event.Event) { _ = ev.(timerEvent.NewEpochTick) np.log.Info("tick", zap.String("type", "epoch")) diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index 069b0acf..d9694236 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -10,7 +10,12 @@ import ( // local epoch timer. func (np *Processor) processNewEpoch(epoch uint64) { np.epochState.SetEpochCounter(epoch) - np.epochTimer.ResetEpochTimer() + if err := np.epochTimer.ResetEpochTimer(); err != nil { + np.log.Warn("can't reset epoch timer", + zap.String("error", err.Error())) + + return + } // get new netmap snapshot snapshot, err := invoke.NetmapSnapshot(np.morphClient, np.netmapContract) diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index 7a71c871..1877257f 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -2,7 +2,6 @@ package netmap import ( "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" @@ -14,7 +13,7 @@ import ( type ( // EpochTimerReseter is a callback interface for tickers component. EpochTimerReseter interface { - ResetEpochTimer() + ResetEpochTimer() error } // EpochState is a callback interface for inner ring global state. @@ -160,13 +159,5 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo { // TimersHandlers for the 'Timers' event producer. func (np *Processor) TimersHandlers() []event.HandlerInfo { - var handlers []event.HandlerInfo - - // new epoch handler - newEpoch := event.HandlerInfo{} - newEpoch.SetType(timers.EpochTimer) - newEpoch.SetHandler(np.handleNewEpochTick) - handlers = append(handlers, newEpoch) - - return handlers + return nil } diff --git a/pkg/innerring/timers/epoch.go b/pkg/innerring/timers/epoch.go index a7da6bbb..5ba2e404 100644 --- a/pkg/innerring/timers/epoch.go +++ b/pkg/innerring/timers/epoch.go @@ -5,8 +5,3 @@ type NewEpochTick struct{} // MorphEvent implements Event interface. func (NewEpochTick) MorphEvent() {} - -// ResetEpochTimer to start it again when event has been processed. -func (t *Timers) ResetEpochTimer() { - t.epoch.timer.Reset(t.epoch.duration) -} diff --git a/pkg/innerring/timers/timers.go b/pkg/innerring/timers/timers.go index 22eff892..dd792313 100644 --- a/pkg/innerring/timers/timers.go +++ b/pkg/innerring/timers/timers.go @@ -20,7 +20,6 @@ type ( Timers struct { log *zap.Logger - epoch localTimer alphabet localTimer } @@ -33,8 +32,6 @@ type ( ) const ( - // EpochTimer is a type for HandlerInfo structure. - EpochTimer = "EpochTimer" // AlphabetTimer is a type for HandlerInfo structure. AlphabetTimer = "AlphabetTimer" ) @@ -43,14 +40,12 @@ const ( func New(p *Params) *Timers { return &Timers{ log: p.Log, - epoch: localTimer{duration: p.EpochDuration}, alphabet: localTimer{duration: p.AlphabetDuration}, } } // Start runs all available local timers. func (t *Timers) Start(ctx context.Context) { - t.epoch.timer = time.NewTimer(t.epoch.duration) t.alphabet.timer = time.NewTimer(t.alphabet.duration) go t.serve(ctx) } @@ -60,15 +55,9 @@ func (t *Timers) serve(ctx context.Context) { select { case <-ctx.Done(): t.log.Info("timers are getting stopped") - t.epoch.timer.Stop() t.alphabet.timer.Stop() return - case <-t.epoch.timer.C: - // reset timer so it can tick once again - t.epoch.timer.Reset(t.epoch.duration) - // call handler, it should be always set - t.epoch.handler(NewEpochTick{}) case <-t.alphabet.timer.C: // reset timer so it can tick once again t.alphabet.timer.Reset(t.alphabet.duration) @@ -85,8 +74,6 @@ func (t *Timers) RegisterHandler(h event.HandlerInfo) error { } switch h.GetType() { - case EpochTimer: - t.epoch.handler = h.Handler() case AlphabetTimer: t.alphabet.handler = h.Handler() default: diff --git a/pkg/innerring/util.go b/pkg/innerring/util.go new file mode 100644 index 00000000..028c0f1b --- /dev/null +++ b/pkg/innerring/util.go @@ -0,0 +1,11 @@ +package innerring + +import ( + "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" +) + +type blockTimerWrapper timers.BlockTimer + +func (t *blockTimerWrapper) ResetEpochTimer() error { + return (*timers.BlockTimer)(t).Reset() +}