forked from TrueCloudLab/frostfs-node
Dmitrii Stepanov
c236b54a65
Resolve funlen linter for Calculator.iterateDaughter method. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
294 lines
6.3 KiB
Go
294 lines
6.3 KiB
Go
package eigentrustcalc
|
|
|
|
import (
|
|
"context"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
|
|
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type CalculatePrm struct {
|
|
last bool
|
|
|
|
ei eigentrust.EpochIteration
|
|
}
|
|
|
|
func (p *CalculatePrm) SetLast(last bool) {
|
|
p.last = last
|
|
}
|
|
|
|
func (p *CalculatePrm) SetEpochIteration(ei eigentrust.EpochIteration) {
|
|
p.ei = ei
|
|
}
|
|
|
|
func (c *Calculator) Calculate(ctx context.Context, prm CalculatePrm) {
|
|
alpha, err := c.prm.AlphaProvider.EigenTrustAlpha()
|
|
if err != nil {
|
|
c.opts.log.Debug(
|
|
"failed to get alpha param",
|
|
zap.Error(err),
|
|
)
|
|
return
|
|
}
|
|
|
|
c.alpha = reputation.TrustValueFromFloat64(alpha)
|
|
c.beta = reputation.TrustValueFromFloat64(1 - alpha)
|
|
|
|
epochIteration := prm.ei
|
|
|
|
iter := epochIteration.I()
|
|
|
|
log := c.opts.log.With(
|
|
zap.Uint64("epoch", epochIteration.Epoch()),
|
|
zap.Uint32("iteration", iter),
|
|
)
|
|
|
|
if iter == 0 {
|
|
c.sendInitialValues(ctx, epochIteration)
|
|
return
|
|
}
|
|
|
|
// decrement iteration number to select the values collected
|
|
// on the previous stage
|
|
epochIteration.SetI(iter - 1)
|
|
|
|
consumersIter, err := c.prm.DaughterTrustSource.InitConsumersIterator(epochIteration)
|
|
if err != nil {
|
|
log.Debug("consumers trust iterator's init failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
// continue with initial iteration number
|
|
epochIteration.SetI(iter)
|
|
|
|
err = consumersIter.Iterate(func(daughter apireputation.PeerID, iter TrustIterator) error {
|
|
err := c.prm.WorkerPool.Submit(func() {
|
|
c.iterateDaughter(ctx, iterDaughterPrm{
|
|
lastIter: prm.last,
|
|
ei: epochIteration,
|
|
id: daughter,
|
|
consumersIter: iter,
|
|
})
|
|
})
|
|
if err != nil {
|
|
log.Debug("worker pool submit failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
// don't stop trying
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
log.Debug("iterate daughter's consumers failed",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}
|
|
|
|
type iterDaughterPrm struct {
|
|
lastIter bool
|
|
|
|
ei EpochIterationInfo
|
|
|
|
id apireputation.PeerID
|
|
|
|
consumersIter TrustIterator
|
|
}
|
|
|
|
func (c *Calculator) iterateDaughter(ctx context.Context, p iterDaughterPrm) {
|
|
initTrust, err := c.prm.InitialTrustSource.InitialTrust(p.id)
|
|
if err != nil {
|
|
c.opts.log.Debug("get initial trust failure",
|
|
zap.Stringer("daughter", p.id),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
daughterIter, err := c.prm.DaughterTrustSource.InitDaughterIterator(p.ei, p.id)
|
|
if err != nil {
|
|
c.opts.log.Debug("daughter trust iterator's init failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
sum := reputation.TrustZero
|
|
|
|
err = p.consumersIter.Iterate(func(trust reputation.Trust) error {
|
|
if !p.lastIter {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
}
|
|
|
|
sum.Add(trust.Value())
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
c.opts.log.Debug("iterate over daughter's trusts failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
// Alpha * Pd
|
|
initTrust.Mul(c.alpha)
|
|
|
|
sum.Mul(c.beta)
|
|
sum.Add(initTrust)
|
|
|
|
var intermediateTrust eigentrust.IterationTrust
|
|
|
|
intermediateTrust.SetEpoch(p.ei.Epoch())
|
|
intermediateTrust.SetPeer(p.id)
|
|
intermediateTrust.SetI(p.ei.I())
|
|
|
|
if p.lastIter {
|
|
c.processLastIteration(p, intermediateTrust, sum)
|
|
} else {
|
|
c.processIntermediateIteration(ctx, p, daughterIter, sum)
|
|
}
|
|
}
|
|
|
|
func (c *Calculator) processLastIteration(p iterDaughterPrm, intermediateTrust eigentrust.IterationTrust, sum reputation.TrustValue) {
|
|
finalWriter, err := c.prm.FinalResultTarget.InitIntermediateWriter(p.ei)
|
|
if err != nil {
|
|
c.opts.log.Debug("init writer failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
intermediateTrust.SetValue(sum)
|
|
|
|
err = finalWriter.WriteIntermediateTrust(intermediateTrust)
|
|
if err != nil {
|
|
c.opts.log.Debug("write final result failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
}
|
|
|
|
func (c *Calculator) processIntermediateIteration(ctx context.Context, p iterDaughterPrm, daughterIter TrustIterator, sum reputation.TrustValue) {
|
|
intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(p.ei)
|
|
if err != nil {
|
|
c.opts.log.Debug("init writer failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
err = daughterIter.Iterate(func(trust reputation.Trust) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
val := trust.Value()
|
|
val.Mul(sum)
|
|
|
|
trust.SetValue(val)
|
|
|
|
err := intermediateWriter.Write(ctx, trust)
|
|
if err != nil {
|
|
c.opts.log.Debug("write value failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
c.opts.log.Debug("iterate daughter trusts failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
err = intermediateWriter.Close(ctx)
|
|
if err != nil {
|
|
c.opts.log.Error(
|
|
"could not close writer",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}
|
|
|
|
func (c *Calculator) sendInitialValues(ctx context.Context, epochInfo EpochIterationInfo) {
|
|
daughterIter, err := c.prm.DaughterTrustSource.InitAllDaughtersIterator(epochInfo)
|
|
if err != nil {
|
|
c.opts.log.Debug("all daughters trust iterator's init failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(epochInfo)
|
|
if err != nil {
|
|
c.opts.log.Debug("init writer failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
err = daughterIter.Iterate(func(daughter apireputation.PeerID, iterator TrustIterator) error {
|
|
return iterator.Iterate(func(trust reputation.Trust) error {
|
|
trusted := trust.Peer()
|
|
|
|
initTrust, err := c.prm.InitialTrustSource.InitialTrust(trusted)
|
|
if err != nil {
|
|
c.opts.log.Debug("get initial trust failure",
|
|
zap.Stringer("peer", trusted),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
// don't stop on single failure
|
|
return nil
|
|
}
|
|
|
|
initTrust.Mul(trust.Value())
|
|
trust.SetValue(initTrust)
|
|
|
|
err = intermediateWriter.Write(ctx, trust)
|
|
if err != nil {
|
|
c.opts.log.Debug("write value failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
// don't stop on single failure
|
|
}
|
|
|
|
return nil
|
|
})
|
|
})
|
|
if err != nil {
|
|
c.opts.log.Debug("iterate over all daughters failure",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
|
|
err = intermediateWriter.Close(ctx)
|
|
if err != nil {
|
|
c.opts.log.Debug("could not close writer",
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}
|