forked from TrueCloudLab/frostfs-node
[#473] Implement EigenTrust calculations
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
0ec8bcf6b4
commit
a97e08cfd7
14 changed files with 1112 additions and 3 deletions
91
pkg/services/reputation/eigentrust/calculator/calculator.go
Normal file
91
pkg/services/reputation/eigentrust/calculator/calculator.go
Normal file
|
@ -0,0 +1,91 @@
|
|||
package eigentrustcalc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||
)
|
||||
|
||||
// Prm groups the required parameters of the Calculator's constructor.
|
||||
//
|
||||
// All values must comply with the requirements imposed on them.
|
||||
// Passing incorrect parameter values will result in constructor
|
||||
// failure (error or panic depending on the implementation).
|
||||
type Prm struct {
|
||||
// Alpha parameter from origin EigenTrust algortihm
|
||||
// http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Ch.5.1.
|
||||
//
|
||||
// Must be in range (0, 1).
|
||||
Alpha float64
|
||||
|
||||
// Source of initial node trust values
|
||||
//
|
||||
// Must not be nil.
|
||||
InitialTrustSource InitialTrustSource
|
||||
|
||||
DaughterTrustSource DaughterTrustIteratorProvider
|
||||
|
||||
IntermediateValueTarget IntermediateWriterProvider
|
||||
|
||||
FinalResultTarget IntermediateWriterProvider
|
||||
|
||||
WorkerPool util.WorkerPool
|
||||
}
|
||||
|
||||
// Calculator is a processor of a single iteration of EigenTrust algorithm.
|
||||
//
|
||||
// For correct operation, the Calculator must be created
|
||||
// using the constructor (New) based on the required parameters
|
||||
// and optional components. After successful creation,
|
||||
// the Calculator is immediately ready to work through
|
||||
// API of external control of calculations and data transfer.
|
||||
type Calculator struct {
|
||||
alpha, beta reputation.TrustValue // beta = 1 - alpha
|
||||
|
||||
prm Prm
|
||||
|
||||
opts *options
|
||||
}
|
||||
|
||||
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
||||
|
||||
func panicOnPrmValue(n string, v interface{}) {
|
||||
panic(fmt.Sprintf(invalidPrmValFmt, n, v, v))
|
||||
}
|
||||
|
||||
// New creates a new instance of the Calculator.
|
||||
//
|
||||
// Panics if at least one value of the parameters is invalid.
|
||||
//
|
||||
// The created Calculator does not require additional
|
||||
// initialization and is completely ready for work.
|
||||
func New(prm Prm, opts ...Option) *Calculator {
|
||||
switch {
|
||||
case prm.Alpha <= 0 || prm.Alpha >= 1:
|
||||
panicOnPrmValue("Alpha", prm.Alpha)
|
||||
case prm.InitialTrustSource == nil:
|
||||
panicOnPrmValue("InitialTrustSource", prm.InitialTrustSource)
|
||||
case prm.DaughterTrustSource == nil:
|
||||
panicOnPrmValue("DaughterTrustSource", prm.DaughterTrustSource)
|
||||
case prm.IntermediateValueTarget == nil:
|
||||
panicOnPrmValue("IntermediateValueTarget", prm.IntermediateValueTarget)
|
||||
case prm.FinalResultTarget == nil:
|
||||
panicOnPrmValue("FinalResultTarget", prm.FinalResultTarget)
|
||||
case prm.WorkerPool == nil:
|
||||
panicOnPrmValue("WorkerPool", prm.WorkerPool)
|
||||
}
|
||||
|
||||
o := defaultOpts()
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(o)
|
||||
}
|
||||
|
||||
return &Calculator{
|
||||
alpha: reputation.TrustValueFromFloat64(prm.Alpha),
|
||||
beta: reputation.TrustValueFromFloat64(1 - prm.Alpha),
|
||||
prm: prm,
|
||||
opts: o,
|
||||
}
|
||||
}
|
270
pkg/services/reputation/eigentrust/calculator/calls.go
Normal file
270
pkg/services/reputation/eigentrust/calculator/calls.go
Normal file
|
@ -0,0 +1,270 @@
|
|||
package eigentrustcalc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type CalculatePrm struct {
|
||||
last bool
|
||||
|
||||
ei eigentrust.EpochIteration
|
||||
}
|
||||
|
||||
func (p *CalculatePrm) SetEpochIteration(ei eigentrust.EpochIteration) {
|
||||
p.ei = ei
|
||||
}
|
||||
|
||||
type iterContext struct {
|
||||
context.Context
|
||||
eigentrust.EpochIteration
|
||||
}
|
||||
|
||||
func (c *Calculator) Calculate(prm CalculatePrm) {
|
||||
ctx := iterContext{
|
||||
Context: context.Background(),
|
||||
EpochIteration: prm.ei,
|
||||
}
|
||||
|
||||
iter := ctx.I()
|
||||
|
||||
if iter == 0 {
|
||||
c.sendInitialValues(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
// decrement iteration number to select the values collected
|
||||
// on the previous stage
|
||||
ctx.SetI(iter - 1)
|
||||
|
||||
consumersIter, err := c.prm.DaughterTrustSource.InitConsumersIterator(ctx)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("consumers trust iterator's init failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// continue with initial iteration number
|
||||
ctx.SetI(iter)
|
||||
|
||||
err = consumersIter.Iterate(func(daughter reputation.PeerID, iter TrustIterator) error {
|
||||
err := c.prm.WorkerPool.Submit(func() {
|
||||
c.iterateDaughter(iterDaughterPrm{
|
||||
lastIter: prm.last,
|
||||
ctx: ctx,
|
||||
id: daughter,
|
||||
consumersIter: iter,
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
c.opts.log.Debug("worker pool submit failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
// don't stop trying
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
c.opts.log.Debug("iterate daughters failed",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
type iterDaughterPrm struct {
|
||||
lastIter bool
|
||||
|
||||
ctx Context
|
||||
|
||||
id reputation.PeerID
|
||||
|
||||
consumersIter TrustIterator
|
||||
}
|
||||
|
||||
func (c *Calculator) iterateDaughter(p iterDaughterPrm) {
|
||||
initTrust, err := c.prm.InitialTrustSource.InitialTrust(p.id)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("get initial trust failure",
|
||||
zap.String("daughter", hex.EncodeToString(p.id.Bytes())),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
daughterIter, err := c.prm.DaughterTrustSource.InitDaughterIterator(p.ctx, p.id)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("daughter trust iterator's init failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
sum := reputation.TrustZero
|
||||
|
||||
err = p.consumersIter.Iterate(func(trust reputation.Trust) error {
|
||||
if !p.lastIter {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
return p.ctx.Err()
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
sum.Add(trust.Value())
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
c.opts.log.Debug("iterate over daughter's trusts failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Alpha * Pd
|
||||
initTrust.Mul(c.alpha)
|
||||
|
||||
sum.Mul(c.beta)
|
||||
sum.Add(initTrust)
|
||||
|
||||
var intermediateTrust eigentrust.IterationTrust
|
||||
|
||||
intermediateTrust.SetEpoch(p.ctx.Epoch())
|
||||
intermediateTrust.SetTrustingPeer(p.id)
|
||||
intermediateTrust.SetI(p.ctx.I())
|
||||
|
||||
if p.lastIter {
|
||||
finalWriter, err := c.prm.FinalResultTarget.InitIntermediateWriter(p.ctx)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("init intermediate writer failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
intermediateTrust.SetValue(sum)
|
||||
|
||||
err = finalWriter.WriteIntermediateTrust(intermediateTrust)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("write final result failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
} else {
|
||||
intermediateWriter, err := c.prm.IntermediateValueTarget.InitIntermediateWriter(p.ctx)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("init intermediate writer failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
err = daughterIter.Iterate(func(trust reputation.Trust) error {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
return p.ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
val := trust.Value()
|
||||
val.Mul(sum)
|
||||
|
||||
trust.SetValue(val)
|
||||
|
||||
intermediateTrust.SetPeer(trust.Peer())
|
||||
intermediateTrust.SetValue(val)
|
||||
|
||||
err := intermediateWriter.WriteIntermediateTrust(intermediateTrust)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("write intermediate value failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
c.opts.log.Debug("iterate daughter trusts failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Calculator) sendInitialValues(ctx Context) {
|
||||
daughterIter, err := c.prm.DaughterTrustSource.InitAllDaughtersIterator(ctx)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("all daughters trust iterator's init failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
intermediateWriter, err := c.prm.IntermediateValueTarget.InitIntermediateWriter(ctx)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("init intermediate writer failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
var intermediateTrust eigentrust.IterationTrust
|
||||
|
||||
intermediateTrust.SetEpoch(ctx.Epoch())
|
||||
intermediateTrust.SetI(ctx.I())
|
||||
|
||||
err = daughterIter.Iterate(func(daughter reputation.PeerID, iterator TrustIterator) error {
|
||||
intermediateTrust.SetTrustingPeer(daughter)
|
||||
|
||||
return iterator.Iterate(func(trust reputation.Trust) error {
|
||||
trusted := trust.Peer()
|
||||
|
||||
initTrust, err := c.prm.InitialTrustSource.InitialTrust(trusted)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("get initial trust failure",
|
||||
zap.String("peer", hex.EncodeToString(trusted.Bytes())),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
// don't stop on single failure
|
||||
return nil
|
||||
}
|
||||
|
||||
intermediateTrust.SetPeer(trusted)
|
||||
|
||||
initTrust.Mul(trust.Value())
|
||||
intermediateTrust.SetValue(initTrust)
|
||||
|
||||
err = intermediateWriter.WriteIntermediateTrust(intermediateTrust)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("write intermediate value failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
// don't stop on single failure
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
c.opts.log.Debug("iterate over all daughters failure",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
47
pkg/services/reputation/eigentrust/calculator/deps.go
Normal file
47
pkg/services/reputation/eigentrust/calculator/deps.go
Normal file
|
@ -0,0 +1,47 @@
|
|||
package eigentrustcalc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust"
|
||||
)
|
||||
|
||||
type Context interface {
|
||||
context.Context
|
||||
|
||||
// Must return epoch number to select the values
|
||||
// for global trust calculation.
|
||||
Epoch() uint64
|
||||
|
||||
// Must return the sequence number of the iteration.
|
||||
I() uint32
|
||||
}
|
||||
|
||||
type InitialTrustSource interface {
|
||||
InitialTrust(reputation.PeerID) (reputation.TrustValue, error)
|
||||
}
|
||||
|
||||
type TrustIterator interface {
|
||||
Iterate(reputation.TrustHandler) error
|
||||
}
|
||||
|
||||
type PeerTrustsHandler func(reputation.PeerID, TrustIterator) error
|
||||
|
||||
type PeerTrustsIterator interface {
|
||||
Iterate(PeerTrustsHandler) error
|
||||
}
|
||||
|
||||
type DaughterTrustIteratorProvider interface {
|
||||
InitDaughterIterator(Context, reputation.PeerID) (TrustIterator, error)
|
||||
InitAllDaughtersIterator(Context) (PeerTrustsIterator, error)
|
||||
InitConsumersIterator(Context) (PeerTrustsIterator, error)
|
||||
}
|
||||
|
||||
type IntermediateWriter interface {
|
||||
WriteIntermediateTrust(eigentrust.IterationTrust) error
|
||||
}
|
||||
|
||||
type IntermediateWriterProvider interface {
|
||||
InitIntermediateWriter(Context) (IntermediateWriter, error)
|
||||
}
|
30
pkg/services/reputation/eigentrust/calculator/opts.go
Normal file
30
pkg/services/reputation/eigentrust/calculator/opts.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
package eigentrustcalc
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Option sets an optional parameter of Controller.
|
||||
type Option func(*options)
|
||||
|
||||
type options struct {
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
func defaultOpts() *options {
|
||||
return &options{
|
||||
log: zap.L(),
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger returns option to specify logging component.
|
||||
//
|
||||
// Ignores nil values.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(o *options) {
|
||||
if l != nil {
|
||||
o.log = l
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue