forked from TrueCloudLab/frostfs-node
[#488] reputation: Change Writer
interface
Includes: - Delete first `ctx` argument in `Write` method. - Move intermediate Initial trust struct and method to `calculator` file. - Change Alpha to 0.1. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
0d34d7c508
commit
d1db54acf8
14 changed files with 61 additions and 63 deletions
|
@ -37,7 +37,8 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const EigenTrustAlpha = 0.5
|
||||
const EigenTrustAlpha = 0.1
|
||||
const EigenTrustInitialTrust = 0.5
|
||||
|
||||
func initReputationService(c *cfg) {
|
||||
staticClient, err := client.NewStatic(
|
||||
|
@ -60,9 +61,7 @@ func initReputationService(c *cfg) {
|
|||
|
||||
// storing calculated trusts as a daughter
|
||||
c.cfgReputation.localTrustStorage = truststorage.New(
|
||||
truststorage.Prm{
|
||||
LocalServer: c,
|
||||
},
|
||||
truststorage.Prm{},
|
||||
)
|
||||
|
||||
daughterStorage := daughters.New(daughters.Prm{})
|
||||
|
@ -154,7 +153,9 @@ func initReputationService(c *cfg) {
|
|||
AlphaProvider: intermediate.AlphaProvider{
|
||||
Alpha: EigenTrustAlpha,
|
||||
},
|
||||
InitialTrustSource: intermediatereputation.InitialTrustSource{},
|
||||
InitialTrustSource: intermediatereputation.InitialTrustSource{
|
||||
Trust: reputation.TrustValueFromFloat64(EigenTrustInitialTrust),
|
||||
},
|
||||
IntermediateValueTarget: intermediateTrustRouter,
|
||||
WorkerPool: c.cfgReputation.workerPool,
|
||||
FinalResultTarget: intermediate.NewFinalWriterProvider(
|
||||
|
@ -298,12 +299,9 @@ func (s *reputationServer) SendIntermediateResult(ctx context.Context, req *v2re
|
|||
|
||||
body := req.GetBody()
|
||||
|
||||
eCtx := &common.EpochContext{
|
||||
Context: ctx,
|
||||
E: body.GetEpoch(),
|
||||
}
|
||||
eiCtx := eigentrust.NewIterContext(ctx, body.GetEpoch(), body.GetIteration())
|
||||
|
||||
w, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute))
|
||||
w, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteContext(eiCtx, passedRoute))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not initialize intermediate trust writer")
|
||||
}
|
||||
|
@ -312,9 +310,7 @@ func (s *reputationServer) SendIntermediateResult(ctx context.Context, req *v2re
|
|||
|
||||
trust := apiToLocalTrust(v2Trust.GetTrust(), v2Trust.GetTrustingPeer().GetValue())
|
||||
|
||||
eiCtx := eigentrust.NewIterContext(ctx, body.GetEpoch(), body.GetIteration())
|
||||
|
||||
err = w.Write(eiCtx, trust)
|
||||
err = w.Write(trust)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not write intermediate trust")
|
||||
}
|
||||
|
@ -332,7 +328,7 @@ func (s *reputationServer) processLocalTrust(epoch uint64, t reputation.Trust,
|
|||
return errors.Wrap(err, "wrong route of reputation trust value")
|
||||
}
|
||||
|
||||
return w.Write(&common.EpochContext{E: epoch}, t)
|
||||
return w.Write(t)
|
||||
}
|
||||
|
||||
// apiToLocalTrust converts v2 Trust to local reputation.Trust, adding trustingPeer.
|
||||
|
|
|
@ -3,7 +3,6 @@ package common
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||
)
|
||||
|
@ -20,7 +19,7 @@ func (ctx *EpochContext) Epoch() uint64 {
|
|||
|
||||
type NopReputationWriter struct{}
|
||||
|
||||
func (NopReputationWriter) Write(common.Context, reputation.Trust) error {
|
||||
func (NopReputationWriter) Write(reputation.Trust) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,23 @@
|
|||
package intermediate
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust"
|
||||
eigencalc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/calculator"
|
||||
eigentrustctrl "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/controller"
|
||||
)
|
||||
|
||||
// InitialTrustSource is implementation of the
|
||||
// reputation/eigentrust/calculator's InitialTrustSource interface.
|
||||
type InitialTrustSource struct {
|
||||
Trust reputation.TrustValue
|
||||
}
|
||||
|
||||
// InitialTrust returns `initialTrust` as initial trust value.
|
||||
func (i InitialTrustSource) InitialTrust(reputation.PeerID) (reputation.TrustValue, error) {
|
||||
return i.Trust, nil
|
||||
}
|
||||
|
||||
// DaughtersTrustCalculator wraps EigenTrust calculator and implements
|
||||
// eigentrust/calculator's DaughtersTrustCalculator interface.
|
||||
type DaughtersTrustCalculator struct {
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
package intermediate
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
||||
reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust"
|
||||
eigencalc "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/calculator"
|
||||
|
@ -12,7 +9,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
)
|
||||
|
||||
var ErrIncorrectContext = errors.New("could not write intermediate trust: passed context incorrect")
|
||||
var ErrIncorrectContextPanicMsg = "could not write intermediate trust: passed context incorrect"
|
||||
|
||||
// ConsumerStorageWriterProvider is implementation of reputation.WriterProvider
|
||||
// interface that provides ConsumerTrustWriter writer.
|
||||
|
@ -27,18 +24,14 @@ type ConsumerStorageWriterProvider struct {
|
|||
type ConsumerTrustWriter struct {
|
||||
log *logger.Logger
|
||||
storage *consumerstorage.Storage
|
||||
eiCtx eigencalc.Context
|
||||
}
|
||||
|
||||
func (w *ConsumerTrustWriter) Write(ctx common.Context, t reputation.Trust) error {
|
||||
eiCtx, ok := ctx.(eigencalc.Context)
|
||||
if !ok {
|
||||
return ErrIncorrectContext
|
||||
}
|
||||
|
||||
func (w *ConsumerTrustWriter) Write(t reputation.Trust) error {
|
||||
trust := eigentrust.IterationTrust{Trust: t}
|
||||
|
||||
trust.SetEpoch(eiCtx.Epoch())
|
||||
trust.SetI(eiCtx.I())
|
||||
trust.SetEpoch(w.eiCtx.Epoch())
|
||||
trust.SetI(w.eiCtx.I())
|
||||
|
||||
w.storage.Put(trust)
|
||||
return nil
|
||||
|
@ -48,9 +41,15 @@ func (w *ConsumerTrustWriter) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *ConsumerStorageWriterProvider) InitWriter(_ reputationcommon.Context) (reputationcommon.Writer, error) {
|
||||
func (s *ConsumerStorageWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) {
|
||||
eiCtx, ok := ctx.(eigencalc.Context)
|
||||
if !ok {
|
||||
panic(ErrIncorrectContextPanicMsg)
|
||||
}
|
||||
|
||||
return &ConsumerTrustWriter{
|
||||
log: s.Log,
|
||||
storage: s.Storage,
|
||||
eiCtx: eiCtx,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package intermediate
|
|||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
||||
reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/daughters"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
|
@ -21,10 +20,11 @@ type DaughterStorageWriterProvider struct {
|
|||
type DaughterTrustWriter struct {
|
||||
log *logger.Logger
|
||||
storage *daughters.Storage
|
||||
ctx reputationcommon.Context
|
||||
}
|
||||
|
||||
func (w *DaughterTrustWriter) Write(ctx common.Context, t reputation.Trust) error {
|
||||
w.storage.Put(ctx.Epoch(), t)
|
||||
func (w *DaughterTrustWriter) Write(t reputation.Trust) error {
|
||||
w.storage.Put(w.ctx.Epoch(), t)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -32,9 +32,10 @@ func (w *DaughterTrustWriter) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *DaughterStorageWriterProvider) InitWriter(_ reputationcommon.Context) (reputationcommon.Writer, error) {
|
||||
func (s *DaughterStorageWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) {
|
||||
return &DaughterTrustWriter{
|
||||
log: s.Log,
|
||||
storage: s.Storage,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -55,15 +55,20 @@ type TrustWriterProvider struct {
|
|||
}
|
||||
|
||||
func (twp *TrustWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) {
|
||||
eiContext, ok := ctx.(eigentrustcalc.Context)
|
||||
if !ok {
|
||||
panic(ErrIncorrectContextPanicMsg)
|
||||
}
|
||||
|
||||
return &RemoteTrustWriter{
|
||||
ctx: ctx,
|
||||
eiCtx: eiContext,
|
||||
client: twp.client,
|
||||
key: twp.key,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type RemoteTrustWriter struct {
|
||||
ctx reputationcommon.Context
|
||||
eiCtx eigentrustcalc.Context
|
||||
client apiClient.Client
|
||||
key *ecdsa.PrivateKey
|
||||
|
||||
|
@ -73,12 +78,7 @@ type RemoteTrustWriter struct {
|
|||
// Write check if passed context contains required
|
||||
// data(returns ErrIncorrectContext if not) and
|
||||
// caches passed trusts(as SendIntermediateTrustPrm structs).
|
||||
func (rtp *RemoteTrustWriter) Write(ctx reputationcommon.Context, t reputation.Trust) error {
|
||||
eiContext, ok := ctx.(eigentrustcalc.Context)
|
||||
if !ok {
|
||||
return ErrIncorrectContext
|
||||
}
|
||||
|
||||
func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error {
|
||||
apiTrustingPeer := reputationapi.NewPeerID()
|
||||
apiTrustingPeer.SetPublicKey(t.TrustingPeer())
|
||||
|
||||
|
@ -94,8 +94,8 @@ func (rtp *RemoteTrustWriter) Write(ctx reputationcommon.Context, t reputation.T
|
|||
apiPeerToPeerTrust.SetTrust(apiTrust)
|
||||
|
||||
p := &apiClient.SendIntermediateTrustPrm{}
|
||||
p.SetEpoch(eiContext.Epoch())
|
||||
p.SetIteration(eiContext.I())
|
||||
p.SetEpoch(rtp.eiCtx.Epoch())
|
||||
p.SetIteration(rtp.eiCtx.I())
|
||||
p.SetTrust(apiPeerToPeerTrust)
|
||||
|
||||
rtp.buf = append(rtp.buf, p)
|
||||
|
@ -108,7 +108,7 @@ func (rtp *RemoteTrustWriter) Write(ctx reputationcommon.Context, t reputation.T
|
|||
func (rtp *RemoteTrustWriter) Close() (err error) {
|
||||
for _, prm := range rtp.buf {
|
||||
_, err = rtp.client.SendIntermediateTrust(
|
||||
rtp.ctx,
|
||||
rtp.eiCtx,
|
||||
*prm,
|
||||
apiClient.WithKey(rtp.key),
|
||||
)
|
||||
|
|
|
@ -9,15 +9,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/daughters"
|
||||
)
|
||||
|
||||
// InitialTrustSource is implementation of the
|
||||
// reputation/eigentrust/calculator's InitialTrustSource interface.
|
||||
type InitialTrustSource struct{}
|
||||
|
||||
// InitialTrust returns `1` as initial trust value.
|
||||
func (i InitialTrustSource) InitialTrust(reputation.PeerID) (reputation.TrustValue, error) {
|
||||
return reputation.TrustOne, nil
|
||||
}
|
||||
|
||||
// DaughterTrustIteratorProvider is implementation of the
|
||||
// reputation/eigentrust/calculator's DaughterTrustIteratorProvider interface.
|
||||
type DaughterTrustIteratorProvider struct {
|
||||
|
|
|
@ -69,7 +69,7 @@ type RemoteTrustWriter struct {
|
|||
buf []*reputationapi.Trust
|
||||
}
|
||||
|
||||
func (rtp *RemoteTrustWriter) Write(_ reputationcommon.Context, t reputation.Trust) error {
|
||||
func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error {
|
||||
apiTrust := reputationapi.NewTrust()
|
||||
|
||||
apiPeer := reputationapi.NewPeerID()
|
||||
|
|
|
@ -29,7 +29,7 @@ type Writer interface {
|
|||
// Close operation.
|
||||
//
|
||||
// Write must not be called after Close.
|
||||
Write(Context, reputation.Trust) error
|
||||
Write(reputation.Trust) error
|
||||
|
||||
// Close exits with method-providing Writer.
|
||||
//
|
||||
|
|
|
@ -65,7 +65,7 @@ func (r *Router) InitWriter(ctx common.Context) (common.Writer, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (w *trustWriter) Write(ctx common.Context, t reputation.Trust) error {
|
||||
func (w *trustWriter) Write(t reputation.Trust) error {
|
||||
w.routeMtx.Lock()
|
||||
defer w.routeMtx.Unlock()
|
||||
|
||||
|
@ -94,7 +94,7 @@ func (w *trustWriter) Write(ctx common.Context, t reputation.Trust) error {
|
|||
continue
|
||||
}
|
||||
|
||||
remoteWriter, err = provider.InitWriter(w.ctx)
|
||||
remoteWriter, err = provider.InitWriter(w.ctx.Context)
|
||||
if err != nil {
|
||||
w.router.log.Debug("could not initialize writer",
|
||||
zap.String("error", err.Error()),
|
||||
|
@ -106,7 +106,7 @@ func (w *trustWriter) Write(ctx common.Context, t reputation.Trust) error {
|
|||
w.mServers[endpoint] = remoteWriter
|
||||
}
|
||||
|
||||
err := remoteWriter.Write(ctx, t)
|
||||
err := remoteWriter.Write(t)
|
||||
if err != nil {
|
||||
w.router.log.Debug("could not write the value",
|
||||
zap.String("error", err.Error()),
|
||||
|
|
|
@ -195,7 +195,7 @@ func (c *Calculator) iterateDaughter(p iterDaughterPrm) {
|
|||
|
||||
trust.SetValue(val)
|
||||
|
||||
err := intermediateWriter.Write(p.ctx, trust)
|
||||
err := intermediateWriter.Write(trust)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("write intermediate value failure",
|
||||
zap.String("error", err.Error()),
|
||||
|
@ -257,7 +257,7 @@ func (c *Calculator) sendInitialValues(ctx Context) {
|
|||
initTrust.Mul(trust.Value())
|
||||
trust.SetValue(initTrust)
|
||||
|
||||
err = intermediateWriter.Write(ctx, trust)
|
||||
err = intermediateWriter.Write(trust)
|
||||
if err != nil {
|
||||
c.opts.log.Debug("write intermediate value failure",
|
||||
zap.String("error", err.Error()),
|
||||
|
|
|
@ -126,7 +126,7 @@ func (c *reportContext) report() {
|
|||
return err
|
||||
}
|
||||
|
||||
return targetWriter.Write(c.ctx, t)
|
||||
return targetWriter.Write(t)
|
||||
},
|
||||
)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
// All values must comply with the requirements imposed on them.
|
||||
// Passing incorrect parameter values will result in constructor
|
||||
// failure (error or panic depending on the implementation).
|
||||
type Prm struct {}
|
||||
type Prm struct{}
|
||||
|
||||
// Storage represents in-memory storage of
|
||||
// local reputation values.
|
||||
|
|
Loading…
Reference in a new issue