From 358e3ed8c4b6ba99d128a0ff71a2e805abc62f83 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 6 Sep 2021 15:17:14 +0300 Subject: [PATCH] [#645] *: Change the locality condition of the node from the placement Some software components regulate the way of working with placement arrays when a local node enters it. In the previous implementation, the locality criterion was the correspondence between the announced network address (group) and the address with which the node was configured. However, by design, network addresses are not unique identifiers of storage nodes in the system. Change comparisons by network addresses to comparisons by keys in all packages with the logic described above. Implement `netmap.AnnouncedKeys` interface on `cfg` type in the storage node application. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/container.go | 18 ++++++---- cmd/neofs-node/object.go | 4 +-- cmd/neofs-node/reputation.go | 4 +-- cmd/neofs-node/reputation/common/remote.go | 21 ++++++------ pkg/services/object/put/distributed.go | 5 ++- pkg/services/object/put/service.go | 6 ++-- pkg/services/object/put/streamer.go | 10 +++--- pkg/services/object/util/placement.go | 40 +++++++++++----------- pkg/services/policer/check.go | 2 +- pkg/services/policer/policer.go | 10 +++--- 10 files changed, 62 insertions(+), 58 deletions(-) diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index f77bc2f3..22e33ffa 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -97,7 +97,7 @@ func initContainerService(c *cfg) { LocalServerInfo: c, RemoteWriterProvider: &remoteLoadAnnounceProvider{ key: &c.key.PrivateKey, - loadAddrSrc: c, + netmapKeys: c, clientCache: c.clientCache, deadEndProvider: loadcontroller.SimpleWriterProvider(loadAccumulator), }, @@ -220,7 +220,7 @@ func (nopLoadWriter) Close() error { type remoteLoadAnnounceProvider struct { key *ecdsa.PrivateKey - loadAddrSrc network.LocalAddressSource + netmapKeys netmapCore.AnnouncedKeys clientCache interface { Get(network.AddressGroup) (apiClient.Client, error) @@ -234,6 +234,11 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc return r.deadEndProvider, nil } + if r.netmapKeys.IsLocalKey(srv.PublicKey()) { + // if local => return no-op writer + return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil + } + var netAddr network.AddressGroup err := netAddr.FromIterator(srv) @@ -241,11 +246,6 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc return nil, fmt.Errorf("could not convert address to IP format: %w", err) } - if network.IsLocalAddress(r.loadAddrSrc, netAddr) { - // if local => return no-op writer - return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil - } - c, err := r.clientCache.Get(netAddr) if err != nil { return nil, fmt.Errorf("could not initialize API client: %w", err) @@ -385,6 +385,10 @@ func (c *cfg) PublicKey() []byte { return nodeKeyFromNetmap(c) } +func (c *cfg) IsLocalKey(key []byte) bool { + return bytes.Equal(key, c.PublicKey()) +} + func (c *cfg) IterateAddresses(f func(string) bool) { c.iterateNetworkAddresses(f) } diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index a296ef33..364f7b51 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -240,7 +240,7 @@ func initObjectService(c *cfg) { policer.WithRemoteHeader( headsvc.NewRemoteHeader(keyStorage, clientConstructor), ), - policer.WithLocalAddressSource(c), + policer.WithNetmapKeys(c), policer.WithHeadTimeout( policerconfig.HeadTimeout(c.appCfg), ), @@ -276,7 +276,7 @@ func initObjectService(c *cfg) { putsvc.WithLocalStorage(ls), putsvc.WithContainerSource(c.cfgObject.cnrSource), putsvc.WithNetworkMapSource(c.cfgObject.netMapSource), - putsvc.WithLocalAddressSource(c), + putsvc.WithNetmapKeys(c), putsvc.WithFormatValidatorOpts( objectCore.WithDeleteHandler(objInhumer), ), diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 8973cbd9..45cbfd50 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -87,7 +87,7 @@ func initReputationService(c *cfg) { remoteLocalTrustProvider := common.NewRemoteTrustProvider( common.RemoteProviderPrm{ - LocalAddrSrc: c, + NetmapKeys: c, DeadEndProvider: daughterStorageWriterProvider, ClientCache: c.clientCache, WriterProvider: localreputation.NewRemoteProvider( @@ -100,7 +100,7 @@ func initReputationService(c *cfg) { remoteIntermediateTrustProvider := common.NewRemoteTrustProvider( common.RemoteProviderPrm{ - LocalAddrSrc: c, + NetmapKeys: c, DeadEndProvider: consumerStorageWriterProvider, ClientCache: c.clientCache, WriterProvider: intermediatereputation.NewRemoteProvider( diff --git a/cmd/neofs-node/reputation/common/remote.go b/cmd/neofs-node/reputation/common/remote.go index d14098c0..ecd2aaaa 100644 --- a/cmd/neofs-node/reputation/common/remote.go +++ b/cmd/neofs-node/reputation/common/remote.go @@ -4,6 +4,7 @@ import ( "fmt" apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/network" reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router" @@ -27,7 +28,7 @@ type clientKeyRemoteProvider interface { // // remoteTrustProvider requires to be provided with clientKeyRemoteProvider. type remoteTrustProvider struct { - localAddrSrc network.LocalAddressSource + netmapKeys netmap.AnnouncedKeys deadEndProvider reputationcommon.WriterProvider clientCache clientCache remoteProvider clientKeyRemoteProvider @@ -39,7 +40,7 @@ type remoteTrustProvider struct { // Passing incorrect parameter values will result in constructor // failure (error or panic depending on the implementation). type RemoteProviderPrm struct { - LocalAddrSrc network.LocalAddressSource + NetmapKeys netmap.AnnouncedKeys DeadEndProvider reputationcommon.WriterProvider ClientCache clientCache WriterProvider clientKeyRemoteProvider @@ -47,8 +48,8 @@ type RemoteProviderPrm struct { func NewRemoteTrustProvider(prm RemoteProviderPrm) reputationrouter.RemoteWriterProvider { switch { - case prm.LocalAddrSrc == nil: - PanicOnPrmValue("LocalAddrSrc", prm.LocalAddrSrc) + case prm.NetmapKeys == nil: + PanicOnPrmValue("NetmapKeys", prm.NetmapKeys) case prm.DeadEndProvider == nil: PanicOnPrmValue("DeadEndProvider", prm.DeadEndProvider) case prm.ClientCache == nil: @@ -58,7 +59,7 @@ func NewRemoteTrustProvider(prm RemoteProviderPrm) reputationrouter.RemoteWriter } return &remoteTrustProvider{ - localAddrSrc: prm.LocalAddrSrc, + netmapKeys: prm.NetmapKeys, deadEndProvider: prm.DeadEndProvider, clientCache: prm.ClientCache, remoteProvider: prm.WriterProvider, @@ -70,6 +71,11 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep return rtp.deadEndProvider, nil } + if rtp.netmapKeys.IsLocalKey(srv.PublicKey()) { + // if local => return no-op writer + return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil + } + var netAddr network.AddressGroup err := netAddr.FromIterator(srv) @@ -77,11 +83,6 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep return nil, fmt.Errorf("could not convert address to IP format: %w", err) } - if network.IsLocalAddress(rtp.localAddrSrc, netAddr) { - // if local => return no-op writer - return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil - } - c, err := rtp.clientCache.Get(netAddr) if err != nil { return nil, fmt.Errorf("could not initialize API client: %w", err) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 1eea616a..10ec88ec 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/network" svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" @@ -23,7 +22,7 @@ type distributedTarget struct { chunks [][]byte - nodeTargetInitializer func(network.AddressGroup) transformer.ObjectTarget + nodeTargetInitializer func(placement.Node) transformer.ObjectTarget relay func(placement.Node) error @@ -76,7 +75,7 @@ func (t *distributedTarget) sendObject(node placement.Node) error { } } - target := t.nodeTargetInitializer(node.Addresses()) + target := t.nodeTargetInitializer(node) if err := target.WriteHeader(t.obj); err != nil { return fmt.Errorf("could not write header: %w", err) diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 14817d3a..78af791d 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -46,7 +46,7 @@ type cfg struct { workerPool util.WorkerPool - localAddrSrc network.LocalAddressSource + netmapKeys netmap.AnnouncedKeys fmtValidator *object.FormatValidator @@ -123,9 +123,9 @@ func WithWorkerPool(v util.WorkerPool) Option { } } -func WithLocalAddressSource(v network.LocalAddressSource) Option { +func WithNetmapKeys(v netmap.AnnouncedKeys) Option { return func(c *cfg) { - c.localAddrSrc = v + c.netmapKeys = v } } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 5637ed81..6388ecb1 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -135,7 +135,7 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error { prm.traverseOpts = append(prm.traverseOpts, placement.SuccessAfter(1)) // use local-only placement builder - builder = util.NewLocalPlacement(builder, p.localAddrSrc) + builder = util.NewLocalPlacement(builder, p.netmapKeys) } // set placement builder @@ -152,7 +152,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { relay = func(node placement.Node) error { addr := node.Addresses() - if network.IsLocalAddress(p.localAddrSrc, addr) { + if p.netmapKeys.IsLocalKey(node.Key()) { return errLocalAddress } @@ -168,8 +168,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { return &distributedTarget{ traverseOpts: prm.traverseOpts, workerPool: p.workerPool, - nodeTargetInitializer: func(addr network.AddressGroup) transformer.ObjectTarget { - if network.IsLocalAddress(p.localAddrSrc, addr) { + nodeTargetInitializer: func(node placement.Node) transformer.ObjectTarget { + if p.netmapKeys.IsLocalKey(node.Key()) { return &localTarget{ storage: p.localStore, } @@ -179,7 +179,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { ctx: p.ctx, keyStorage: p.keyStorage, commonPrm: prm.common, - addr: addr, + addr: node.Addresses(), clientConstructor: p.clientConstructor, } }, diff --git a/pkg/services/object/util/placement.go b/pkg/services/object/util/placement.go index 7959c161..a63da2a0 100644 --- a/pkg/services/object/util/placement.go +++ b/pkg/services/object/util/placement.go @@ -14,13 +14,13 @@ import ( type localPlacement struct { builder placement.Builder - localAddrSrc network.LocalAddressSource + netmapKeys netmap.AnnouncedKeys } type remotePlacement struct { builder placement.Builder - localAddrSrc network.LocalAddressSource + netmapKeys netmap.AnnouncedKeys } // TraverserGenerator represents tool that generates @@ -30,15 +30,15 @@ type TraverserGenerator struct { cnrSrc container.Source - localAddrSrc network.LocalAddressSource + netmapKeys netmap.AnnouncedKeys customOpts []placement.Option } -func NewLocalPlacement(b placement.Builder, s network.LocalAddressSource) placement.Builder { +func NewLocalPlacement(b placement.Builder, s netmap.AnnouncedKeys) placement.Builder { return &localPlacement{ - builder: b, - localAddrSrc: s, + builder: b, + netmapKeys: s, } } @@ -58,7 +58,7 @@ func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmapSDK. continue } - if network.IsLocalAddress(p.localAddrSrc, addr) { + if p.netmapKeys.IsLocalKey(vs[i][j].PublicKey()) { return []netmapSDK.Nodes{{vs[i][j]}}, nil } } @@ -69,10 +69,10 @@ func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmapSDK. // NewRemotePlacementBuilder creates, initializes and returns placement builder that // excludes local node from any placement vector. -func NewRemotePlacementBuilder(b placement.Builder, s network.LocalAddressSource) placement.Builder { +func NewRemotePlacementBuilder(b placement.Builder, s netmap.AnnouncedKeys) placement.Builder { return &remotePlacement{ - builder: b, - localAddrSrc: s, + builder: b, + netmapKeys: s, } } @@ -92,7 +92,7 @@ func (p *remotePlacement) BuildPlacement(addr *object.Address, policy *netmapSDK continue } - if network.IsLocalAddress(p.localAddrSrc, addr) { + if p.netmapKeys.IsLocalKey(vs[i][j].PublicKey()) { vs[i] = append(vs[i][:j], vs[i][j+1:]...) j-- } @@ -103,21 +103,21 @@ func (p *remotePlacement) BuildPlacement(addr *object.Address, policy *netmapSDK } // NewTraverserGenerator creates, initializes and returns new TraverserGenerator instance. -func NewTraverserGenerator(nmSrc netmap.Source, cnrSrc container.Source, localAddrSrc network.LocalAddressSource) *TraverserGenerator { +func NewTraverserGenerator(nmSrc netmap.Source, cnrSrc container.Source, netmapKeys netmap.AnnouncedKeys) *TraverserGenerator { return &TraverserGenerator{ - netMapSrc: nmSrc, - cnrSrc: cnrSrc, - localAddrSrc: localAddrSrc, + netMapSrc: nmSrc, + cnrSrc: cnrSrc, + netmapKeys: netmapKeys, } } // WithTraverseOptions returns TraverseGenerator that additionally applies provided options. func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *TraverserGenerator { return &TraverserGenerator{ - netMapSrc: g.netMapSrc, - cnrSrc: g.cnrSrc, - localAddrSrc: g.localAddrSrc, - customOpts: opts, + netMapSrc: g.netMapSrc, + cnrSrc: g.cnrSrc, + netmapKeys: g.netmapKeys, + customOpts: opts, } } @@ -143,7 +143,7 @@ func (g *TraverserGenerator) GenerateTraverser(addr *object.Address, epoch uint6 // create builder of the remote nodes from network map builder := NewRemotePlacementBuilder( placement.NewNetworkMapBuilder(nm), - g.localAddrSrc, + g.netmapKeys, ) traverseOpts = append(traverseOpts, diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index c337055f..76484c48 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -68,7 +68,7 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes continue } - if network.IsLocalAddress(p.localAddrSrc, node) { + if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) { if shortage == 0 { // we can call the redundant copy callback // here to slightly improve the performance diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index c091a7d7..337d1f56 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -6,8 +6,8 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/network" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" @@ -47,7 +47,7 @@ type cfg struct { remoteHeader *headsvc.RemoteHeader - localAddrSrc network.LocalAddressSource + netmapKeys netmap.AnnouncedKeys replicator *replicator.Replicator @@ -142,10 +142,10 @@ func WithRemoteHeader(v *headsvc.RemoteHeader) Option { } } -// WithLocalAddressSource returns option to set local address source of Policer. -func WithLocalAddressSource(v network.LocalAddressSource) Option { +// WithNetmapKeys returns option to set tool to work with announced public keys. +func WithNetmapKeys(v netmap.AnnouncedKeys) Option { return func(c *cfg) { - c.localAddrSrc = v + c.netmapKeys = v } }