forked from TrueCloudLab/frostfs-node
[#479] cmd/neofs-node: Add eigen trust block timer
Eigen trust block timer ticks to start new round of eigen trust calculations. Every epoch this timer recalculates duration and starts again. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
376bb293b4
commit
66ddff3498
5 changed files with 120 additions and 0 deletions
|
@ -30,6 +30,7 @@ import (
|
|||
nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||
netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/timer"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/control"
|
||||
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
|
||||
|
@ -226,6 +227,9 @@ type cfgGRPC struct {
|
|||
|
||||
type cfgMorph struct {
|
||||
client *client.Client
|
||||
|
||||
blockTimers []*timer.BlockTimer // all combined timers
|
||||
eigenTrustTimer *timer.BlockTimer // timer for EigenTrust iterations
|
||||
}
|
||||
|
||||
type cfgAccounting struct {
|
||||
|
|
|
@ -63,6 +63,7 @@ func bootUp(c *cfg) {
|
|||
serveGRPC(c)
|
||||
bootstrapNode(c)
|
||||
startWorkers(c)
|
||||
startBlockTimers(c)
|
||||
serveMetrics(c)
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
|
||||
|
@ -120,6 +121,11 @@ func listenMorphNotifications(c *cfg) {
|
|||
setNetmapNotificationParser(c, newEpochNotification, netmapEvent.ParseNewEpoch)
|
||||
registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers)
|
||||
registerNotificationHandlers(c.cfgContainer.scriptHash, lis, c.cfgContainer.parsers, c.cfgContainer.subscribers)
|
||||
|
||||
registerBlockHandler(lis, func(block *block.Block) {
|
||||
c.log.Debug("new block", zap.Uint32("index", block.Index))
|
||||
tickBlockTimers(c)
|
||||
})
|
||||
}
|
||||
|
||||
func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.Parser,
|
||||
|
@ -148,3 +154,7 @@ func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parse
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func registerBlockHandler(lis event.Listener, handler event.BlockHandler) {
|
||||
lis.RegisterBlockHandler(handler)
|
||||
}
|
||||
|
|
|
@ -337,6 +337,26 @@ func initReputationService(c *cfg) {
|
|||
),
|
||||
),
|
||||
)
|
||||
|
||||
// initialize eigen trust block timer
|
||||
durationMeter := NewEigenTrustDuration(c.cfgNetmap.wrapper)
|
||||
|
||||
newEigenTrustIterTimer(c, durationMeter, func() {
|
||||
c.log.Debug("todo: start next EigenTrust iteration round")
|
||||
})
|
||||
|
||||
addNewEpochAsyncNotificationHandler(
|
||||
c,
|
||||
func(e event.Event) {
|
||||
durationMeter.Update() // recalculate duration of one iteration round
|
||||
|
||||
err := c.cfgMorph.eigenTrustTimer.Reset() // start iteration rounds again
|
||||
if err != nil {
|
||||
c.log.Warn("can't reset block timer to start eigen trust calculations again",
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
type reputationServer struct {
|
||||
|
|
85
cmd/neofs-node/timers.go
Normal file
85
cmd/neofs-node/timers.go
Normal file
|
@ -0,0 +1,85 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/timer"
|
||||
)
|
||||
|
||||
type (
|
||||
// EigenTrustDuration is a structure that provides duration of one
|
||||
// eigen trust iteration round in blocks for block timer.
|
||||
EigenTrustDuration struct {
|
||||
sync.Mutex
|
||||
|
||||
nm *wrapNetmap.Wrapper
|
||||
val uint32
|
||||
}
|
||||
)
|
||||
|
||||
// NewEigenTrustDuration returns instance of EigenTrustDuration.
|
||||
func NewEigenTrustDuration(nm *wrapNetmap.Wrapper) *EigenTrustDuration {
|
||||
return &EigenTrustDuration{
|
||||
nm: nm,
|
||||
}
|
||||
}
|
||||
|
||||
// Value returns number of blocks between two iterations of EigenTrust
|
||||
// calculation. This value is not changed between `Update` calls.
|
||||
func (e *EigenTrustDuration) Value() (uint32, error) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
|
||||
if e.val == 0 {
|
||||
e.update()
|
||||
}
|
||||
|
||||
return e.val, nil
|
||||
}
|
||||
|
||||
// Update function recalculate duration of EigenTrust iteration based on
|
||||
// NeoFS epoch duration and amount of iteration rounds from global config.
|
||||
func (e *EigenTrustDuration) Update() {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
|
||||
e.update()
|
||||
}
|
||||
|
||||
func (e *EigenTrustDuration) update() {
|
||||
iterationAmount, err := e.nm.EigenTrustIterations()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
epochDuration, err := e.nm.EpochDuration()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
e.val = uint32(epochDuration / iterationAmount)
|
||||
}
|
||||
|
||||
func startBlockTimers(c *cfg) {
|
||||
for i := range c.cfgMorph.blockTimers {
|
||||
if err := c.cfgMorph.blockTimers[i].Reset(); err != nil {
|
||||
fatalOnErr(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tickBlockTimers(c *cfg) {
|
||||
for i := range c.cfgMorph.blockTimers {
|
||||
c.cfgMorph.blockTimers[i].Tick()
|
||||
}
|
||||
}
|
||||
|
||||
func newEigenTrustIterTimer(c *cfg, it *EigenTrustDuration, handler timer.BlockTickHandler) {
|
||||
c.cfgMorph.eigenTrustTimer = timer.NewBlockTimer(
|
||||
it.Value,
|
||||
handler,
|
||||
)
|
||||
|
||||
c.cfgMorph.blockTimers = append(c.cfgMorph.blockTimers, c.cfgMorph.eigenTrustTimer)
|
||||
}
|
Loading…
Reference in a new issue