[#1210] reputation: Resolve race condition
Make all epoch independent in reputation process. Do not reset any timers related to reputation. Make it possible to finish iteration after the unexpected `NewEpoch` event. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
77d847dbea
commit
c3db12d71b
6 changed files with 66 additions and 115 deletions
|
@ -17,7 +17,8 @@ type iterContext struct {
|
|||
|
||||
eigentrust.EpochIteration
|
||||
|
||||
last bool
|
||||
iterationNumber uint32
|
||||
last bool
|
||||
}
|
||||
|
||||
func (x iterContext) Last() bool {
|
||||
|
@ -42,11 +43,20 @@ func (c *Controller) Continue(prm ContinuePrm) {
|
|||
|
||||
iterCtx.Context, iterCtx.cancel = context.WithCancel(context.Background())
|
||||
iterCtx.EpochIteration.SetEpoch(prm.Epoch)
|
||||
|
||||
iterations, err := c.prm.IterationsProvider.EigenTrustIterations()
|
||||
if err != nil {
|
||||
c.opts.log.Error("could not get EigenTrust iteration number",
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
iterCtx.iterationNumber = uint32(iterations)
|
||||
}
|
||||
} else {
|
||||
iterCtx.cancel()
|
||||
}
|
||||
|
||||
iterCtx.last = iterCtx.I() == c.iterationNumber-1
|
||||
iterCtx.last = iterCtx.I() == iterCtx.iterationNumber-1
|
||||
|
||||
err := c.prm.WorkerPool.Submit(func() {
|
||||
c.prm.DaughtersTrustCalculator.Calculate(iterCtx.iterContext)
|
||||
|
@ -66,16 +76,6 @@ func (c *Controller) Continue(prm ContinuePrm) {
|
|||
// number as already processed, but in any case it grows up
|
||||
// In this case and worker pool failure we can mark epoch
|
||||
delete(c.mCtx, prm.Epoch)
|
||||
|
||||
iterations, err := c.prm.IterationsProvider.EigenTrustIterations()
|
||||
if err != nil {
|
||||
c.opts.log.Debug(
|
||||
"could not get iteration numbers",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
} else {
|
||||
c.iterationNumber = uint32(iterations)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -46,9 +46,6 @@ type Controller struct {
|
|||
|
||||
opts *options
|
||||
|
||||
// Number of iterations
|
||||
iterationNumber uint32
|
||||
|
||||
mtx sync.Mutex
|
||||
mCtx map[uint64]*iterContextCancel
|
||||
}
|
||||
|
@ -75,11 +72,6 @@ func New(prm Prm, opts ...Option) *Controller {
|
|||
panicOnPrmValue("DaughtersTrustCalculator", prm.DaughtersTrustCalculator)
|
||||
}
|
||||
|
||||
iterations, err := prm.IterationsProvider.EigenTrustIterations()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("could not init EigenTrust controller: could not get num of iterations: %w", err))
|
||||
}
|
||||
|
||||
o := defaultOpts()
|
||||
|
||||
for _, opt := range opts {
|
||||
|
@ -87,9 +79,8 @@ func New(prm Prm, opts ...Option) *Controller {
|
|||
}
|
||||
|
||||
return &Controller{
|
||||
iterationNumber: uint32(iterations),
|
||||
prm: prm,
|
||||
opts: o,
|
||||
mCtx: make(map[uint64]*iterContextCancel),
|
||||
prm: prm,
|
||||
opts: o,
|
||||
mCtx: make(map[uint64]*iterContextCancel),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue