diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 8e4983f460..8347cfbe0f 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -30,6 +30,7 @@ import ( nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" + "github.com/nspcc-dev/neofs-node/pkg/morph/timer" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/control" trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" @@ -226,6 +227,9 @@ type cfgGRPC struct { type cfgMorph struct { client *client.Client + + blockTimers []*timer.BlockTimer // all combined timers + eigenTrustTimer *timer.BlockTimer // timer for EigenTrust iterations } type cfgAccounting struct { diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 149fa90d05..d4969fa2cd 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -63,6 +63,7 @@ func bootUp(c *cfg) { serveGRPC(c) bootstrapNode(c) startWorkers(c) + startBlockTimers(c) serveMetrics(c) } diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index df78733ba5..4d51276224 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" @@ -120,6 +121,11 @@ func listenMorphNotifications(c *cfg) { setNetmapNotificationParser(c, newEpochNotification, netmapEvent.ParseNewEpoch) registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers) registerNotificationHandlers(c.cfgContainer.scriptHash, lis, c.cfgContainer.parsers, c.cfgContainer.subscribers) + + registerBlockHandler(lis, func(block *block.Block) { + c.log.Debug("new block", zap.Uint32("index", block.Index)) + tickBlockTimers(c) + }) } func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.Parser, @@ -148,3 +154,7 @@ func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parse } } } + +func registerBlockHandler(lis event.Listener, handler event.BlockHandler) { + lis.RegisterBlockHandler(handler) +} diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 24f1b91f6b..76675157ad 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -337,6 +337,26 @@ func initReputationService(c *cfg) { ), ), ) + + // initialize eigen trust block timer + durationMeter := NewEigenTrustDuration(c.cfgNetmap.wrapper) + + newEigenTrustIterTimer(c, durationMeter, func() { + c.log.Debug("todo: start next EigenTrust iteration round") + }) + + addNewEpochAsyncNotificationHandler( + c, + func(e event.Event) { + durationMeter.Update() // recalculate duration of one iteration round + + err := c.cfgMorph.eigenTrustTimer.Reset() // start iteration rounds again + if err != nil { + c.log.Warn("can't reset block timer to start eigen trust calculations again", + zap.String("error", err.Error())) + } + }, + ) } type reputationServer struct { diff --git a/cmd/neofs-node/timers.go b/cmd/neofs-node/timers.go new file mode 100644 index 0000000000..692337aa64 --- /dev/null +++ b/cmd/neofs-node/timers.go @@ -0,0 +1,85 @@ +package main + +import ( + "sync" + + wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" + "github.com/nspcc-dev/neofs-node/pkg/morph/timer" +) + +type ( + // EigenTrustDuration is a structure that provides duration of one + // eigen trust iteration round in blocks for block timer. + EigenTrustDuration struct { + sync.Mutex + + nm *wrapNetmap.Wrapper + val uint32 + } +) + +// NewEigenTrustDuration returns instance of EigenTrustDuration. +func NewEigenTrustDuration(nm *wrapNetmap.Wrapper) *EigenTrustDuration { + return &EigenTrustDuration{ + nm: nm, + } +} + +// Value returns number of blocks between two iterations of EigenTrust +// calculation. This value is not changed between `Update` calls. +func (e *EigenTrustDuration) Value() (uint32, error) { + e.Lock() + defer e.Unlock() + + if e.val == 0 { + e.update() + } + + return e.val, nil +} + +// Update function recalculate duration of EigenTrust iteration based on +// NeoFS epoch duration and amount of iteration rounds from global config. +func (e *EigenTrustDuration) Update() { + e.Lock() + defer e.Unlock() + + e.update() +} + +func (e *EigenTrustDuration) update() { + iterationAmount, err := e.nm.EigenTrustIterations() + if err != nil { + return + } + + epochDuration, err := e.nm.EpochDuration() + if err != nil { + return + } + + e.val = uint32(epochDuration / iterationAmount) +} + +func startBlockTimers(c *cfg) { + for i := range c.cfgMorph.blockTimers { + if err := c.cfgMorph.blockTimers[i].Reset(); err != nil { + fatalOnErr(err) + } + } +} + +func tickBlockTimers(c *cfg) { + for i := range c.cfgMorph.blockTimers { + c.cfgMorph.blockTimers[i].Tick() + } +} + +func newEigenTrustIterTimer(c *cfg, it *EigenTrustDuration, handler timer.BlockTickHandler) { + c.cfgMorph.eigenTrustTimer = timer.NewBlockTimer( + it.Value, + handler, + ) + + c.cfgMorph.blockTimers = append(c.cfgMorph.blockTimers, c.cfgMorph.eigenTrustTimer) +}