frostfs-node/pkg/services/reputation/eigentrust/calculator/calls.go
Dmitrii Stepanov 7ebbfa3358 [#212] reputationsvc: Resolve linters and rename
Resolved containedctx linters.
Renamed context structs and interfaces to more understandble names.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-04-07 15:35:57 +00:00

287 lines
6 KiB
Go

package eigentrustcalc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/eigentrust"
apireputation "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/reputation"
"go.uber.org/zap"
)
type CalculatePrm struct {
last bool
ei eigentrust.EpochIteration
}
func (p *CalculatePrm) SetLast(last bool) {
p.last = last
}
func (p *CalculatePrm) SetEpochIteration(ei eigentrust.EpochIteration) {
p.ei = ei
}
func (c *Calculator) Calculate(ctx context.Context, prm CalculatePrm) {
alpha, err := c.prm.AlphaProvider.EigenTrustAlpha()
if err != nil {
c.opts.log.Debug(
"failed to get alpha param",
zap.Error(err),
)
return
}
c.alpha = reputation.TrustValueFromFloat64(alpha)
c.beta = reputation.TrustValueFromFloat64(1 - alpha)
epochIteration := prm.ei
iter := epochIteration.I()
log := c.opts.log.With(
zap.Uint64("epoch", epochIteration.Epoch()),
zap.Uint32("iteration", iter),
)
if iter == 0 {
c.sendInitialValues(ctx, epochIteration)
return
}
// decrement iteration number to select the values collected
// on the previous stage
epochIteration.SetI(iter - 1)
consumersIter, err := c.prm.DaughterTrustSource.InitConsumersIterator(epochIteration)
if err != nil {
log.Debug("consumers trust iterator's init failure",
zap.String("error", err.Error()),
)
return
}
// continue with initial iteration number
epochIteration.SetI(iter)
err = consumersIter.Iterate(func(daughter apireputation.PeerID, iter TrustIterator) error {
err := c.prm.WorkerPool.Submit(func() {
c.iterateDaughter(ctx, iterDaughterPrm{
lastIter: prm.last,
ei: epochIteration,
id: daughter,
consumersIter: iter,
})
})
if err != nil {
log.Debug("worker pool submit failure",
zap.String("error", err.Error()),
)
}
// don't stop trying
return nil
})
if err != nil {
log.Debug("iterate daughter's consumers failed",
zap.String("error", err.Error()),
)
}
}
type iterDaughterPrm struct {
lastIter bool
ei EpochIterationInfo
id apireputation.PeerID
consumersIter TrustIterator
}
// nolint: funlen
func (c *Calculator) iterateDaughter(ctx context.Context, p iterDaughterPrm) {
initTrust, err := c.prm.InitialTrustSource.InitialTrust(p.id)
if err != nil {
c.opts.log.Debug("get initial trust failure",
zap.Stringer("daughter", p.id),
zap.String("error", err.Error()),
)
return
}
daughterIter, err := c.prm.DaughterTrustSource.InitDaughterIterator(p.ei, p.id)
if err != nil {
c.opts.log.Debug("daughter trust iterator's init failure",
zap.String("error", err.Error()),
)
return
}
sum := reputation.TrustZero
err = p.consumersIter.Iterate(func(trust reputation.Trust) error {
if !p.lastIter {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
sum.Add(trust.Value())
return nil
})
if err != nil {
c.opts.log.Debug("iterate over daughter's trusts failure",
zap.String("error", err.Error()),
)
return
}
// Alpha * Pd
initTrust.Mul(c.alpha)
sum.Mul(c.beta)
sum.Add(initTrust)
var intermediateTrust eigentrust.IterationTrust
intermediateTrust.SetEpoch(p.ei.Epoch())
intermediateTrust.SetPeer(p.id)
intermediateTrust.SetI(p.ei.I())
if p.lastIter {
finalWriter, err := c.prm.FinalResultTarget.InitIntermediateWriter(p.ei)
if err != nil {
c.opts.log.Debug("init writer failure",
zap.String("error", err.Error()),
)
return
}
intermediateTrust.SetValue(sum)
err = finalWriter.WriteIntermediateTrust(intermediateTrust)
if err != nil {
c.opts.log.Debug("write final result failure",
zap.String("error", err.Error()),
)
return
}
} else {
intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(p.ei)
if err != nil {
c.opts.log.Debug("init writer failure",
zap.String("error", err.Error()),
)
return
}
err = daughterIter.Iterate(func(trust reputation.Trust) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
val := trust.Value()
val.Mul(sum)
trust.SetValue(val)
err := intermediateWriter.Write(ctx, trust)
if err != nil {
c.opts.log.Debug("write value failure",
zap.String("error", err.Error()),
)
}
return nil
})
if err != nil {
c.opts.log.Debug("iterate daughter trusts failure",
zap.String("error", err.Error()),
)
}
err = intermediateWriter.Close(ctx)
if err != nil {
c.opts.log.Error(
"could not close writer",
zap.String("error", err.Error()),
)
}
}
}
func (c *Calculator) sendInitialValues(ctx context.Context, epochInfo EpochIterationInfo) {
daughterIter, err := c.prm.DaughterTrustSource.InitAllDaughtersIterator(epochInfo)
if err != nil {
c.opts.log.Debug("all daughters trust iterator's init failure",
zap.String("error", err.Error()),
)
return
}
intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(epochInfo)
if err != nil {
c.opts.log.Debug("init writer failure",
zap.String("error", err.Error()),
)
return
}
err = daughterIter.Iterate(func(daughter apireputation.PeerID, iterator TrustIterator) error {
return iterator.Iterate(func(trust reputation.Trust) error {
trusted := trust.Peer()
initTrust, err := c.prm.InitialTrustSource.InitialTrust(trusted)
if err != nil {
c.opts.log.Debug("get initial trust failure",
zap.Stringer("peer", trusted),
zap.String("error", err.Error()),
)
// don't stop on single failure
return nil
}
initTrust.Mul(trust.Value())
trust.SetValue(initTrust)
err = intermediateWriter.Write(ctx, trust)
if err != nil {
c.opts.log.Debug("write value failure",
zap.String("error", err.Error()),
)
// don't stop on single failure
}
return nil
})
})
if err != nil {
c.opts.log.Debug("iterate over all daughters failure",
zap.String("error", err.Error()),
)
}
err = intermediateWriter.Close(ctx)
if err != nil {
c.opts.log.Debug("could not close writer",
zap.String("error", err.Error()),
)
}
}