From ff7a8ae67756159363f7171c736bddb5b3ec2b7b Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Sun, 18 Apr 2021 11:51:49 +0300 Subject: [PATCH] [#484] cmd/reputation: split it into subpackages Split cmd/reputation into subpackages for future refactoring. Signed-off-by: Pavel Karpy --- cmd/neofs-node/reputation.go | 311 ++------------------ cmd/neofs-node/reputation/local/managers.go | 106 +++++++ cmd/neofs-node/reputation/local/remote.go | 137 +++++++++ cmd/neofs-node/reputation/local/storage.go | 120 ++++++++ cmd/neofs-node/reputation/local/util.go | 45 +++ 5 files changed, 432 insertions(+), 287 deletions(-) create mode 100644 cmd/neofs-node/reputation/local/managers.go create mode 100644 cmd/neofs-node/reputation/local/remote.go create mode 100644 cmd/neofs-node/reputation/local/storage.go create mode 100644 cmd/neofs-node/reputation/local/util.go diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 78192ae21..2b097ae03 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -1,22 +1,14 @@ package main import ( - "bytes" "context" - "crypto/ecdsa" - "encoding/hex" - "github.com/nspcc-dev/hrw" - apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client" - apiNetmap "github.com/nspcc-dev/neofs-api-go/pkg/netmap" - reputationapi "github.com/nspcc-dev/neofs-api-go/pkg/reputation" v2reputation "github.com/nspcc-dev/neofs-api-go/v2/reputation" v2reputationgrpc "github.com/nspcc-dev/neofs-api-go/v2/reputation/grpc" crypto "github.com/nspcc-dev/neofs-crypto" - netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + localreputation "github.com/nspcc-dev/neofs-node/cmd/neofs-node/reputation/local" "github.com/nspcc-dev/neofs-node/pkg/morph/event" "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" - "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" grpcreputation "github.com/nspcc-dev/neofs-node/pkg/network/transport/reputation/grpc" "github.com/nspcc-dev/neofs-node/pkg/services/reputation" @@ -31,272 +23,38 @@ import ( "go.uber.org/zap" ) -type localTrustStorage struct { - log *logger.Logger - - storage *truststorage.Storage - - nmSrc netmapcore.Source - - localKey []byte -} - -type localTrustIterator struct { - ctx reputationcommon.Context - - storage *localTrustStorage - - epochStorage *truststorage.EpochTrustValueStorage -} - -type managerBuilder struct { - log *logger.Logger - nmSrc netmapcore.Source -} - -type remoteLocalTrustProvider struct { - localAddrSrc network.LocalAddressSource - deadEndProvider reputationcommon.WriterProvider - key *ecdsa.PrivateKey - - clientCache interface { - Get(string) (apiClient.Client, error) - } -} - -type nopReputationWriter struct{} - -func (nopReputationWriter) Write(reputation.Trust) error { - return nil -} - -func (nopReputationWriter) Close() error { - return nil -} - -type remoteLocalTrustWriter struct { - ctx reputationcommon.Context - client apiClient.Client - key *ecdsa.PrivateKey - - buf []*reputationapi.Trust -} - -func (rtp *remoteLocalTrustWriter) Write(t reputation.Trust) error { - apiTrust := reputationapi.NewTrust() - - apiPeer := reputationapi.NewPeerID() - apiPeer.SetPublicKey(t.Peer()) - - apiTrust.SetValue(t.Value().Float64()) - apiTrust.SetPeer(apiPeer) - - rtp.buf = append(rtp.buf, apiTrust) - - return nil -} - -func (rtp *remoteLocalTrustWriter) Close() error { - prm := apiClient.SendLocalTrustPrm{} - - prm.SetEpoch(rtp.ctx.Epoch()) - prm.SetTrusts(rtp.buf) - - _, err := rtp.client.SendLocalTrust( - rtp.ctx, - prm, - apiClient.WithKey(rtp.key), - ) - - return err -} - -type remoteLocalTrustWriterProvider struct { - client apiClient.Client - key *ecdsa.PrivateKey -} - -type localTrustLogger struct { - ctx reputationcommon.Context - - log *logger.Logger -} - -func (l *localTrustLogger) Write(t reputation.Trust) error { - l.log.Info("received local trust", - zap.Uint64("epoch", l.ctx.Epoch()), - zap.String("peer", hex.EncodeToString(t.Peer().Bytes())), - zap.Stringer("value", t.Value()), - ) - - return nil -} - -func (*localTrustLogger) Close() error { - return nil -} - -func (rtwp *remoteLocalTrustWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { - return &remoteLocalTrustWriter{ - ctx: ctx, - client: rtwp.client, - key: rtwp.key, - }, nil -} - -func (rtp *remoteLocalTrustProvider) InitRemote(srv reputationrouter.ServerInfo) (reputationcommon.WriterProvider, error) { - if srv == nil { - return rtp.deadEndProvider, nil - } - - addr := srv.Address() - - if rtp.localAddrSrc.LocalAddress().String() == srv.Address() { - // if local => return no-op writer - return trustcontroller.SimpleWriterProvider(new(nopReputationWriter)), nil - } - - ipAddr, err := network.IPAddrFromMultiaddr(addr) - if err != nil { - return nil, errors.Wrap(err, "could not convert address to IP format") - } - - c, err := rtp.clientCache.Get(ipAddr) - if err != nil { - return nil, errors.Wrap(err, "could not initialize API client") - } - - return &remoteLocalTrustWriterProvider{ - client: c, - key: rtp.key, - }, nil -} - -// BuildManagers sorts nodes in NetMap with HRW algorithms and -// takes the next node after the current one as the only manager. -func (mb *managerBuilder) BuildManagers(epoch uint64, p reputation.PeerID) ([]reputationrouter.ServerInfo, error) { - nm, err := mb.nmSrc.GetNetMapByEpoch(epoch) - if err != nil { - return nil, err - } - - // make a copy to keep order consistency of the origin netmap after sorting - nodes := make([]*apiNetmap.Node, len(nm.Nodes)) - - copy(nodes, nm.Nodes) - - hrw.SortSliceByValue(nodes, epoch) - - for i := range nodes { - if bytes.Equal(nodes[i].PublicKey(), p.Bytes()) { - managerIndex := i + 1 - - if managerIndex == len(nodes) { - managerIndex = 0 - } - - return []reputationrouter.ServerInfo{nodes[managerIndex]}, nil - } - } - - return nil, nil -} - -func (s *localTrustStorage) InitIterator(ctx reputationcommon.Context) (trustcontroller.Iterator, error) { - epochStorage, err := s.storage.DataForEpoch(ctx.Epoch()) - if err != nil && !errors.Is(err, truststorage.ErrNoPositiveTrust) { - return nil, err - } - - return &localTrustIterator{ - ctx: ctx, - storage: s, - epochStorage: epochStorage, - }, nil -} - -func (s *localTrustStorage) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { - return &localTrustLogger{ - ctx: ctx, - log: s.log, - }, nil -} - -func (it *localTrustIterator) Iterate(h reputation.TrustHandler) error { - if it.epochStorage != nil { - err := it.epochStorage.Iterate(h) - if !errors.Is(err, truststorage.ErrNoPositiveTrust) { - return err - } - } - - nm, err := it.storage.nmSrc.GetNetMapByEpoch(it.ctx.Epoch()) - if err != nil { - return err - } - - // find out if local node is presented in netmap - localIndex := -1 - - for i := range nm.Nodes { - if bytes.Equal(nm.Nodes[i].PublicKey(), it.storage.localKey) { - localIndex = i - } - } - - ln := len(nm.Nodes) - if localIndex >= 0 && ln > 0 { - ln-- - } - - // calculate Pj http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Chapter 4.5. - p := reputation.TrustOne.Div(reputation.TrustValueFromInt(ln)) - - for i := range nm.Nodes { - if i == localIndex { - continue - } - - trust := reputation.Trust{} - trust.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey())) - trust.SetValue(p) - - if err := h(trust); err != nil { - return err - } - } - - return nil -} - func initReputationService(c *cfg) { // consider sharing this between application components nmSrc := newCachedNetmapStorage(c.cfgNetmap.state, c.cfgNetmap.wrapper) c.cfgReputation.localTrustStorage = truststorage.New(truststorage.Prm{}) - trustStorage := &localTrustStorage{ - log: c.log, - storage: c.cfgReputation.localTrustStorage, - nmSrc: nmSrc, - localKey: crypto.MarshalPublicKey(&c.key.PublicKey), + trustStorage := &localreputation.TrustStorage{ + Log: c.log, + Storage: c.cfgReputation.localTrustStorage, + NmSrc: nmSrc, + LocalKey: crypto.MarshalPublicKey(&c.key.PublicKey), } - managerBuilder := &managerBuilder{ - log: c.log, - nmSrc: nmSrc, - } + managerBuilder := localreputation.NewManagerBuilder( + localreputation.ManagersPrm{ + NetMapSource: nmSrc, + }, + localreputation.WithLogger(c.log), + ) routeBuilder := managers.New(managers.Prm{ ManagerBuilder: managerBuilder, }) - remoteLocalTrustProvider := &remoteLocalTrustProvider{ - localAddrSrc: c, - deadEndProvider: trustStorage, - clientCache: cache.NewSDKClientCache(), - key: c.key, - } + remoteLocalTrustProvider := localreputation.NewRemoteTrustProvider( + localreputation.RemoteProviderPrm{ + LocalAddrSrc: c, + DeadEndProvider: trustStorage, + ClientCache: cache.NewSDKClientCache(), + Key: c.key, + }, + ) router := reputationrouter.New( reputationrouter.Prm{ @@ -367,33 +125,12 @@ type reputationServer struct { routeBuilder reputationrouter.Builder } -type epochContext struct { - context.Context - epoch uint64 -} - -func (ctx *epochContext) Epoch() uint64 { - return ctx.epoch -} - -type reputationOnlyKeyRemoteServerInfo struct { - key []byte -} - -func (i *reputationOnlyKeyRemoteServerInfo) PublicKey() []byte { - return i.key -} - -func (*reputationOnlyKeyRemoteServerInfo) Address() string { - return "" -} - func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation.SendLocalTrustRequest) (*v2reputation.SendLocalTrustResponse, error) { var passedRoute []reputationrouter.ServerInfo for hdr := req.GetVerificationHeader(); hdr != nil; hdr = hdr.GetOrigin() { - passedRoute = append(passedRoute, &reputationOnlyKeyRemoteServerInfo{ - key: hdr.GetBodySignature().GetKey(), + passedRoute = append(passedRoute, &localreputation.OnlyKeyRemoteServerInfo{ + Key: hdr.GetBodySignature().GetKey(), }) } @@ -405,9 +142,9 @@ func (s *reputationServer) SendLocalTrust(ctx context.Context, req *v2reputation body := req.GetBody() - eCtx := &epochContext{ + eCtx := &localreputation.EpochContext{ Context: ctx, - epoch: body.GetEpoch(), + E: body.GetEpoch(), } w, err := s.router.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute)) diff --git a/cmd/neofs-node/reputation/local/managers.go b/cmd/neofs-node/reputation/local/managers.go new file mode 100644 index 000000000..b38ccd338 --- /dev/null +++ b/cmd/neofs-node/reputation/local/managers.go @@ -0,0 +1,106 @@ +package local + +import ( + "bytes" + + "github.com/nspcc-dev/hrw" + apiNetmap "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/managers" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// managerBuilder is implementation of reputation ManagerBuilder interface. +// It sorts nodes in NetMap with HRW algorithms and +// takes the next node after the current one as the only manager. +type managerBuilder struct { + log *logger.Logger + nmSrc netmapcore.Source +} + +// ManagersPrm groups the required parameters of the managerBuilder's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type ManagersPrm struct { + NetMapSource netmapcore.Source +} + +// NewManagerBuilder creates a new instance of the managerBuilder. +// +// Panics if at least one value of the parameters is invalid. +// +// The created managerBuilder does not require additional +// initialization and is completely ready for work. +func NewManagerBuilder(prm ManagersPrm, opts ...MngOption) managers.ManagerBuilder { + switch { + case prm.NetMapSource == nil: + panicOnPrmValue("NetMapSource", prm.NetMapSource) + } + + o := defaultMngOpts() + + for i := range opts { + opts[i](o) + } + + return &managerBuilder{ + log: o.log, + nmSrc: prm.NetMapSource, + } +} + +// BuildManagers sorts nodes in NetMap with HRW algorithms and +// takes the next node after the current one as the only manager. +func (mb *managerBuilder) BuildManagers(epoch uint64, p reputation.PeerID) ([]reputationrouter.ServerInfo, error) { + nm, err := mb.nmSrc.GetNetMapByEpoch(epoch) + if err != nil { + return nil, err + } + + // make a copy to keep order consistency of the origin netmap after sorting + nodes := make([]*apiNetmap.Node, len(nm.Nodes)) + + copy(nodes, nm.Nodes) + + hrw.SortSliceByValue(nodes, epoch) + + for i := range nodes { + if bytes.Equal(nodes[i].PublicKey(), p.Bytes()) { + managerIndex := i + 1 + + if managerIndex == len(nodes) { + managerIndex = 0 + } + + return []reputationrouter.ServerInfo{nodes[managerIndex]}, nil + } + } + + return nil, nil +} + +type mngOptions struct { + log *logger.Logger +} + +type MngOption func(*mngOptions) + +func defaultMngOpts() *mngOptions { + return &mngOptions{ + log: zap.L(), + } +} + +// WithLogger returns MngOption to specify logging component. +func WithLogger(l *logger.Logger) MngOption { + return func(o *mngOptions) { + if l != nil { + o.log = l + } + } +} diff --git a/cmd/neofs-node/reputation/local/remote.go b/cmd/neofs-node/reputation/local/remote.go new file mode 100644 index 000000000..c760f7952 --- /dev/null +++ b/cmd/neofs-node/reputation/local/remote.go @@ -0,0 +1,137 @@ +package local + +import ( + "crypto/ecdsa" + + apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client" + reputationapi "github.com/nspcc-dev/neofs-api-go/pkg/reputation" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" + reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router" + trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" + "github.com/pkg/errors" +) + +type clientCache interface { + Get(string) (apiClient.Client, error) +} + +// remoteTrustProvider is implementation of reputation RemoteWriterProvider interface. +type remoteTrustProvider struct { + localAddrSrc network.LocalAddressSource + deadEndProvider reputationcommon.WriterProvider + key *ecdsa.PrivateKey + + clientCache clientCache +} + +// RemoteProviderPrm groups the required parameters of the remoteTrustProvider's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type RemoteProviderPrm struct { + LocalAddrSrc network.LocalAddressSource + DeadEndProvider reputationcommon.WriterProvider + Key *ecdsa.PrivateKey + ClientCache clientCache +} + +func NewRemoteTrustProvider(prm RemoteProviderPrm) reputationrouter.RemoteWriterProvider { + switch { + case prm.LocalAddrSrc == nil: + panicOnPrmValue("LocalAddrSrc", prm.LocalAddrSrc) + case prm.DeadEndProvider == nil: + panicOnPrmValue("DeadEndProvider", prm.DeadEndProvider) + case prm.Key == nil: + panicOnPrmValue("Key", prm.Key) + case prm.ClientCache == nil: + panicOnPrmValue("ClientCache", prm.ClientCache) + } + + return &remoteTrustProvider{ + localAddrSrc: prm.LocalAddrSrc, + deadEndProvider: prm.DeadEndProvider, + key: prm.Key, + clientCache: prm.ClientCache, + } +} + +func (rtp *remoteTrustProvider) InitRemote(srv reputationrouter.ServerInfo) (reputationcommon.WriterProvider, error) { + if srv == nil { + return rtp.deadEndProvider, nil + } + + addr := srv.Address() + + if rtp.localAddrSrc.LocalAddress().String() == srv.Address() { + // if local => return no-op writer + return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil + } + + ipAddr, err := network.IPAddrFromMultiaddr(addr) + if err != nil { + return nil, errors.Wrap(err, "could not convert address to IP format") + } + + c, err := rtp.clientCache.Get(ipAddr) + if err != nil { + return nil, errors.Wrap(err, "could not initialize API client") + } + + return &RemoteTrustWriterProvider{ + client: c, + key: rtp.key, + }, nil +} + +type RemoteTrustWriterProvider struct { + client apiClient.Client + key *ecdsa.PrivateKey +} + +func (rtwp *RemoteTrustWriterProvider) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { + return &RemoteTrustWriter{ + ctx: ctx, + client: rtwp.client, + key: rtwp.key, + }, nil +} + +type RemoteTrustWriter struct { + ctx reputationcommon.Context + client apiClient.Client + key *ecdsa.PrivateKey + + buf []*reputationapi.Trust +} + +func (rtp *RemoteTrustWriter) Write(t reputation.Trust) error { + apiTrust := reputationapi.NewTrust() + + apiPeer := reputationapi.NewPeerID() + apiPeer.SetPublicKey(t.Peer()) + + apiTrust.SetValue(t.Value().Float64()) + apiTrust.SetPeer(apiPeer) + + rtp.buf = append(rtp.buf, apiTrust) + + return nil +} + +func (rtp *RemoteTrustWriter) Close() error { + prm := apiClient.SendLocalTrustPrm{} + + prm.SetEpoch(rtp.ctx.Epoch()) + prm.SetTrusts(rtp.buf) + + _, err := rtp.client.SendLocalTrust( + rtp.ctx, + prm, + apiClient.WithKey(rtp.key), + ) + + return err +} diff --git a/cmd/neofs-node/reputation/local/storage.go b/cmd/neofs-node/reputation/local/storage.go new file mode 100644 index 000000000..c12df4d73 --- /dev/null +++ b/cmd/neofs-node/reputation/local/storage.go @@ -0,0 +1,120 @@ +package local + +import ( + "bytes" + "encoding/hex" + + netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" + reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" + trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" + truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +type TrustStorage struct { + Log *logger.Logger + + Storage *truststorage.Storage + + NmSrc netmapcore.Source + + LocalKey []byte +} + +func (s *TrustStorage) InitIterator(ctx reputationcommon.Context) (trustcontroller.Iterator, error) { + epochStorage, err := s.Storage.DataForEpoch(ctx.Epoch()) + if err != nil && !errors.Is(err, truststorage.ErrNoPositiveTrust) { + return nil, err + } + + return &TrustIterator{ + ctx: ctx, + storage: s, + epochStorage: epochStorage, + }, nil +} + +func (s *TrustStorage) InitWriter(ctx reputationcommon.Context) (reputationcommon.Writer, error) { + return &TrustLogger{ + ctx: ctx, + log: s.Log, + }, nil +} + +type TrustIterator struct { + ctx reputationcommon.Context + + storage *TrustStorage + + epochStorage *truststorage.EpochTrustValueStorage +} + +func (it *TrustIterator) Iterate(h reputation.TrustHandler) error { + if it.epochStorage != nil { + err := it.epochStorage.Iterate(h) + if !errors.Is(err, truststorage.ErrNoPositiveTrust) { + return err + } + } + + nm, err := it.storage.NmSrc.GetNetMapByEpoch(it.ctx.Epoch()) + if err != nil { + return err + } + + // find out if local node is presented in netmap + localIndex := -1 + + for i := range nm.Nodes { + if bytes.Equal(nm.Nodes[i].PublicKey(), it.storage.LocalKey) { + localIndex = i + } + } + + ln := len(nm.Nodes) + if localIndex >= 0 && ln > 0 { + ln-- + } + + // calculate Pj http://ilpubs.stanford.edu:8090/562/1/2002-56.pdf Chapter 4.5. + p := reputation.TrustOne.Div(reputation.TrustValueFromInt(ln)) + + for i := range nm.Nodes { + if i == localIndex { + continue + } + + trust := reputation.Trust{} + trust.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey())) + trust.SetValue(p) + + if err := h(trust); err != nil { + return err + } + } + + return nil +} + +type TrustLogger struct { + ctx reputationcommon.Context + + log *logger.Logger +} + +func (l *TrustLogger) Write(t reputation.Trust) error { + l.log.Info("received local trust", + zap.Uint64("epoch", l.ctx.Epoch()), + zap.String("peer", hex.EncodeToString(t.Peer().Bytes())), + zap.Stringer("value", t.Value()), + ) + + return nil +} + +func (*TrustLogger) Close() error { + return nil +} diff --git a/cmd/neofs-node/reputation/local/util.go b/cmd/neofs-node/reputation/local/util.go new file mode 100644 index 000000000..2f030153e --- /dev/null +++ b/cmd/neofs-node/reputation/local/util.go @@ -0,0 +1,45 @@ +package local + +import ( + "context" + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/services/reputation" +) + +type EpochContext struct { + context.Context + E uint64 +} + +func (ctx *EpochContext) Epoch() uint64 { + return ctx.E +} + +type NopReputationWriter struct{} + +func (NopReputationWriter) Write(reputation.Trust) error { + return nil +} + +func (NopReputationWriter) Close() error { + return nil +} + +type OnlyKeyRemoteServerInfo struct { + Key []byte +} + +func (i *OnlyKeyRemoteServerInfo) PublicKey() []byte { + return i.Key +} + +func (*OnlyKeyRemoteServerInfo) Address() string { + return "" +} + +const invalidPrmValFmt = "invalid parameter %s (%T):%v" + +func panicOnPrmValue(n string, v interface{}) { + panic(fmt.Sprintf(invalidPrmValFmt, n, v, v)) +}