From c3db12d71bbf87413aff6bbfc17b57918faeef90 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 2 Mar 2022 19:31:56 +0300 Subject: [PATCH] [#1210] reputation: Resolve race condition Make all epoch independent in reputation process. Do not reset any timers related to reputation. Make it possible to finish iteration after the unexpected `NewEpoch` event. Signed-off-by: Pavel Karpy --- cmd/neofs-node/config.go | 4 +- cmd/neofs-node/main.go | 1 - cmd/neofs-node/reputation.go | 51 ++++++----- cmd/neofs-node/timers.go | 86 +++++-------------- .../reputation/eigentrust/controller/calls.go | 24 +++--- .../eigentrust/controller/controller.go | 15 +--- 6 files changed, 66 insertions(+), 115 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 30decf04..0f85c16f 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -31,7 +31,6 @@ import ( nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" - "github.com/nspcc-dev/neofs-node/pkg/morph/timer" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/control" @@ -134,8 +133,7 @@ type cfgMorph struct { disableCache bool - blockTimers []*timer.BlockTimer // all combined timers - eigenTrustTimer *timer.BlockTimer // timer for EigenTrust iterations + eigenTrustTicker *eigenTrustTickers // timers for EigenTrust iterations proxyScriptHash neogoutil.Uint160 } diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 7accbf5c..baec1788 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -93,7 +93,6 @@ func bootUp(c *cfg) { makeAndWaitNotaryDeposit(c) bootstrapNode(c) startWorkers(c) - startBlockTimers(c) } func wait(c *cfg) { diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index cbb9cb96..e4fb4639 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/common" intermediatereputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/intermediate" localreputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/local" + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/ticker" repClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" @@ -215,36 +216,40 @@ func initReputationService(c *cfg) { } // initialize eigen trust block timer - durationMeter := NewEigenTrustDuration(c.cfgNetmap.wrapper) - - newEigenTrustIterTimer(c, durationMeter, func() { - epoch, err := c.cfgNetmap.wrapper.Epoch() - if err != nil { - c.log.Debug( - "could not get current epoch", - zap.String("error", err.Error()), - ) - - return - } - - eigenTrustController.Continue( - eigentrustctrl.ContinuePrm{ - Epoch: epoch - 1, - }, - ) - }) + newEigenTrustIterTimer(c) addNewEpochAsyncNotificationHandler( c, func(e event.Event) { - durationMeter.Update() // recalculate duration of one iteration round + epoch := e.(netmap.NewEpoch).EpochNumber() - err := c.cfgMorph.eigenTrustTimer.Reset() // start iteration rounds again + log := c.log.With(zap.Uint64("epoch", epoch)) + + duration, err := c.cfgNetmap.wrapper.EpochDuration() if err != nil { - c.log.Warn("can't reset block timer to start eigen trust calculations again", - zap.String("error", err.Error())) + log.Debug("could not fetch epoch duration", zap.Error(err)) + return } + + iterations, err := c.cfgNetmap.wrapper.EigenTrustIterations() + if err != nil { + log.Debug("could not fetch iteration number", zap.Error(err)) + return + } + + epochTimer, err := ticker.NewIterationsTicker(duration, iterations, func() { + eigenTrustController.Continue( + eigentrustctrl.ContinuePrm{ + Epoch: epoch - 1, + }, + ) + }) + if err != nil { + log.Debug("could not create fixed epoch timer", zap.Error(err)) + return + } + + c.cfgMorph.eigenTrustTicker.addEpochTimer(epoch, epochTimer) }, ) } diff --git a/cmd/neofs-node/timers.go b/cmd/neofs-node/timers.go index 8ab82bee..ae55d91f 100644 --- a/cmd/neofs-node/timers.go +++ b/cmd/neofs-node/timers.go @@ -3,83 +3,41 @@ package main import ( "sync" - nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap" - "github.com/nspcc-dev/neofs-node/pkg/morph/timer" + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/ticker" ) -type ( - // EigenTrustDuration is a structure that provides duration of one - // eigen trust iteration round in blocks for block timer. - EigenTrustDuration struct { - sync.Mutex +type eigenTrustTickers struct { + m sync.Mutex - nm *nmClient.Client - val uint32 - } -) - -// NewEigenTrustDuration returns instance of EigenTrustDuration. -func NewEigenTrustDuration(nm *nmClient.Client) *EigenTrustDuration { - return &EigenTrustDuration{ - nm: nm, - } + timers map[uint64]*ticker.IterationsTicker } -// Value returns number of blocks between two iterations of EigenTrust -// calculation. This value is not changed between `Update` calls. -func (e *EigenTrustDuration) Value() (uint32, error) { - e.Lock() - defer e.Unlock() +func (e *eigenTrustTickers) addEpochTimer(epoch uint64, timer *ticker.IterationsTicker) { + e.m.Lock() + defer e.m.Unlock() - if e.val == 0 { - e.update() - } - - return e.val, nil + e.timers[epoch] = timer } -// Update function recalculate duration of EigenTrust iteration based on -// NeoFS epoch duration and amount of iteration rounds from global config. -func (e *EigenTrustDuration) Update() { - e.Lock() - defer e.Unlock() +func (e *eigenTrustTickers) tick() { + e.m.Lock() + defer e.m.Unlock() - e.update() -} - -func (e *EigenTrustDuration) update() { - iterationAmount, err := e.nm.EigenTrustIterations() - if err != nil { - return - } - - epochDuration, err := e.nm.EpochDuration() - if err != nil { - return - } - - e.val = uint32(epochDuration / iterationAmount) -} - -func startBlockTimers(c *cfg) { - for i := range c.cfgMorph.blockTimers { - if err := c.cfgMorph.blockTimers[i].Reset(); err != nil { - fatalOnErr(err) + for epoch, t := range e.timers { + if !t.Tick() { + delete(e.timers, epoch) } } } func tickBlockTimers(c *cfg) { - for i := range c.cfgMorph.blockTimers { - c.cfgMorph.blockTimers[i].Tick() + c.cfgMorph.eigenTrustTicker.tick() +} + +func newEigenTrustIterTimer(c *cfg) { + c.cfgMorph.eigenTrustTicker = &eigenTrustTickers{ + // it is expected to have max 2 concurrent epoch + // in normal mode work + timers: make(map[uint64]*ticker.IterationsTicker, 2), } } - -func newEigenTrustIterTimer(c *cfg, it *EigenTrustDuration, handler timer.BlockTickHandler) { - c.cfgMorph.eigenTrustTimer = timer.NewBlockTimer( - it.Value, - handler, - ) - - c.cfgMorph.blockTimers = append(c.cfgMorph.blockTimers, c.cfgMorph.eigenTrustTimer) -} diff --git a/pkg/services/reputation/eigentrust/controller/calls.go b/pkg/services/reputation/eigentrust/controller/calls.go index 27657ebf..41881486 100644 --- a/pkg/services/reputation/eigentrust/controller/calls.go +++ b/pkg/services/reputation/eigentrust/controller/calls.go @@ -17,7 +17,8 @@ type iterContext struct { eigentrust.EpochIteration - last bool + iterationNumber uint32 + last bool } func (x iterContext) Last() bool { @@ -42,11 +43,20 @@ func (c *Controller) Continue(prm ContinuePrm) { iterCtx.Context, iterCtx.cancel = context.WithCancel(context.Background()) iterCtx.EpochIteration.SetEpoch(prm.Epoch) + + iterations, err := c.prm.IterationsProvider.EigenTrustIterations() + if err != nil { + c.opts.log.Error("could not get EigenTrust iteration number", + zap.Error(err), + ) + } else { + iterCtx.iterationNumber = uint32(iterations) + } } else { iterCtx.cancel() } - iterCtx.last = iterCtx.I() == c.iterationNumber-1 + iterCtx.last = iterCtx.I() == iterCtx.iterationNumber-1 err := c.prm.WorkerPool.Submit(func() { c.prm.DaughtersTrustCalculator.Calculate(iterCtx.iterContext) @@ -66,16 +76,6 @@ func (c *Controller) Continue(prm ContinuePrm) { // number as already processed, but in any case it grows up // In this case and worker pool failure we can mark epoch delete(c.mCtx, prm.Epoch) - - iterations, err := c.prm.IterationsProvider.EigenTrustIterations() - if err != nil { - c.opts.log.Debug( - "could not get iteration numbers", - zap.String("error", err.Error()), - ) - } else { - c.iterationNumber = uint32(iterations) - } } } diff --git a/pkg/services/reputation/eigentrust/controller/controller.go b/pkg/services/reputation/eigentrust/controller/controller.go index 8c1f2d2b..956286fa 100644 --- a/pkg/services/reputation/eigentrust/controller/controller.go +++ b/pkg/services/reputation/eigentrust/controller/controller.go @@ -46,9 +46,6 @@ type Controller struct { opts *options - // Number of iterations - iterationNumber uint32 - mtx sync.Mutex mCtx map[uint64]*iterContextCancel } @@ -75,11 +72,6 @@ func New(prm Prm, opts ...Option) *Controller { panicOnPrmValue("DaughtersTrustCalculator", prm.DaughtersTrustCalculator) } - iterations, err := prm.IterationsProvider.EigenTrustIterations() - if err != nil { - panic(fmt.Errorf("could not init EigenTrust controller: could not get num of iterations: %w", err)) - } - o := defaultOpts() for _, opt := range opts { @@ -87,9 +79,8 @@ func New(prm Prm, opts ...Option) *Controller { } return &Controller{ - iterationNumber: uint32(iterations), - prm: prm, - opts: o, - mCtx: make(map[uint64]*iterContextCancel), + prm: prm, + opts: o, + mCtx: make(map[uint64]*iterContextCancel), } }