From a97e08cfd7e6eed57cc59e9a23837379703fa45f Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Sat, 10 Apr 2021 15:02:51 +0300 Subject: [PATCH] [#473] Implement EigenTrust calculations Signed-off-by: Leonard Lyubich --- .../eigentrust/calculator/calculator.go | 91 ++++++ .../reputation/eigentrust/calculator/calls.go | 270 ++++++++++++++++++ .../reputation/eigentrust/calculator/deps.go | 47 +++ .../reputation/eigentrust/calculator/opts.go | 30 ++ .../reputation/eigentrust/controller/calls.go | 69 +++++ .../eigentrust/controller/controller.go | 85 ++++++ .../reputation/eigentrust/controller/deps.go | 35 +++ .../reputation/eigentrust/controller/opts.go | 30 ++ .../reputation/eigentrust/iteration.go | 31 ++ .../eigentrust/storage/consumers/calls.go | 189 ++++++++++++ .../eigentrust/storage/consumers/storage.go | 40 +++ .../eigentrust/storage/daughters/calls.go | 134 +++++++++ .../eigentrust/storage/daughters/storage.go | 38 +++ pkg/services/reputation/trust.go | 26 +- 14 files changed, 1112 insertions(+), 3 deletions(-) create mode 100644 pkg/services/reputation/eigentrust/calculator/calculator.go create mode 100644 pkg/services/reputation/eigentrust/calculator/calls.go create mode 100644 pkg/services/reputation/eigentrust/calculator/deps.go create mode 100644 pkg/services/reputation/eigentrust/calculator/opts.go create mode 100644 pkg/services/reputation/eigentrust/controller/calls.go create mode 100644 pkg/services/reputation/eigentrust/controller/controller.go create mode 100644 pkg/services/reputation/eigentrust/controller/deps.go create mode 100644 pkg/services/reputation/eigentrust/controller/opts.go create mode 100644 pkg/services/reputation/eigentrust/iteration.go create mode 100644 pkg/services/reputation/eigentrust/storage/consumers/calls.go create mode 100644 pkg/services/reputation/eigentrust/storage/consumers/storage.go create mode 100644 pkg/services/reputation/eigentrust/storage/daughters/calls.go create mode 100644 pkg/services/reputation/eigentrust/storage/daughters/storage.go diff --git a/pkg/services/reputation/eigentrust/calculator/calculator.go b/pkg/services/reputation/eigentrust/calculator/calculator.go new file mode 100644 index 000000000..5c0232a04 --- /dev/null +++ b/pkg/services/reputation/eigentrust/calculator/calculator.go @@ -0,0 +1,91 @@ +package eigentrustcalc + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + "github.com/nspcc-dev/neofs-node/pkg/util" +) + +// Prm groups the required parameters of the Calculator's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type Prm struct { + // Alpha parameter from origin EigenTrust algortihm + // http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Ch.5.1. + // + // Must be in range (0, 1). + Alpha float64 + + // Source of initial node trust values + // + // Must not be nil. + InitialTrustSource InitialTrustSource + + DaughterTrustSource DaughterTrustIteratorProvider + + IntermediateValueTarget IntermediateWriterProvider + + FinalResultTarget IntermediateWriterProvider + + WorkerPool util.WorkerPool +} + +// Calculator is a processor of a single iteration of EigenTrust algorithm. +// +// For correct operation, the Calculator must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// the Calculator is immediately ready to work through +// API of external control of calculations and data transfer. +type Calculator struct { + alpha, beta reputation.TrustValue // beta = 1 - alpha + + prm Prm + + opts *options +} + +const invalidPrmValFmt = "invalid parameter %s (%T):%v" + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) +} + +// New creates a new instance of the Calculator. +// +// Panics if at least one value of the parameters is invalid. +// +// The created Calculator does not require additional +// initialization and is completely ready for work. +func New(prm Prm, opts ...Option) *Calculator { + switch { + case prm.Alpha <= 0 || prm.Alpha >= 1: + panicOnPrmValue("Alpha", prm.Alpha) + case prm.InitialTrustSource == nil: + panicOnPrmValue("InitialTrustSource", prm.InitialTrustSource) + case prm.DaughterTrustSource == nil: + panicOnPrmValue("DaughterTrustSource", prm.DaughterTrustSource) + case prm.IntermediateValueTarget == nil: + panicOnPrmValue("IntermediateValueTarget", prm.IntermediateValueTarget) + case prm.FinalResultTarget == nil: + panicOnPrmValue("FinalResultTarget", prm.FinalResultTarget) + case prm.WorkerPool == nil: + panicOnPrmValue("WorkerPool", prm.WorkerPool) + } + + o := defaultOpts() + + for _, opt := range opts { + opt(o) + } + + return &Calculator{ + alpha: reputation.TrustValueFromFloat64(prm.Alpha), + beta: reputation.TrustValueFromFloat64(1 - prm.Alpha), + prm: prm, + opts: o, + } +} diff --git a/pkg/services/reputation/eigentrust/calculator/calls.go b/pkg/services/reputation/eigentrust/calculator/calls.go new file mode 100644 index 000000000..c10a73a87 --- /dev/null +++ b/pkg/services/reputation/eigentrust/calculator/calls.go @@ -0,0 +1,270 @@ +package eigentrustcalc + +import ( + "context" + "encoding/hex" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust" + "go.uber.org/zap" +) + +type CalculatePrm struct { + last bool + + ei eigentrust.EpochIteration +} + +func (p *CalculatePrm) SetEpochIteration(ei eigentrust.EpochIteration) { + p.ei = ei +} + +type iterContext struct { + context.Context + eigentrust.EpochIteration +} + +func (c *Calculator) Calculate(prm CalculatePrm) { + ctx := iterContext{ + Context: context.Background(), + EpochIteration: prm.ei, + } + + iter := ctx.I() + + if iter == 0 { + c.sendInitialValues(ctx) + return + } + + // decrement iteration number to select the values collected + // on the previous stage + ctx.SetI(iter - 1) + + consumersIter, err := c.prm.DaughterTrustSource.InitConsumersIterator(ctx) + if err != nil { + c.opts.log.Debug("consumers trust iterator's init failure", + zap.String("error", err.Error()), + ) + + return + } + + // continue with initial iteration number + ctx.SetI(iter) + + err = consumersIter.Iterate(func(daughter reputation.PeerID, iter TrustIterator) error { + err := c.prm.WorkerPool.Submit(func() { + c.iterateDaughter(iterDaughterPrm{ + lastIter: prm.last, + ctx: ctx, + id: daughter, + consumersIter: iter, + }) + }) + if err != nil { + c.opts.log.Debug("worker pool submit failure", + zap.String("error", err.Error()), + ) + } + + // don't stop trying + return nil + }) + if err != nil { + c.opts.log.Debug("iterate daughters failed", + zap.String("error", err.Error()), + ) + } +} + +type iterDaughterPrm struct { + lastIter bool + + ctx Context + + id reputation.PeerID + + consumersIter TrustIterator +} + +func (c *Calculator) iterateDaughter(p iterDaughterPrm) { + initTrust, err := c.prm.InitialTrustSource.InitialTrust(p.id) + if err != nil { + c.opts.log.Debug("get initial trust failure", + zap.String("daughter", hex.EncodeToString(p.id.Bytes())), + zap.String("error", err.Error()), + ) + + return + } + + daughterIter, err := c.prm.DaughterTrustSource.InitDaughterIterator(p.ctx, p.id) + if err != nil { + c.opts.log.Debug("daughter trust iterator's init failure", + zap.String("error", err.Error()), + ) + + return + } + + sum := reputation.TrustZero + + err = p.consumersIter.Iterate(func(trust reputation.Trust) error { + if !p.lastIter { + select { + case <-p.ctx.Done(): + return p.ctx.Err() + default: + } + } + + sum.Add(trust.Value()) + return nil + }) + if err != nil { + c.opts.log.Debug("iterate over daughter's trusts failure", + zap.String("error", err.Error()), + ) + + return + } + + // Alpha * Pd + initTrust.Mul(c.alpha) + + sum.Mul(c.beta) + sum.Add(initTrust) + + var intermediateTrust eigentrust.IterationTrust + + intermediateTrust.SetEpoch(p.ctx.Epoch()) + intermediateTrust.SetTrustingPeer(p.id) + intermediateTrust.SetI(p.ctx.I()) + + if p.lastIter { + finalWriter, err := c.prm.FinalResultTarget.InitIntermediateWriter(p.ctx) + if err != nil { + c.opts.log.Debug("init intermediate writer failure", + zap.String("error", err.Error()), + ) + + return + } + + intermediateTrust.SetValue(sum) + + err = finalWriter.WriteIntermediateTrust(intermediateTrust) + if err != nil { + c.opts.log.Debug("write final result failure", + zap.String("error", err.Error()), + ) + + return + } + } else { + intermediateWriter, err := c.prm.IntermediateValueTarget.InitIntermediateWriter(p.ctx) + if err != nil { + c.opts.log.Debug("init intermediate writer failure", + zap.String("error", err.Error()), + ) + + return + } + + err = daughterIter.Iterate(func(trust reputation.Trust) error { + select { + case <-p.ctx.Done(): + return p.ctx.Err() + default: + } + + val := trust.Value() + val.Mul(sum) + + trust.SetValue(val) + + intermediateTrust.SetPeer(trust.Peer()) + intermediateTrust.SetValue(val) + + err := intermediateWriter.WriteIntermediateTrust(intermediateTrust) + if err != nil { + c.opts.log.Debug("write intermediate value failure", + zap.String("error", err.Error()), + ) + } + + return nil + }) + if err != nil { + c.opts.log.Debug("iterate daughter trusts failure", + zap.String("error", err.Error()), + ) + } + } +} + +func (c *Calculator) sendInitialValues(ctx Context) { + daughterIter, err := c.prm.DaughterTrustSource.InitAllDaughtersIterator(ctx) + if err != nil { + c.opts.log.Debug("all daughters trust iterator's init failure", + zap.String("error", err.Error()), + ) + + return + } + + intermediateWriter, err := c.prm.IntermediateValueTarget.InitIntermediateWriter(ctx) + if err != nil { + c.opts.log.Debug("init intermediate writer failure", + zap.String("error", err.Error()), + ) + + return + } + + var intermediateTrust eigentrust.IterationTrust + + intermediateTrust.SetEpoch(ctx.Epoch()) + intermediateTrust.SetI(ctx.I()) + + err = daughterIter.Iterate(func(daughter reputation.PeerID, iterator TrustIterator) error { + intermediateTrust.SetTrustingPeer(daughter) + + return iterator.Iterate(func(trust reputation.Trust) error { + trusted := trust.Peer() + + initTrust, err := c.prm.InitialTrustSource.InitialTrust(trusted) + if err != nil { + c.opts.log.Debug("get initial trust failure", + zap.String("peer", hex.EncodeToString(trusted.Bytes())), + zap.String("error", err.Error()), + ) + + // don't stop on single failure + return nil + } + + intermediateTrust.SetPeer(trusted) + + initTrust.Mul(trust.Value()) + intermediateTrust.SetValue(initTrust) + + err = intermediateWriter.WriteIntermediateTrust(intermediateTrust) + if err != nil { + c.opts.log.Debug("write intermediate value failure", + zap.String("error", err.Error()), + ) + + // don't stop on single failure + } + + return nil + }) + }) + if err != nil { + c.opts.log.Debug("iterate over all daughters failure", + zap.String("error", err.Error()), + ) + } +} diff --git a/pkg/services/reputation/eigentrust/calculator/deps.go b/pkg/services/reputation/eigentrust/calculator/deps.go new file mode 100644 index 000000000..b52151a6a --- /dev/null +++ b/pkg/services/reputation/eigentrust/calculator/deps.go @@ -0,0 +1,47 @@ +package eigentrustcalc + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust" +) + +type Context interface { + context.Context + + // Must return epoch number to select the values + // for global trust calculation. + Epoch() uint64 + + // Must return the sequence number of the iteration. + I() uint32 +} + +type InitialTrustSource interface { + InitialTrust(reputation.PeerID) (reputation.TrustValue, error) +} + +type TrustIterator interface { + Iterate(reputation.TrustHandler) error +} + +type PeerTrustsHandler func(reputation.PeerID, TrustIterator) error + +type PeerTrustsIterator interface { + Iterate(PeerTrustsHandler) error +} + +type DaughterTrustIteratorProvider interface { + InitDaughterIterator(Context, reputation.PeerID) (TrustIterator, error) + InitAllDaughtersIterator(Context) (PeerTrustsIterator, error) + InitConsumersIterator(Context) (PeerTrustsIterator, error) +} + +type IntermediateWriter interface { + WriteIntermediateTrust(eigentrust.IterationTrust) error +} + +type IntermediateWriterProvider interface { + InitIntermediateWriter(Context) (IntermediateWriter, error) +} diff --git a/pkg/services/reputation/eigentrust/calculator/opts.go b/pkg/services/reputation/eigentrust/calculator/opts.go new file mode 100644 index 000000000..27577b037 --- /dev/null +++ b/pkg/services/reputation/eigentrust/calculator/opts.go @@ -0,0 +1,30 @@ +package eigentrustcalc + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option sets an optional parameter of Controller. +type Option func(*options) + +type options struct { + log *logger.Logger +} + +func defaultOpts() *options { + return &options{ + log: zap.L(), + } +} + +// WithLogger returns option to specify logging component. +// +// Ignores nil values. +func WithLogger(l *logger.Logger) Option { + return func(o *options) { + if l != nil { + o.log = l + } + } +} diff --git a/pkg/services/reputation/eigentrust/controller/calls.go b/pkg/services/reputation/eigentrust/controller/calls.go new file mode 100644 index 000000000..fbef675f5 --- /dev/null +++ b/pkg/services/reputation/eigentrust/controller/calls.go @@ -0,0 +1,69 @@ +package eigentrustctrl + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust" + "go.uber.org/zap" +) + +// ContinuePrm groups the required parameters of Continue operation. +type ContinuePrm struct { + epoch uint64 +} + +type iterContext struct { + context.Context + + eigentrust.EpochIteration + + last bool +} + +func (x iterContext) Last() bool { + return x.last +} + +type iterContextCancel struct { + iterContext + + cancel context.CancelFunc +} + +// Continue moves the global reputation calculator to the next iteration. +func (c *Controller) Continue(prm ContinuePrm) { + c.mtx.Lock() + + { + iterCtx, ok := c.mCtx[prm.epoch] + if !ok { + iterCtx := new(iterContextCancel) + c.mCtx[prm.epoch] = iterCtx + + iterCtx.Context, iterCtx.cancel = context.WithCancel(context.Background()) + } else { + iterCtx.cancel() + } + + iterCtx.last = iterCtx.I() == c.prm.IterationNumber + + err := c.prm.WorkerPool.Submit(func() { + c.prm.DaughtersTrustCalculator.Calculate(iterCtx.iterContext) + }) + if err != nil { + c.opts.log.Debug("iteration submit failure", + zap.String("error", err.Error()), + ) + } + + if iterCtx.last { + delete(c.mCtx, prm.epoch) + // In this case and worker pool failure we can mark epoch + // number as already processed, but in any case it grows up + // during normal operation of the system. Also, such information + // will only live while the application is alive. + } + } + + c.mtx.Unlock() +} diff --git a/pkg/services/reputation/eigentrust/controller/controller.go b/pkg/services/reputation/eigentrust/controller/controller.go new file mode 100644 index 000000000..9313af586 --- /dev/null +++ b/pkg/services/reputation/eigentrust/controller/controller.go @@ -0,0 +1,85 @@ +package eigentrustctrl + +import ( + "fmt" + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/util" +) + +// Prm groups the required parameters of the Controller's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type Prm struct { + // Number of iterations + IterationNumber uint32 + + // Component of computing iteration of EigenTrust algorithm. + // + // Must not be nil. + DaughtersTrustCalculator DaughtersTrustCalculator + + // Routine execution pool for single epoch iteration. + WorkerPool util.WorkerPool +} + +// Controller represents EigenTrust algorithm transient controller. +// +// Controller's main goal is to separate the two main stages of +// the calculation: +// 1.reporting local values to manager nodes +// 2.calculating global trusts of child nodes +// +// Calculation stages are controlled based on external signals +// that come from the application through the Controller's API. +// +// For correct operation, the controller must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// the constructor is immediately ready to work through +// API of external control of calculations and data transfer. +type Controller struct { + prm Prm + + opts *options + + mtx sync.Mutex + mCtx map[uint64]*iterContextCancel +} + +const invalidPrmValFmt = "invalid parameter %s (%T):%v" + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) +} + +// New creates a new instance of the Controller. +// +// Panics if at least one value of the parameters is invalid. +// +// The created Controller does not require additional +// initialization and is completely ready for work. +func New(prm Prm, opts ...Option) *Controller { + switch { + case prm.IterationNumber == 0: + panicOnPrmValue("IterationNumber", prm.IterationNumber) + case prm.WorkerPool == nil: + panicOnPrmValue("WorkerPool", prm.WorkerPool) + case prm.DaughtersTrustCalculator == nil: + panicOnPrmValue("DaughtersTrustCalculator", prm.DaughtersTrustCalculator) + } + + o := defaultOpts() + + for _, opt := range opts { + opt(o) + } + + return &Controller{ + prm: prm, + opts: o, + mCtx: make(map[uint64]*iterContextCancel), + } +} diff --git a/pkg/services/reputation/eigentrust/controller/deps.go b/pkg/services/reputation/eigentrust/controller/deps.go new file mode 100644 index 000000000..394a667c8 --- /dev/null +++ b/pkg/services/reputation/eigentrust/controller/deps.go @@ -0,0 +1,35 @@ +package eigentrustctrl + +import ( + "context" +) + +// IterationContext is a context of the i-th +// stage of iterative EigenTrust algorithm. +type IterationContext interface { + context.Context + + // Must return epoch number to select the values + // for global trust calculation. + Epoch() uint64 + + // Must return the sequence number of the iteration. + I() uint32 + + // Must return true if I() is the last iteration. + Last() bool +} + +// DaughtersTrustCalculator is an interface of entity +// responsible for calculating the global trust of +// daughter nodes in terms of EigenTrust algorithm. +type DaughtersTrustCalculator interface { + // Must perform the iteration step of the loop + // for computing the global trust of all daughter + // nodes and sending intermediate values + // according to EigenTrust description + // http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Ch.5.1. + // + // Execution should be interrupted if ctx.Last(). + Calculate(ctx IterationContext) +} diff --git a/pkg/services/reputation/eigentrust/controller/opts.go b/pkg/services/reputation/eigentrust/controller/opts.go new file mode 100644 index 000000000..ef94c9469 --- /dev/null +++ b/pkg/services/reputation/eigentrust/controller/opts.go @@ -0,0 +1,30 @@ +package eigentrustctrl + +import ( + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Option sets an optional parameter of Controller. +type Option func(*options) + +type options struct { + log *logger.Logger +} + +func defaultOpts() *options { + return &options{ + log: zap.L(), + } +} + +// WithLogger returns option to specify logging component. +// +// Ignores nil values. +func WithLogger(l *logger.Logger) Option { + return func(o *options) { + if l != nil { + o.log = l + } + } +} diff --git a/pkg/services/reputation/eigentrust/iteration.go b/pkg/services/reputation/eigentrust/iteration.go new file mode 100644 index 000000000..004219963 --- /dev/null +++ b/pkg/services/reputation/eigentrust/iteration.go @@ -0,0 +1,31 @@ +package eigentrust + +import ( + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" +) + +type EpochIteration struct { + e uint64 + i uint32 +} + +func (x EpochIteration) Epoch() uint64 { + return x.e +} + +func (x *EpochIteration) SetEpoch(e uint64) { + x.e = e +} + +func (x EpochIteration) I() uint32 { + return x.i +} + +func (x *EpochIteration) SetI(i uint32) { + x.i = i +} + +type IterationTrust struct { + EpochIteration + reputation.Trust +} diff --git a/pkg/services/reputation/eigentrust/storage/consumers/calls.go b/pkg/services/reputation/eigentrust/storage/consumers/calls.go new file mode 100644 index 000000000..da0b44e09 --- /dev/null +++ b/pkg/services/reputation/eigentrust/storage/consumers/calls.go @@ -0,0 +1,189 @@ +package consumerstorage + +import ( + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust" +) + +// Put saves intermediate trust of the consumer to daughter peer. +func (x *Storage) Put(trust eigentrust.IterationTrust) { + var s *iterationConsumersStorage + + x.mtx.Lock() + + { + epoch := trust.Epoch() + + s = x.mItems[epoch] + if s == nil { + s = &iterationConsumersStorage{ + mItems: make(map[uint32]*ConsumersStorage, 1), + } + + x.mItems[epoch] = s + } + } + + x.mtx.Unlock() + + s.put(trust) +} + +// Consumers returns the storage of trusts of the consumers of the daugher peers +// for particular iteration of EigenTrust calculation for particular epoch. +// +// Returns false if there is no data for the epoch and iter. +func (x *Storage) Consumers(epoch uint64, iter uint32) (*ConsumersStorage, bool) { + var ( + s *iterationConsumersStorage + ok bool + ) + + x.mtx.Lock() + + { + s, ok = x.mItems[epoch] + } + + x.mtx.Unlock() + + if !ok { + return nil, false + } + + return s.consumers(iter) +} + +// maps iteration numbers of EigenTrust algorithm to repositories +// of the trusts of the consumers of the daughter peers. +type iterationConsumersStorage struct { + mtx sync.RWMutex + + mItems map[uint32]*ConsumersStorage +} + +func (x *iterationConsumersStorage) put(trust eigentrust.IterationTrust) { + var s *ConsumersStorage + + x.mtx.Lock() + + { + iter := trust.I() + + s = x.mItems[iter] + if s == nil { + s = &ConsumersStorage{ + mItems: make(map[reputation.PeerID]*ConsumersTrusts, 1), + } + + x.mItems[iter] = s + } + } + + x.mtx.Unlock() + + s.put(trust) +} + +func (x *iterationConsumersStorage) consumers(iter uint32) (s *ConsumersStorage, ok bool) { + x.mtx.Lock() + + { + s, ok = x.mItems[iter] + } + + x.mtx.Unlock() + + return +} + +// ConsumersStorage represents in-memory storage of intermediate trusts +// of the peer consumers. +// +// Maps daughter peers to repositories of the trusts of their consumers. +type ConsumersStorage struct { + mtx sync.RWMutex + + mItems map[reputation.PeerID]*ConsumersTrusts +} + +func (x *ConsumersStorage) put(trust eigentrust.IterationTrust) { + var s *ConsumersTrusts + + x.mtx.Lock() + + { + daughter := trust.Peer() + + s = x.mItems[daughter] + if s == nil { + s = &ConsumersTrusts{ + mItems: make(map[reputation.PeerID]reputation.Trust, 1), + } + + x.mItems[daughter] = s + } + } + + x.mtx.Unlock() + + s.put(trust) +} + +// Iterate passes IDs of the daughter peers with the trusts of their consumers to h. +// +// Returns errors from h directly. +func (x *ConsumersStorage) Iterate(h func(trusted reputation.PeerID, consumerTrusts *ConsumersTrusts) error) (err error) { + x.mtx.RLock() + + { + for trusted, trusts := range x.mItems { + if err = h(trusted, trusts); err != nil { + break + } + } + } + + x.mtx.RUnlock() + + return +} + +// ConsumersTrusts represents in-memory storage of the trusts +// of the consumer peers to some other peer. +type ConsumersTrusts struct { + mtx sync.RWMutex + + mItems map[reputation.PeerID]reputation.Trust +} + +func (x *ConsumersTrusts) put(trust eigentrust.IterationTrust) { + x.mtx.Lock() + + { + x.mItems[trust.TrustingPeer()] = trust.Trust + } + + x.mtx.Unlock() +} + +// Iterate passes all stored trusts to h. +// +// Returns errors from h directly. +func (x *ConsumersTrusts) Iterate(h reputation.TrustHandler) (err error) { + x.mtx.RLock() + + { + for _, trust := range x.mItems { + if err = h(trust); err != nil { + break + } + } + } + + x.mtx.RUnlock() + + return +} diff --git a/pkg/services/reputation/eigentrust/storage/consumers/storage.go b/pkg/services/reputation/eigentrust/storage/consumers/storage.go new file mode 100644 index 000000000..4be1313d0 --- /dev/null +++ b/pkg/services/reputation/eigentrust/storage/consumers/storage.go @@ -0,0 +1,40 @@ +package consumerstorage + +import ( + "sync" +) + +// Prm groups the required parameters of the Storage's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +// +// The component is not parameterizable at the moment. +type Prm struct{} + +// Storage represents in-memory storage that of the trusts +// of the consumer peers. +// +// It maps epoch numbers to the repositories of intermediate +// trusts of the consumers of the daughter peers. +// +// For correct operation, Storage must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// Storage is immediately ready to work through API. +type Storage struct { + mtx sync.RWMutex + + mItems map[uint64]*iterationConsumersStorage +} + +// New creates a new instance of the Storage. +// +// The created Storage does not require additional +// initialization and is completely ready for work. +func New(_ Prm) *Storage { + return &Storage{ + mItems: make(map[uint64]*iterationConsumersStorage), + } +} diff --git a/pkg/services/reputation/eigentrust/storage/daughters/calls.go b/pkg/services/reputation/eigentrust/storage/daughters/calls.go new file mode 100644 index 000000000..d291bcf80 --- /dev/null +++ b/pkg/services/reputation/eigentrust/storage/daughters/calls.go @@ -0,0 +1,134 @@ +package daughters + +import ( + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" +) + +// Put saves daughter peer's trust to its provider for the epoch. +func (x *Storage) Put(epoch uint64, trust reputation.Trust) { + var s *daughterStorage + + x.mtx.Lock() + + { + s = x.mItems[epoch] + if s == nil { + s = &daughterStorage{ + mItems: make(map[reputation.PeerID]*DaughterTrusts, 1), + } + + x.mItems[epoch] = s + } + } + + x.mtx.Unlock() + + s.put(trust) +} + +// DaughterTrusts returns daughter trusts for the epoch. +// +// Returns false if there is no data for the epoch and daughter. +func (x *Storage) DaughterTrusts(epoch uint64, daughter reputation.PeerID) (*DaughterTrusts, bool) { + var ( + s *daughterStorage + ok bool + ) + + x.mtx.RLock() + + { + s, ok = x.mItems[epoch] + } + + x.mtx.RUnlock() + + if !ok { + return nil, false + } + + return s.daughterTrusts(daughter) +} + +// maps IDs of daughter peers to repositories of the local trusts to their providers. +type daughterStorage struct { + mtx sync.RWMutex + + mItems map[reputation.PeerID]*DaughterTrusts +} + +func (x *daughterStorage) put(trust reputation.Trust) { + var dt *DaughterTrusts + + x.mtx.Lock() + + { + trusting := trust.TrustingPeer() + + dt = x.mItems[trusting] + if dt == nil { + dt = &DaughterTrusts{ + mItems: make(map[reputation.PeerID]reputation.Trust, 1), + } + + x.mItems[trusting] = dt + } + } + + x.mtx.Unlock() + + dt.put(trust) +} + +func (x *daughterStorage) daughterTrusts(id reputation.PeerID) (dt *DaughterTrusts, ok bool) { + x.mtx.RLock() + + { + dt, ok = x.mItems[id] + } + + x.mtx.RUnlock() + + return +} + +// DaughterTrusts represents in-memory storage of local trusts +// of the daughter peer to its providers. +// +// Maps IDs of daughter's providers to the local trusts to them. +type DaughterTrusts struct { + mtx sync.RWMutex + + mItems map[reputation.PeerID]reputation.Trust +} + +func (x *DaughterTrusts) put(trust reputation.Trust) { + x.mtx.Lock() + + { + x.mItems[trust.Peer()] = trust + } + + x.mtx.Unlock() +} + +// Iterate passes all stored trusts to h. +// +// Returns errors from h directly. +func (x *DaughterTrusts) Iterate(h reputation.TrustHandler) (err error) { + x.mtx.RLock() + + { + for _, trust := range x.mItems { + if err = h(trust); err != nil { + break + } + } + } + + x.mtx.RUnlock() + + return +} diff --git a/pkg/services/reputation/eigentrust/storage/daughters/storage.go b/pkg/services/reputation/eigentrust/storage/daughters/storage.go new file mode 100644 index 000000000..999b9d02b --- /dev/null +++ b/pkg/services/reputation/eigentrust/storage/daughters/storage.go @@ -0,0 +1,38 @@ +package daughters + +import "sync" + +// Prm groups the required parameters of the Storage's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +// +// The component is not parameterizable at the moment. +type Prm struct{} + +// Storage represents in-memory storage of local trust +// values of the daughter peers. +// +// It maps epoch numbers to the repositories of local trusts +// of the daughter peers. +// +// For correct operation, Storage must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// Storage is immediately ready to work through API. +type Storage struct { + mtx sync.RWMutex + + mItems map[uint64]*daughterStorage +} + +// New creates a new instance of the Storage. +// +// The created Storage does not require additional +// initialization and is completely ready for work. +func New(_ Prm) *Storage { + return &Storage{ + mItems: make(map[uint64]*daughterStorage), + } +} diff --git a/pkg/services/reputation/trust.go b/pkg/services/reputation/trust.go index a8e636093..2b181c1aa 100644 --- a/pkg/services/reputation/trust.go +++ b/pkg/services/reputation/trust.go @@ -7,8 +7,13 @@ import ( // TrustValue represents the numeric value of the node's trust. type TrustValue float64 -// TrustOne is a trust value equal to one. -const TrustOne = TrustValue(1) +const ( + // TrustOne is a trust value equal to one. + TrustOne = TrustValue(1) + + // TrustZero is a trust value equal to zero. + TrustZero = TrustValue(0) +) // TrustValueFromFloat64 converts float64 to TrustValue. func TrustValueFromFloat64(v float64) TrustValue { @@ -39,6 +44,11 @@ func (v TrustValue) Div(v2 TrustValue) TrustValue { return v / v2 } +// Mul multiplies v by v2. +func (v *TrustValue) Mul(v2 TrustValue) { + *v *= v2 +} + // IsZero returns true if v equal to zero. func (v TrustValue) IsZero() bool { return v == 0 @@ -46,7 +56,7 @@ func (v TrustValue) IsZero() bool { // Trust represents peer's trust (reputation). type Trust struct { - peer PeerID + trusting, peer PeerID val TrustValue } @@ -78,3 +88,13 @@ func (t Trust) Peer() PeerID { func (t *Trust) SetPeer(id PeerID) { t.peer = id } + +// TrustingPeer returns trusting peer ID. +func (t Trust) TrustingPeer() PeerID { + return t.trusting +} + +// SetTrustingPeer sets trusting peer ID. +func (t *Trust) SetTrustingPeer(id PeerID) { + t.trusting = id +}