forked from TrueCloudLab/frostfs-node
[#324] ir: Measure epochs in sidechain blocks
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
b5dc28f79c
commit
19bb94cc04
8 changed files with 46 additions and 35 deletions
|
@ -74,7 +74,7 @@ func defaultConfiguration(cfg *viper.Viper) {
|
||||||
// gas native contract in LE
|
// gas native contract in LE
|
||||||
cfg.SetDefault("contracts.gas", "a6a6c15dcdc9b997dac448b6926522d22efeedfb")
|
cfg.SetDefault("contracts.gas", "a6a6c15dcdc9b997dac448b6926522d22efeedfb")
|
||||||
|
|
||||||
cfg.SetDefault("timers.epoch", "5s")
|
cfg.SetDefault("timers.epoch", "0")
|
||||||
cfg.SetDefault("timers.emit", "30s")
|
cfg.SetDefault("timers.emit", "30s")
|
||||||
|
|
||||||
cfg.SetDefault("workers.netmap", "10")
|
cfg.SetDefault("workers.netmap", "10")
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
@ -40,6 +41,7 @@ type (
|
||||||
morphListener event.Listener
|
morphListener event.Listener
|
||||||
mainnetListener event.Listener
|
mainnetListener event.Listener
|
||||||
localTimers *timers.Timers
|
localTimers *timers.Timers
|
||||||
|
epochTimer *timers.BlockTimer
|
||||||
|
|
||||||
// global state
|
// global state
|
||||||
morphClient *client.Client
|
morphClient *client.Client
|
||||||
|
@ -121,6 +123,18 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) error {
|
||||||
go s.morphListener.ListenWithError(ctx, morphErr) // listen for neo:morph events
|
go s.morphListener.ListenWithError(ctx, morphErr) // listen for neo:morph events
|
||||||
go s.mainnetListener.ListenWithError(ctx, mainnnetErr) // listen for neo:mainnet events
|
go s.mainnetListener.ListenWithError(ctx, mainnnetErr) // listen for neo:mainnet events
|
||||||
|
|
||||||
|
s.morphListener.RegisterBlockHandler(func(b *block.Block) {
|
||||||
|
s.log.Info("new block",
|
||||||
|
zap.Uint32("index", b.Index),
|
||||||
|
)
|
||||||
|
|
||||||
|
s.epochTimer.Tick()
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := s.epochTimer.Reset(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
s.startWorkers(ctx)
|
s.startWorkers(ctx)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -164,7 +178,6 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
|
||||||
// create local timer instance
|
// create local timer instance
|
||||||
server.localTimers = timers.New(&timers.Params{
|
server.localTimers = timers.New(&timers.Params{
|
||||||
Log: log,
|
Log: log,
|
||||||
EpochDuration: cfg.GetDuration("timers.epoch"),
|
|
||||||
AlphabetDuration: cfg.GetDuration("timers.emit"),
|
AlphabetDuration: cfg.GetDuration("timers.emit"),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -259,12 +272,21 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var netmapProcessor *netmap.Processor
|
||||||
|
|
||||||
|
server.epochTimer = timers.NewBlockTimer(
|
||||||
|
timers.StaticBlockMeter(cfg.GetUint32("timers.epoch")),
|
||||||
|
func() {
|
||||||
|
netmapProcessor.HandleNewEpochTick(timers.NewEpochTick{})
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
// create netmap processor
|
// create netmap processor
|
||||||
netmapProcessor, err := netmap.New(&netmap.Params{
|
netmapProcessor, err = netmap.New(&netmap.Params{
|
||||||
Log: log,
|
Log: log,
|
||||||
PoolSize: cfg.GetInt("workers.netmap"),
|
PoolSize: cfg.GetInt("workers.netmap"),
|
||||||
NetmapContract: server.contracts.netmap,
|
NetmapContract: server.contracts.netmap,
|
||||||
EpochTimer: server.localTimers,
|
EpochTimer: (*blockTimerWrapper)(server.epochTimer),
|
||||||
MorphClient: server.morphClient,
|
MorphClient: server.morphClient,
|
||||||
EpochState: server,
|
EpochState: server,
|
||||||
ActiveState: server,
|
ActiveState: server,
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (np *Processor) handleNewEpochTick(ev event.Event) {
|
func (np *Processor) HandleNewEpochTick(ev event.Event) {
|
||||||
_ = ev.(timerEvent.NewEpochTick)
|
_ = ev.(timerEvent.NewEpochTick)
|
||||||
np.log.Info("tick", zap.String("type", "epoch"))
|
np.log.Info("tick", zap.String("type", "epoch"))
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,12 @@ import (
|
||||||
// local epoch timer.
|
// local epoch timer.
|
||||||
func (np *Processor) processNewEpoch(epoch uint64) {
|
func (np *Processor) processNewEpoch(epoch uint64) {
|
||||||
np.epochState.SetEpochCounter(epoch)
|
np.epochState.SetEpochCounter(epoch)
|
||||||
np.epochTimer.ResetEpochTimer()
|
if err := np.epochTimer.ResetEpochTimer(); err != nil {
|
||||||
|
np.log.Warn("can't reset epoch timer",
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// get new netmap snapshot
|
// get new netmap snapshot
|
||||||
snapshot, err := invoke.NetmapSnapshot(np.morphClient, np.netmapContract)
|
snapshot, err := invoke.NetmapSnapshot(np.morphClient, np.netmapContract)
|
||||||
|
|
|
@ -2,7 +2,6 @@ package netmap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
|
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
|
||||||
|
@ -14,7 +13,7 @@ import (
|
||||||
type (
|
type (
|
||||||
// EpochTimerReseter is a callback interface for tickers component.
|
// EpochTimerReseter is a callback interface for tickers component.
|
||||||
EpochTimerReseter interface {
|
EpochTimerReseter interface {
|
||||||
ResetEpochTimer()
|
ResetEpochTimer() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// EpochState is a callback interface for inner ring global state.
|
// EpochState is a callback interface for inner ring global state.
|
||||||
|
@ -160,13 +159,5 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo {
|
||||||
|
|
||||||
// TimersHandlers for the 'Timers' event producer.
|
// TimersHandlers for the 'Timers' event producer.
|
||||||
func (np *Processor) TimersHandlers() []event.HandlerInfo {
|
func (np *Processor) TimersHandlers() []event.HandlerInfo {
|
||||||
var handlers []event.HandlerInfo
|
return nil
|
||||||
|
|
||||||
// new epoch handler
|
|
||||||
newEpoch := event.HandlerInfo{}
|
|
||||||
newEpoch.SetType(timers.EpochTimer)
|
|
||||||
newEpoch.SetHandler(np.handleNewEpochTick)
|
|
||||||
handlers = append(handlers, newEpoch)
|
|
||||||
|
|
||||||
return handlers
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,8 +5,3 @@ type NewEpochTick struct{}
|
||||||
|
|
||||||
// MorphEvent implements Event interface.
|
// MorphEvent implements Event interface.
|
||||||
func (NewEpochTick) MorphEvent() {}
|
func (NewEpochTick) MorphEvent() {}
|
||||||
|
|
||||||
// ResetEpochTimer to start it again when event has been processed.
|
|
||||||
func (t *Timers) ResetEpochTimer() {
|
|
||||||
t.epoch.timer.Reset(t.epoch.duration)
|
|
||||||
}
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ type (
|
||||||
Timers struct {
|
Timers struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
|
|
||||||
epoch localTimer
|
|
||||||
alphabet localTimer
|
alphabet localTimer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,8 +32,6 @@ type (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// EpochTimer is a type for HandlerInfo structure.
|
|
||||||
EpochTimer = "EpochTimer"
|
|
||||||
// AlphabetTimer is a type for HandlerInfo structure.
|
// AlphabetTimer is a type for HandlerInfo structure.
|
||||||
AlphabetTimer = "AlphabetTimer"
|
AlphabetTimer = "AlphabetTimer"
|
||||||
)
|
)
|
||||||
|
@ -43,14 +40,12 @@ const (
|
||||||
func New(p *Params) *Timers {
|
func New(p *Params) *Timers {
|
||||||
return &Timers{
|
return &Timers{
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
epoch: localTimer{duration: p.EpochDuration},
|
|
||||||
alphabet: localTimer{duration: p.AlphabetDuration},
|
alphabet: localTimer{duration: p.AlphabetDuration},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start runs all available local timers.
|
// Start runs all available local timers.
|
||||||
func (t *Timers) Start(ctx context.Context) {
|
func (t *Timers) Start(ctx context.Context) {
|
||||||
t.epoch.timer = time.NewTimer(t.epoch.duration)
|
|
||||||
t.alphabet.timer = time.NewTimer(t.alphabet.duration)
|
t.alphabet.timer = time.NewTimer(t.alphabet.duration)
|
||||||
go t.serve(ctx)
|
go t.serve(ctx)
|
||||||
}
|
}
|
||||||
|
@ -60,15 +55,9 @@ func (t *Timers) serve(ctx context.Context) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
t.log.Info("timers are getting stopped")
|
t.log.Info("timers are getting stopped")
|
||||||
t.epoch.timer.Stop()
|
|
||||||
t.alphabet.timer.Stop()
|
t.alphabet.timer.Stop()
|
||||||
|
|
||||||
return
|
return
|
||||||
case <-t.epoch.timer.C:
|
|
||||||
// reset timer so it can tick once again
|
|
||||||
t.epoch.timer.Reset(t.epoch.duration)
|
|
||||||
// call handler, it should be always set
|
|
||||||
t.epoch.handler(NewEpochTick{})
|
|
||||||
case <-t.alphabet.timer.C:
|
case <-t.alphabet.timer.C:
|
||||||
// reset timer so it can tick once again
|
// reset timer so it can tick once again
|
||||||
t.alphabet.timer.Reset(t.alphabet.duration)
|
t.alphabet.timer.Reset(t.alphabet.duration)
|
||||||
|
@ -85,8 +74,6 @@ func (t *Timers) RegisterHandler(h event.HandlerInfo) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
switch h.GetType() {
|
switch h.GetType() {
|
||||||
case EpochTimer:
|
|
||||||
t.epoch.handler = h.Handler()
|
|
||||||
case AlphabetTimer:
|
case AlphabetTimer:
|
||||||
t.alphabet.handler = h.Handler()
|
t.alphabet.handler = h.Handler()
|
||||||
default:
|
default:
|
||||||
|
|
11
pkg/innerring/util.go
Normal file
11
pkg/innerring/util.go
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
package innerring
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
|
||||||
|
)
|
||||||
|
|
||||||
|
type blockTimerWrapper timers.BlockTimer
|
||||||
|
|
||||||
|
func (t *blockTimerWrapper) ResetEpochTimer() error {
|
||||||
|
return (*timers.BlockTimer)(t).Reset()
|
||||||
|
}
|
Loading…
Reference in a new issue