diff --git a/cmd/neofs-node/reputation/common/managers.go b/cmd/neofs-node/reputation/common/managers.go index bec46897ea..e9260a4108 100644 --- a/cmd/neofs-node/reputation/common/managers.go +++ b/cmd/neofs-node/reputation/common/managers.go @@ -18,6 +18,7 @@ import ( type managerBuilder struct { log *logger.Logger nmSrc netmapcore.Source + opts *mngOptions } // ManagersPrm groups the required parameters of the managerBuilder's constructor. @@ -50,6 +51,7 @@ func NewManagerBuilder(prm ManagersPrm, opts ...MngOption) common.ManagerBuilder return &managerBuilder{ log: o.log, nmSrc: prm.NetMapSource, + opts: o, } } diff --git a/cmd/neofs-node/reputation/intermediate/calculator.go b/cmd/neofs-node/reputation/intermediate/calculator.go new file mode 100644 index 0000000000..11f69adfcd --- /dev/null +++ b/cmd/neofs-node/reputation/intermediate/calculator.go @@ -0,0 +1,27 @@ +package intermediate + +import ( + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust" + eigencalc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/calculator" + eigentrustctrl "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/controller" +) + +// DaughtersTrustCalculator wraps EigenTrust calculator and implements +// eigentrust/calculator's DaughtersTrustCalculator interface. +type DaughtersTrustCalculator struct { + Calculator *eigencalc.Calculator +} + +// Calculate converts and passes values to wrapped calculator. +func (c *DaughtersTrustCalculator) Calculate(ctx eigentrustctrl.IterationContext) { + calcPrm := eigencalc.CalculatePrm{} + epochIteration := eigentrust.EpochIteration{} + + epochIteration.SetEpoch(ctx.Epoch()) + epochIteration.SetI(ctx.I()) + + calcPrm.SetLast(ctx.Last()) + calcPrm.SetEpochIteration(epochIteration) + + c.Calculator.Calculate(calcPrm) +} diff --git a/cmd/neofs-node/reputation/intermediate/consumers.go b/cmd/neofs-node/reputation/intermediate/consumers.go index 9977454f3d..048147121f 100644 --- a/cmd/neofs-node/reputation/intermediate/consumers.go +++ b/cmd/neofs-node/reputation/intermediate/consumers.go @@ -2,7 +2,6 @@ package intermediate import ( "errors" - "fmt" "github.com/nspcc-dev/neofs-node/pkg/services/reputation" "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" @@ -36,7 +35,6 @@ func (w *ConsumerTrustWriter) Write(ctx common.Context, t reputation.Trust) erro trust.SetEpoch(eiCtx.Epoch()) trust.SetI(eiCtx.I()) - fmt.Println("decided to save consumers trusts to storage for epoch and iteration: ", eiCtx.Epoch(), eiCtx.I()) w.storage.Put(trust) return nil } diff --git a/cmd/neofs-node/reputation/intermediate/contract.go b/cmd/neofs-node/reputation/intermediate/contract.go new file mode 100644 index 0000000000..a86acddc4c --- /dev/null +++ b/cmd/neofs-node/reputation/intermediate/contract.go @@ -0,0 +1,143 @@ +package intermediate + +import ( + "crypto/ecdsa" + "fmt" + + apireputation "github.com/nspcc-dev/neofs-api-go/pkg/reputation" + "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation/wrapper" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust" + eigentrustcalc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/calculator" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// FinalWriterProviderPrm groups the required parameters of the FinalWriterProvider's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type FinalWriterProviderPrm struct { + PrivatKey *ecdsa.PrivateKey + PubKey []byte + Client *wrapper.ClientWrapper +} + +// NewFinalWriterProvider creates a new instance of the FinalWriterProvider. +// +// Panics if at least one value of the parameters is invalid. +// +// The created FinalWriterProvider does not require additional +// initialization and is completely ready for work. +func NewFinalWriterProvider(prm FinalWriterProviderPrm, opts ...FinalWriterOption) *FinalWriterProvider { + o := defaultFinalWriterOptionsOpts() + + for i := range opts { + opts[i](o) + } + + return &FinalWriterProvider{ + prm: prm, + opts: o, + } +} + +type FinalWriterProvider struct { + prm FinalWriterProviderPrm + opts *finalWriterOptions +} + +func (fwp FinalWriterProvider) InitIntermediateWriter( + _ eigentrustcalc.Context) (eigentrustcalc.IntermediateWriter, error) { + return &FinalWriter{ + privatKey: fwp.prm.PrivatKey, + pubKey: fwp.prm.PubKey, + client: fwp.prm.Client, + l: fwp.opts.log, + }, nil + +} + +type FinalWriter struct { + privatKey *ecdsa.PrivateKey + pubKey []byte + client *wrapper.ClientWrapper + + l *logger.Logger +} + +func (fw FinalWriter) WriteIntermediateTrust(t eigentrust.IterationTrust) error { + args := wrapper.PutArgs{} + + var trustedPublicKey [33]byte + copy(trustedPublicKey[:], t.Peer().Bytes()) + + apiTrustedPeerID := apireputation.NewPeerID() + apiTrustedPeerID.SetPublicKey(trustedPublicKey) + + apiTrust := apireputation.NewTrust() + apiTrust.SetValue(t.Value().Float64()) + apiTrust.SetPeer(apiTrustedPeerID) + + var managerPublicKey [33]byte + copy(managerPublicKey[:], fw.pubKey) + + apiMangerPeerID := apireputation.NewPeerID() + apiMangerPeerID.SetPublicKey(managerPublicKey) + + gTrust := apireputation.NewGlobalTrust() + gTrust.SetTrust(apiTrust) + gTrust.SetManager(apiMangerPeerID) + + err := gTrust.Sign(fw.privatKey) + if err != nil { + fw.l.Debug( + "failed to sign global trust", + zap.Error(err), + ) + return fmt.Errorf("failed to sign global trust: %w", err) + } + + args.SetEpoch(t.Epoch()) + args.SetValue(*gTrust) + args.SetPeerID(*apiTrustedPeerID) + + err = fw.client.Put( + args, + ) + if err != nil { + fw.l.Debug( + "failed to write global trust to contract", + zap.Error(err), + ) + return fmt.Errorf("failed to write global trust to contract: %w", err) + } + + fw.l.Debug( + "sent global trust to contract", + zap.Uint64("epoch", t.Epoch()), + zap.Float64("value", t.Value().Float64()), + ) + + return nil +} + +type finalWriterOptions struct { + log *logger.Logger +} + +type FinalWriterOption func(*finalWriterOptions) + +func defaultFinalWriterOptionsOpts() *finalWriterOptions { + return &finalWriterOptions{ + log: zap.L(), + } +} + +func FinalWriterWithLogger(l *logger.Logger) FinalWriterOption { + return func(o *finalWriterOptions) { + if l != nil { + o.log = l + } + } +} diff --git a/cmd/neofs-node/reputation/intermediate/remote.go b/cmd/neofs-node/reputation/intermediate/remote.go index 6120491d0f..86a11f3178 100644 --- a/cmd/neofs-node/reputation/intermediate/remote.go +++ b/cmd/neofs-node/reputation/intermediate/remote.go @@ -2,6 +2,7 @@ package intermediate import ( "crypto/ecdsa" + apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client" reputationapi "github.com/nspcc-dev/neofs-api-go/pkg/reputation" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/common" diff --git a/cmd/neofs-node/reputation/intermediate/storage.go b/cmd/neofs-node/reputation/intermediate/storage.go index c3053e346c..bf62019ce0 100644 --- a/cmd/neofs-node/reputation/intermediate/storage.go +++ b/cmd/neofs-node/reputation/intermediate/storage.go @@ -21,8 +21,8 @@ func (i InitialTrustSource) InitialTrust(reputation.PeerID) (reputation.TrustVal // DaughterTrustIteratorProvider is implementation of the // reputation/eigentrust/calculator's DaughterTrustIteratorProvider interface. type DaughterTrustIteratorProvider struct { - daughterStorage *daughters.Storage - consumerStorage *consumerstorage.Storage + DaughterStorage *daughters.Storage + ConsumerStorage *consumerstorage.Storage } type ErrNoData struct { @@ -46,7 +46,7 @@ func (e *ErrNoData) Error() string { // specified epoch and daughter's PeerId. func (ip *DaughterTrustIteratorProvider) InitDaughterIterator(ctx eigentrustcalc.Context, p reputation.PeerID) (eigentrustcalc.TrustIterator, error) { - daughterIterator, ok := ip.daughterStorage.DaughterTrusts(ctx.Epoch(), p) + daughterIterator, ok := ip.DaughterStorage.DaughterTrusts(ctx.Epoch(), p) if !ok { return nil, &ErrNoData{ daughter: p, @@ -66,7 +66,7 @@ func (ip *DaughterTrustIteratorProvider) InitDaughterIterator(ctx eigentrustcalc // specified epoch. func (ip *DaughterTrustIteratorProvider) InitAllDaughtersIterator( ctx eigentrustcalc.Context) (eigentrustcalc.PeerTrustsIterator, error) { - iter, ok := ip.daughterStorage.AllDaughterTrusts(ctx.Epoch()) + iter, ok := ip.DaughterStorage.AllDaughterTrusts(ctx.Epoch()) if !ok { return nil, &ErrNoData{epoch: ctx.Epoch()} } @@ -83,7 +83,7 @@ func (ip *DaughterTrustIteratorProvider) InitAllDaughtersIterator( // specified epoch and iteration. func (ip *DaughterTrustIteratorProvider) InitConsumersIterator( ctx eigentrustcalc.Context) (eigentrustcalc.PeerTrustsIterator, error) { - consumerIterator, ok := ip.consumerStorage.Consumers(ctx.Epoch(), ctx.I()) + consumerIterator, ok := ip.ConsumerStorage.Consumers(ctx.Epoch(), ctx.I()) if !ok { return nil, &ErrNoData{epoch: ctx.Epoch()} } diff --git a/go.sum b/go.sum index 1c423297b4..b0903f62f7 100644 Binary files a/go.sum and b/go.sum differ diff --git a/pkg/services/reputation/eigentrust/calculator/calculator.go b/pkg/services/reputation/eigentrust/calculator/calculator.go index b946763617..883cecf8ca 100644 --- a/pkg/services/reputation/eigentrust/calculator/calculator.go +++ b/pkg/services/reputation/eigentrust/calculator/calculator.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" "github.com/nspcc-dev/neofs-node/pkg/util" ) @@ -26,7 +27,7 @@ type Prm struct { DaughterTrustSource DaughterTrustIteratorProvider - IntermediateValueTarget IntermediateWriterProvider + IntermediateValueTarget common.WriterProvider FinalResultTarget IntermediateWriterProvider diff --git a/pkg/services/reputation/eigentrust/calculator/calls.go b/pkg/services/reputation/eigentrust/calculator/calls.go index c10a73a87e..a4f24ce421 100644 --- a/pkg/services/reputation/eigentrust/calculator/calls.go +++ b/pkg/services/reputation/eigentrust/calculator/calls.go @@ -15,17 +15,16 @@ type CalculatePrm struct { ei eigentrust.EpochIteration } +func (p *CalculatePrm) SetLast(last bool) { + p.last = last +} + func (p *CalculatePrm) SetEpochIteration(ei eigentrust.EpochIteration) { p.ei = ei } -type iterContext struct { - context.Context - eigentrust.EpochIteration -} - func (c *Calculator) Calculate(prm CalculatePrm) { - ctx := iterContext{ + ctx := eigentrust.IterContext{ Context: context.Background(), EpochIteration: prm.ei, } @@ -139,7 +138,7 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { var intermediateTrust eigentrust.IterationTrust intermediateTrust.SetEpoch(p.ctx.Epoch()) - intermediateTrust.SetTrustingPeer(p.id) + intermediateTrust.SetPeer(p.id) intermediateTrust.SetI(p.ctx.I()) if p.lastIter { @@ -163,7 +162,7 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { return } } else { - intermediateWriter, err := c.prm.IntermediateValueTarget.InitIntermediateWriter(p.ctx) + intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(p.ctx) if err != nil { c.opts.log.Debug("init intermediate writer failure", zap.String("error", err.Error()), @@ -184,10 +183,7 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { trust.SetValue(val) - intermediateTrust.SetPeer(trust.Peer()) - intermediateTrust.SetValue(val) - - err := intermediateWriter.WriteIntermediateTrust(intermediateTrust) + err := intermediateWriter.Write(p.ctx, trust) if err != nil { c.opts.log.Debug("write intermediate value failure", zap.String("error", err.Error()), @@ -201,6 +197,14 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) { zap.String("error", err.Error()), ) } + + err = intermediateWriter.Close() + if err != nil { + c.opts.log.Error( + "could not close intermediate writer", + zap.String("error", err.Error()), + ) + } } } @@ -214,7 +218,7 @@ func (c *Calculator) sendInitialValues(ctx Context) { return } - intermediateWriter, err := c.prm.IntermediateValueTarget.InitIntermediateWriter(ctx) + intermediateWriter, err := c.prm.IntermediateValueTarget.InitWriter(ctx) if err != nil { c.opts.log.Debug("init intermediate writer failure", zap.String("error", err.Error()), @@ -223,14 +227,7 @@ func (c *Calculator) sendInitialValues(ctx Context) { return } - var intermediateTrust eigentrust.IterationTrust - - intermediateTrust.SetEpoch(ctx.Epoch()) - intermediateTrust.SetI(ctx.I()) - err = daughterIter.Iterate(func(daughter reputation.PeerID, iterator TrustIterator) error { - intermediateTrust.SetTrustingPeer(daughter) - return iterator.Iterate(func(trust reputation.Trust) error { trusted := trust.Peer() @@ -245,12 +242,10 @@ func (c *Calculator) sendInitialValues(ctx Context) { return nil } - intermediateTrust.SetPeer(trusted) - initTrust.Mul(trust.Value()) - intermediateTrust.SetValue(initTrust) + trust.SetValue(initTrust) - err = intermediateWriter.WriteIntermediateTrust(intermediateTrust) + err = intermediateWriter.Write(ctx, trust) if err != nil { c.opts.log.Debug("write intermediate value failure", zap.String("error", err.Error()), @@ -267,4 +262,11 @@ func (c *Calculator) sendInitialValues(ctx Context) { zap.String("error", err.Error()), ) } + + err = intermediateWriter.Close() + if err != nil { + c.opts.log.Debug("could not close intermediate writer", + zap.String("error", err.Error()), + ) + } } diff --git a/pkg/services/reputation/eigentrust/calculator/deps.go b/pkg/services/reputation/eigentrust/calculator/deps.go index b52151a6a0..f94de3d490 100644 --- a/pkg/services/reputation/eigentrust/calculator/deps.go +++ b/pkg/services/reputation/eigentrust/calculator/deps.go @@ -28,13 +28,27 @@ type TrustIterator interface { type PeerTrustsHandler func(reputation.PeerID, TrustIterator) error +// PeerTrustsIterator must iterate over all nodes(PeerIDs) and provide +// TrustIterator for iteration over node's Trusts to others peers. type PeerTrustsIterator interface { Iterate(PeerTrustsHandler) error } type DaughterTrustIteratorProvider interface { - InitDaughterIterator(Context, reputation.PeerID) (TrustIterator, error) - InitAllDaughtersIterator(Context) (PeerTrustsIterator, error) + // InitDaughterIterator must init TrustIterator + // that iterates over received local trusts from + // daughter p for ctx.Epoch() epoch. + InitDaughterIterator(ctx Context, p reputation.PeerID) (TrustIterator, error) + // InitAllDaughtersIterator must init PeerTrustsIterator + // that must iterate over all daughters of the current + // node(manager) and all trusts received from them for + // ctx.Epoch() epoch. + InitAllDaughtersIterator(ctx Context) (PeerTrustsIterator, error) + // InitConsumersIterator must init PeerTrustsIterator + // that must iterate over all daughters of the current + // node(manager) and their consumers' trusts received + // from other managers for ctx.Epoch() epoch and + // ctx.I() iteration. InitConsumersIterator(Context) (PeerTrustsIterator, error) } diff --git a/pkg/services/reputation/eigentrust/controller/calls.go b/pkg/services/reputation/eigentrust/controller/calls.go index fbef675f59..27657ebf5c 100644 --- a/pkg/services/reputation/eigentrust/controller/calls.go +++ b/pkg/services/reputation/eigentrust/controller/calls.go @@ -9,7 +9,7 @@ import ( // ContinuePrm groups the required parameters of Continue operation. type ContinuePrm struct { - epoch uint64 + Epoch uint64 } type iterContext struct { @@ -35,20 +35,24 @@ func (c *Controller) Continue(prm ContinuePrm) { c.mtx.Lock() { - iterCtx, ok := c.mCtx[prm.epoch] + iterCtx, ok := c.mCtx[prm.Epoch] if !ok { - iterCtx := new(iterContextCancel) - c.mCtx[prm.epoch] = iterCtx + iterCtx = new(iterContextCancel) + c.mCtx[prm.Epoch] = iterCtx iterCtx.Context, iterCtx.cancel = context.WithCancel(context.Background()) + iterCtx.EpochIteration.SetEpoch(prm.Epoch) } else { iterCtx.cancel() } - iterCtx.last = iterCtx.I() == c.prm.IterationNumber + iterCtx.last = iterCtx.I() == c.iterationNumber-1 err := c.prm.WorkerPool.Submit(func() { c.prm.DaughtersTrustCalculator.Calculate(iterCtx.iterContext) + + // iteration++ + iterCtx.Increment() }) if err != nil { c.opts.log.Debug("iteration submit failure", @@ -57,11 +61,21 @@ func (c *Controller) Continue(prm ContinuePrm) { } if iterCtx.last { - delete(c.mCtx, prm.epoch) - // In this case and worker pool failure we can mark epoch - // number as already processed, but in any case it grows up - // during normal operation of the system. Also, such information // will only live while the application is alive. + // during normal operation of the system. Also, such information + // number as already processed, but in any case it grows up + // In this case and worker pool failure we can mark epoch + delete(c.mCtx, prm.Epoch) + + iterations, err := c.prm.IterationsProvider.EigenTrustIterations() + if err != nil { + c.opts.log.Debug( + "could not get iteration numbers", + zap.String("error", err.Error()), + ) + } else { + c.iterationNumber = uint32(iterations) + } } } diff --git a/pkg/services/reputation/eigentrust/controller/controller.go b/pkg/services/reputation/eigentrust/controller/controller.go index 9313af5868..8c1f2d2b40 100644 --- a/pkg/services/reputation/eigentrust/controller/controller.go +++ b/pkg/services/reputation/eigentrust/controller/controller.go @@ -13,14 +13,15 @@ import ( // Passing incorrect parameter values will result in constructor // failure (error or panic depending on the implementation). type Prm struct { - // Number of iterations - IterationNumber uint32 - // Component of computing iteration of EigenTrust algorithm. // // Must not be nil. DaughtersTrustCalculator DaughtersTrustCalculator + // IterationsProvider provides information about numbers + // of iterations for algorithm. + IterationsProvider IterationsProvider + // Routine execution pool for single epoch iteration. WorkerPool util.WorkerPool } @@ -45,6 +46,9 @@ type Controller struct { opts *options + // Number of iterations + iterationNumber uint32 + mtx sync.Mutex mCtx map[uint64]*iterContextCancel } @@ -63,14 +67,19 @@ func panicOnPrmValue(n string, v interface{}) { // initialization and is completely ready for work. func New(prm Prm, opts ...Option) *Controller { switch { - case prm.IterationNumber == 0: - panicOnPrmValue("IterationNumber", prm.IterationNumber) + case prm.IterationsProvider == nil: + panicOnPrmValue("IterationNumber", prm.IterationsProvider) case prm.WorkerPool == nil: panicOnPrmValue("WorkerPool", prm.WorkerPool) case prm.DaughtersTrustCalculator == nil: panicOnPrmValue("DaughtersTrustCalculator", prm.DaughtersTrustCalculator) } + iterations, err := prm.IterationsProvider.EigenTrustIterations() + if err != nil { + panic(fmt.Errorf("could not init EigenTrust controller: could not get num of iterations: %w", err)) + } + o := defaultOpts() for _, opt := range opts { @@ -78,8 +87,9 @@ func New(prm Prm, opts ...Option) *Controller { } return &Controller{ - prm: prm, - opts: o, - mCtx: make(map[uint64]*iterContextCancel), + iterationNumber: uint32(iterations), + prm: prm, + opts: o, + mCtx: make(map[uint64]*iterContextCancel), } } diff --git a/pkg/services/reputation/eigentrust/controller/deps.go b/pkg/services/reputation/eigentrust/controller/deps.go index 394a667c84..ade2f6a2da 100644 --- a/pkg/services/reputation/eigentrust/controller/deps.go +++ b/pkg/services/reputation/eigentrust/controller/deps.go @@ -33,3 +33,9 @@ type DaughtersTrustCalculator interface { // Execution should be interrupted if ctx.Last(). Calculate(ctx IterationContext) } + +// IterationsProvider must provides information about numbers +// of iterations for algorithm. +type IterationsProvider interface { + EigenTrustIterations() (uint64, error) +} diff --git a/pkg/services/reputation/eigentrust/iteration.go b/pkg/services/reputation/eigentrust/iteration.go index 0042199630..959c2c23aa 100644 --- a/pkg/services/reputation/eigentrust/iteration.go +++ b/pkg/services/reputation/eigentrust/iteration.go @@ -1,6 +1,8 @@ package eigentrust import ( + "context" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" ) @@ -25,7 +27,30 @@ func (x *EpochIteration) SetI(i uint32) { x.i = i } +func (x *EpochIteration) Increment() { + x.i++ +} + type IterationTrust struct { EpochIteration reputation.Trust } + +// IterContext aggregates context and data required for +// iterations. +type IterContext struct { + context.Context + EpochIteration +} + +func NewIterContext(ctx context.Context, epoch uint64, iter uint32) *IterContext { + ei := EpochIteration{} + + ei.SetI(iter) + ei.SetEpoch(epoch) + + return &IterContext{ + Context: ctx, + EpochIteration: ei, + } +}