From f6783f4f81c8d7ee95326250364561d914df6b17 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 21 Apr 2021 07:56:38 +0300 Subject: [PATCH] [#488] cmd/reputation: Add `DaughterStorage` Add `DaughterStorage` init in main pkg and start write all received daughters' trusts to it. Signed-off-by: Pavel Karpy --- cmd/neofs-node/reputation.go | 13 +++++-- .../reputation/intermediate/daughters.go | 35 +++++++++++++++++++ cmd/neofs-node/reputation/local/remote.go | 2 +- cmd/neofs-node/reputation/local/storage.go | 29 --------------- cmd/neofs-node/reputation/local/util.go | 3 +- pkg/services/reputation/common/deps.go | 2 +- .../reputation/common/router/calls.go | 4 +-- .../reputation/local/controller/calls.go | 2 +- 8 files changed, 53 insertions(+), 37 deletions(-) create mode 100644 cmd/neofs-node/reputation/intermediate/daughters.go diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 2b097ae03..7e1c3e6c3 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -6,6 +6,7 @@ import ( v2reputation "github.com/nspcc-dev/neofs-api-go/v2/reputation" v2reputationgrpc "github.com/nspcc-dev/neofs-api-go/v2/reputation/grpc" crypto "github.com/nspcc-dev/neofs-crypto" + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/intermediate" localreputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/local" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" @@ -14,6 +15,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/reputation" reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/daughters" trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/managers" truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" @@ -27,8 +29,15 @@ func initReputationService(c *cfg) { // consider sharing this between application components nmSrc := newCachedNetmapStorage(c.cfgNetmap.state, c.cfgNetmap.wrapper) + // storing calculated trusts as a daughter c.cfgReputation.localTrustStorage = truststorage.New(truststorage.Prm{}) + // storing received trusts as a manager + daughterStorage := &intermediate.DaughterStorage{ + Log: c.log, + Storage: daughters.New(daughters.Prm{}), + } + trustStorage := &localreputation.TrustStorage{ Log: c.log, Storage: c.cfgReputation.localTrustStorage, @@ -50,7 +59,7 @@ func initReputationService(c *cfg) { remoteLocalTrustProvider := localreputation.NewRemoteTrustProvider( localreputation.RemoteProviderPrm{ LocalAddrSrc: c, - DeadEndProvider: trustStorage, + DeadEndProvider: daughterStorage, ClientCache: cache.NewSDKClientCache(), Key: c.key, }, @@ -191,5 +200,5 @@ func (s *reputationServer) processTrust(epoch uint64, t reputation.Trust, return errors.Wrap(err, "wrong route of reputation trust value") } - return w.Write(t) + return w.Write(&localreputation.EpochContext{E: epoch}, t) } diff --git a/cmd/neofs-node/reputation/intermediate/daughters.go b/cmd/neofs-node/reputation/intermediate/daughters.go new file mode 100644 index 000000000..32018ce97 --- /dev/null +++ b/cmd/neofs-node/reputation/intermediate/daughters.go @@ -0,0 +1,35 @@ +package intermediate + +import ( + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" + reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/eigentrust/storage/daughters" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" +) + +type DaughterStorage struct { + Log *logger.Logger + Storage *daughters.Storage +} + +type DaughterTrustWriter struct { + log *logger.Logger + storage *daughters.Storage +} + +func (w *DaughterTrustWriter) Write(ctx common.Context, t reputation.Trust) error { + w.storage.Put(ctx.Epoch(), t) + return nil +} + +func (w *DaughterTrustWriter) Close() error { + return nil +} + +func (s *DaughterStorage) InitWriter(_ reputationcommon.Context) (reputationcommon.Writer, error) { + return &DaughterTrustWriter{ + log: s.Log, + storage: s.Storage, + }, nil +} diff --git a/cmd/neofs-node/reputation/local/remote.go b/cmd/neofs-node/reputation/local/remote.go index c760f7952..b0281b9e1 100644 --- a/cmd/neofs-node/reputation/local/remote.go +++ b/cmd/neofs-node/reputation/local/remote.go @@ -107,7 +107,7 @@ type RemoteTrustWriter struct { buf []*reputationapi.Trust } -func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error { +func (rtp *RemoteTrustWriter) Write(_ reputationcommon.Context, t reputation.Trust) error { apiTrust := reputationapi.NewTrust() apiPeer := reputationapi.NewPeerID() diff --git a/cmd/neofs-node/reputation/local/storage.go b/cmd/neofs-node/reputation/local/storage.go index c12df4d73..0875b36af 100644 --- a/cmd/neofs-node/reputation/local/storage.go +++ b/cmd/neofs-node/reputation/local/storage.go @@ -2,7 +2,6 @@ package local import ( "bytes" - "encoding/hex" netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/services/reputation" @@ -11,7 +10,6 @@ import ( truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "github.com/pkg/errors" - "go.uber.org/zap" ) type TrustStorage struct { @@ -37,13 +35,6 @@ func (s *TrustStorage) InitIterator(ctx reputationcommon.Context) (trustcontroll }, nil } -func (s *TrustStorage) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { - return &TrustLogger{ - ctx: ctx, - log: s.Log, - }, nil -} - type TrustIterator struct { ctx reputationcommon.Context @@ -98,23 +89,3 @@ func (it *TrustIterator) Iterate(h reputation.TrustHandler) error { return nil } - -type TrustLogger struct { - ctx reputationcommon.Context - - log *logger.Logger -} - -func (l *TrustLogger) Write(t reputation.Trust) error { - l.log.Info("received local trust", - zap.Uint64("epoch", l.ctx.Epoch()), - zap.String("peer", hex.EncodeToString(t.Peer().Bytes())), - zap.Stringer("value", t.Value()), - ) - - return nil -} - -func (*TrustLogger) Close() error { - return nil -} diff --git a/cmd/neofs-node/reputation/local/util.go b/cmd/neofs-node/reputation/local/util.go index 2f030153e..881a56b94 100644 --- a/cmd/neofs-node/reputation/local/util.go +++ b/cmd/neofs-node/reputation/local/util.go @@ -3,6 +3,7 @@ package local import ( "context" "fmt" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" "github.com/nspcc-dev/neofs-node/pkg/services/reputation" ) @@ -18,7 +19,7 @@ func (ctx *EpochContext) Epoch() uint64 { type NopReputationWriter struct{} -func (NopReputationWriter) Write(reputation.Trust) error { +func (NopReputationWriter) Write(common.Context, reputation.Trust) error { return nil } diff --git a/pkg/services/reputation/common/deps.go b/pkg/services/reputation/common/deps.go index 9a1bcfecc..0478995c9 100644 --- a/pkg/services/reputation/common/deps.go +++ b/pkg/services/reputation/common/deps.go @@ -29,7 +29,7 @@ type Writer interface { // Close operation. // // Write must not be called after Close. - Write(reputation.Trust) error + Write(Context, reputation.Trust) error // Close exits with method-providing Writer. // diff --git a/pkg/services/reputation/common/router/calls.go b/pkg/services/reputation/common/router/calls.go index 8d7be1931..8e3ee2847 100644 --- a/pkg/services/reputation/common/router/calls.go +++ b/pkg/services/reputation/common/router/calls.go @@ -65,7 +65,7 @@ func (r *Router) InitWriter(ctx common.Context) (common.Writer, error) { }, nil } -func (w *trustWriter) Write(t reputation.Trust) error { +func (w *trustWriter) Write(ctx common.Context, t reputation.Trust) error { w.routeMtx.Lock() defer w.routeMtx.Unlock() @@ -106,7 +106,7 @@ func (w *trustWriter) Write(t reputation.Trust) error { w.mServers[endpoint] = remoteWriter } - err := remoteWriter.Write(t) + err := remoteWriter.Write(ctx, t) if err != nil { w.router.log.Debug("could not write the value", zap.String("error", err.Error()), diff --git a/pkg/services/reputation/local/controller/calls.go b/pkg/services/reputation/local/controller/calls.go index d4fc9e102..3cb15cd9b 100644 --- a/pkg/services/reputation/local/controller/calls.go +++ b/pkg/services/reputation/local/controller/calls.go @@ -126,7 +126,7 @@ func (c *reportContext) report() { return err } - return targetWriter.Write(t) + return targetWriter.Write(c.ctx, t) }, ) if err != nil && !errors.Is(err, context.Canceled) {