forked from TrueCloudLab/frostfs-node
[#1817] network: Allow to use network addresses from the iterator
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
76893bdc50
commit
236414df49
21 changed files with 203 additions and 17 deletions
|
@ -6,7 +6,7 @@ Changelog for NeoFS Node
|
|||
### Added
|
||||
- Serving `NetmapService.NetmapSnapshot` RPC (#1793)
|
||||
- `netmap snapshot` command of NeoFS CLI (#1793)
|
||||
|
||||
- `apiclient.allow_external` config flag to fallback to node external addresses (#1817)
|
||||
- Changelog updates CI step (#1808)
|
||||
- Validate storage node configuration before node startup (#1805)
|
||||
- `neofs-node -check` command to check the configuration file (#1805)
|
||||
|
@ -31,6 +31,8 @@ Changelog for NeoFS Node
|
|||
|
||||
### Updating from v0.32.0
|
||||
Replace using the `control netmap-snapshot` command with `netmap snapshot` one in NeoFS CLI.
|
||||
Node can now specify additional addresses in `ExternalAddr` attribute. To allow a node to dial
|
||||
other nodes external address, use `apiclient.allow_external` config setting.
|
||||
|
||||
## [0.32.0] - 2022-09-14 - Pungdo (풍도, 楓島)
|
||||
|
||||
|
|
|
@ -537,6 +537,7 @@ func initCfg(appCfg *config.Config) *cfg {
|
|||
DialTimeout: apiclientconfig.DialTimeout(appCfg),
|
||||
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
|
||||
Key: &key.PrivateKey,
|
||||
AllowExternal: apiclientconfig.AllowExternal(appCfg),
|
||||
}),
|
||||
persistate: persistate,
|
||||
}
|
||||
|
|
|
@ -41,3 +41,11 @@ func StreamTimeout(c *config.Config) time.Duration {
|
|||
|
||||
return StreamTimeoutDefault
|
||||
}
|
||||
|
||||
// AllowExternal returns the value of "allow_external" config parameter
|
||||
// from "apiclient" section.
|
||||
//
|
||||
// Returns false if the value is missing or invalid.
|
||||
func AllowExternal(c *config.Config) bool {
|
||||
return config.BoolSafe(c.Sub(subsection), "allow_external")
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ func TestApiclientSection(t *testing.T) {
|
|||
|
||||
require.Equal(t, apiclientconfig.DialTimeoutDefault, apiclientconfig.DialTimeout(empty))
|
||||
require.Equal(t, apiclientconfig.StreamTimeoutDefault, apiclientconfig.StreamTimeout(empty))
|
||||
require.False(t, apiclientconfig.AllowExternal(empty))
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
@ -23,6 +24,7 @@ func TestApiclientSection(t *testing.T) {
|
|||
var fileConfigTest = func(c *config.Config) {
|
||||
require.Equal(t, 15*time.Second, apiclientconfig.DialTimeout(c))
|
||||
require.Equal(t, 20*time.Second, apiclientconfig.StreamTimeout(c))
|
||||
require.True(t, apiclientconfig.AllowExternal(c))
|
||||
}
|
||||
|
||||
configtest.ForEachFileType(path, fileConfigTest)
|
||||
|
|
|
@ -507,6 +507,10 @@ func (c *cfg) NumberOfAddresses() int {
|
|||
return c.addressNum()
|
||||
}
|
||||
|
||||
func (c *cfg) ExternalAddresses() []string {
|
||||
return c.cfgNodeInfo.localInfo.ExternalAddresses()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) PublicKey() []byte {
|
||||
return nodeKeyFromNetmap(c.cfg)
|
||||
}
|
||||
|
@ -519,6 +523,10 @@ func (c *usedSpaceService) NumberOfAddresses() int {
|
|||
return c.cfg.addressNum()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) ExternalAddresses() []string {
|
||||
return c.cfg.ExternalAddresses()
|
||||
}
|
||||
|
||||
func (c *usedSpaceService) AnnounceUsedSpace(ctx context.Context, req *containerV2.AnnounceUsedSpaceRequest) (*containerV2.AnnounceUsedSpaceResponse, error) {
|
||||
var passedRoute []loadroute.ServerInfo
|
||||
|
||||
|
@ -577,6 +585,10 @@ func (*containerOnlyKeyRemoteServerInfo) NumberOfAddresses() int {
|
|||
return 0
|
||||
}
|
||||
|
||||
func (*containerOnlyKeyRemoteServerInfo) ExternalAddresses() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loadPlacementBuilder) isNodeFromContainerKey(epoch uint64, cnr cid.ID, key []byte) (bool, error) {
|
||||
cnrNodes, _, err := l.buildPlacement(epoch, cnr)
|
||||
if err != nil {
|
||||
|
|
|
@ -44,6 +44,10 @@ func (*OnlyKeyRemoteServerInfo) NumberOfAddresses() int {
|
|||
return 0
|
||||
}
|
||||
|
||||
func (*OnlyKeyRemoteServerInfo) ExternalAddresses() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
const invalidPrmValFmt = "invalid parameter %s (%T):%v"
|
||||
|
||||
func PanicOnPrmValue(n string, v interface{}) {
|
||||
|
|
|
@ -70,6 +70,7 @@ NEOFS_MORPH_RPC_ENDPOINT_1_PRIORITY=2
|
|||
# API Client section
|
||||
NEOFS_APICLIENT_DIAL_TIMEOUT=15s
|
||||
NEOFS_APICLIENT_STREAM_TIMEOUT=20s
|
||||
NEOFS_APICLIENT_ALLOW_EXTERNAL=true
|
||||
|
||||
# Policer section
|
||||
NEOFS_POLICER_HEAD_TIMEOUT=15s
|
||||
|
|
|
@ -113,7 +113,8 @@
|
|||
},
|
||||
"apiclient": {
|
||||
"dial_timeout": "15s",
|
||||
"stream_timeout": "20s"
|
||||
"stream_timeout": "20s",
|
||||
"allow_external": true
|
||||
},
|
||||
"policer": {
|
||||
"head_timeout": "15s"
|
||||
|
|
|
@ -92,6 +92,7 @@ morph:
|
|||
apiclient:
|
||||
dial_timeout: 15s # timeout for NEOFS API client connection
|
||||
stream_timeout: 20s # timeout for individual operations in a streaming RPC
|
||||
allow_external: true # allow to fallback to addresses in `ExternalAddr` attribute
|
||||
|
||||
policer:
|
||||
head_timeout: 15s # timeout for the Policer HEAD remote operation
|
||||
|
|
|
@ -39,6 +39,8 @@ type MultiAddressClient interface {
|
|||
type NodeInfo struct {
|
||||
addrGroup network.AddressGroup
|
||||
|
||||
externalAddrGroup network.AddressGroup
|
||||
|
||||
key []byte
|
||||
}
|
||||
|
||||
|
@ -52,6 +54,16 @@ func (x NodeInfo) AddressGroup() network.AddressGroup {
|
|||
return x.addrGroup
|
||||
}
|
||||
|
||||
// SetExternalAddressGroup sets an external group of network addresses.
|
||||
func (x *NodeInfo) SetExternalAddressGroup(v network.AddressGroup) {
|
||||
x.externalAddrGroup = v
|
||||
}
|
||||
|
||||
// ExternalAddressGroup returns a group of network addresses.
|
||||
func (x NodeInfo) ExternalAddressGroup() network.AddressGroup {
|
||||
return x.externalAddrGroup
|
||||
}
|
||||
|
||||
// SetPublicKey sets a public key in a binary format.
|
||||
//
|
||||
// Argument must not be mutated.
|
||||
|
|
|
@ -8,9 +8,10 @@ import (
|
|||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||
)
|
||||
|
||||
func nodeInfoFromKeyAddr(dst *NodeInfo, k []byte, a network.AddressGroup) {
|
||||
func nodeInfoFromKeyAddr(dst *NodeInfo, k []byte, a, external network.AddressGroup) {
|
||||
dst.SetPublicKey(k)
|
||||
dst.SetAddressGroup(a)
|
||||
dst.SetExternalAddressGroup(external)
|
||||
}
|
||||
|
||||
// NodeInfoFromRawNetmapElement fills NodeInfo structure from the interface of raw netmap member's descriptor.
|
||||
|
@ -20,6 +21,7 @@ func NodeInfoFromRawNetmapElement(dst *NodeInfo, info interface {
|
|||
PublicKey() []byte
|
||||
IterateAddresses(func(string) bool)
|
||||
NumberOfAddresses() int
|
||||
ExternalAddresses() []string
|
||||
}) error {
|
||||
var a network.AddressGroup
|
||||
|
||||
|
@ -28,7 +30,14 @@ func NodeInfoFromRawNetmapElement(dst *NodeInfo, info interface {
|
|||
return fmt.Errorf("parse network address: %w", err)
|
||||
}
|
||||
|
||||
nodeInfoFromKeyAddr(dst, info.PublicKey(), a)
|
||||
var external network.AddressGroup
|
||||
|
||||
ext := info.ExternalAddresses()
|
||||
if len(ext) > 0 {
|
||||
_ = external.FromStringSlice(ext)
|
||||
}
|
||||
|
||||
nodeInfoFromKeyAddr(dst, info.PublicKey(), a, external)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -39,8 +48,9 @@ func NodeInfoFromRawNetmapElement(dst *NodeInfo, info interface {
|
|||
func NodeInfoFromNetmapElement(dst *NodeInfo, info interface {
|
||||
PublicKey() []byte
|
||||
Addresses() network.AddressGroup
|
||||
ExternalAddresses() network.AddressGroup
|
||||
}) {
|
||||
nodeInfoFromKeyAddr(dst, info.PublicKey(), info.Addresses())
|
||||
nodeInfoFromKeyAddr(dst, info.PublicKey(), info.Addresses(), info.ExternalAddresses())
|
||||
}
|
||||
|
||||
// AssertKeyResponseCallback returns client response callback which checks if the response was signed by the expected key.
|
||||
|
|
|
@ -17,6 +17,11 @@ func (x Node) PublicKey() []byte {
|
|||
// IterateAddresses iterates over all announced network addresses
|
||||
// and passes them into f. Handler MUST NOT be nil.
|
||||
func (x Node) IterateAddresses(f func(string) bool) {
|
||||
for _, addr := range (netmap.NodeInfo)(x).ExternalAddresses() {
|
||||
if f(addr) {
|
||||
return
|
||||
}
|
||||
}
|
||||
(netmap.NodeInfo)(x).IterateNetworkEndpoints(f)
|
||||
}
|
||||
|
||||
|
@ -25,6 +30,11 @@ func (x Node) NumberOfAddresses() int {
|
|||
return (netmap.NodeInfo)(x).NumberOfNetworkEndpoints()
|
||||
}
|
||||
|
||||
// ExternalAddresses returns external addresses of a node.
|
||||
func (x Node) ExternalAddresses() []string {
|
||||
return (netmap.NodeInfo)(x).ExternalAddresses()
|
||||
}
|
||||
|
||||
// Nodes is a named type of []netmap.NodeInfo which provides interface needed
|
||||
// in the current repository. Nodes is expected to be used everywhere instead
|
||||
// of direct usage of []netmap.NodeInfo, so it represents a type mediator.
|
||||
|
|
|
@ -558,6 +558,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<-
|
|||
SGTimeout: cfg.GetDuration("audit.timeout.get"),
|
||||
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
|
||||
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
|
||||
AllowExternal: cfg.GetBool("audit.allow_external"),
|
||||
})
|
||||
|
||||
server.registerNoErrCloser(clientCache.cache.CloseAll)
|
||||
|
|
|
@ -37,6 +37,8 @@ type (
|
|||
Log *zap.Logger
|
||||
Key *ecdsa.PrivateKey
|
||||
|
||||
AllowExternal bool
|
||||
|
||||
SGTimeout, HeadTimeout, RangeTimeout time.Duration
|
||||
}
|
||||
)
|
||||
|
@ -44,7 +46,7 @@ type (
|
|||
func newClientCache(p *clientCacheParams) *ClientCache {
|
||||
return &ClientCache{
|
||||
log: p.Log,
|
||||
cache: cache.NewSDKClientCache(cache.ClientCacheOpts{}),
|
||||
cache: cache.NewSDKClientCache(cache.ClientCacheOpts{AllowExternal: p.AllowExternal}),
|
||||
key: p.Key,
|
||||
sgTimeout: p.SGTimeout,
|
||||
headTimeout: p.HeadTimeout,
|
||||
|
|
6
pkg/network/cache/client.go
vendored
6
pkg/network/cache/client.go
vendored
|
@ -16,6 +16,7 @@ type (
|
|||
mu *sync.RWMutex
|
||||
clients map[string]*multiClient
|
||||
opts ClientCacheOpts
|
||||
allowExternal bool
|
||||
}
|
||||
|
||||
ClientCacheOpts struct {
|
||||
|
@ -23,6 +24,7 @@ type (
|
|||
StreamTimeout time.Duration
|
||||
Key *ecdsa.PrivateKey
|
||||
ResponseCallback func(client.ResponseMetaInfo) error
|
||||
AllowExternal bool
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -33,12 +35,16 @@ func NewSDKClientCache(opts ClientCacheOpts) *ClientCache {
|
|||
mu: new(sync.RWMutex),
|
||||
clients: make(map[string]*multiClient),
|
||||
opts: opts,
|
||||
allowExternal: opts.AllowExternal,
|
||||
}
|
||||
}
|
||||
|
||||
// Get function returns existing client or creates a new one.
|
||||
func (c *ClientCache) Get(info clientcore.NodeInfo) (clientcore.Client, error) {
|
||||
netAddr := info.AddressGroup()
|
||||
if c.allowExternal {
|
||||
netAddr = append(netAddr, info.ExternalAddressGroup()...)
|
||||
}
|
||||
cacheKey := string(info.PublicKey())
|
||||
|
||||
c.mu.RLock()
|
||||
|
|
|
@ -75,6 +75,27 @@ type MultiAddressIterator interface {
|
|||
NumberOfAddresses() int
|
||||
}
|
||||
|
||||
// FromStringSlice forms AddressGroup from a string slice.
|
||||
//
|
||||
// Returns an error in the absence of addresses or if any of the addresses are incorrect.
|
||||
func (x *AddressGroup) FromStringSlice(addr []string) error {
|
||||
if len(addr) == 0 {
|
||||
return errors.New("missing network addresses")
|
||||
}
|
||||
|
||||
res := make(AddressGroup, len(addr))
|
||||
for i := range addr {
|
||||
var a Address
|
||||
if err := a.FromString(addr[i]); err != nil {
|
||||
return err // invalid format, ignore the whole field
|
||||
}
|
||||
res[i] = a
|
||||
}
|
||||
|
||||
*x = res
|
||||
return nil
|
||||
}
|
||||
|
||||
// FromIterator forms AddressGroup from MultiAddressIterator structure.
|
||||
// The result is sorted with sort.Sort.
|
||||
//
|
||||
|
|
69
pkg/network/group_test.go
Normal file
69
pkg/network/group_test.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package network
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAddressGroup_FromStringSlice(t *testing.T) {
|
||||
addrs := []string{
|
||||
"/dns4/node1.neofs/tcp/8080",
|
||||
"/dns4/node2.neofs/tcp/1234/tls",
|
||||
}
|
||||
expected := make(AddressGroup, len(addrs))
|
||||
for i := range addrs {
|
||||
expected[i] = Address{buildMultiaddr(addrs[i], t)}
|
||||
}
|
||||
|
||||
var ag AddressGroup
|
||||
t.Run("empty", func(t *testing.T) {
|
||||
require.Error(t, ag.FromStringSlice(nil))
|
||||
})
|
||||
|
||||
require.NoError(t, ag.FromStringSlice(addrs))
|
||||
require.Equal(t, expected, ag)
|
||||
|
||||
t.Run("error is returned, group is unchanged", func(t *testing.T) {
|
||||
require.Error(t, ag.FromStringSlice([]string{"invalid"}))
|
||||
require.Equal(t, expected, ag)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAddressGroup_FromIterator(t *testing.T) {
|
||||
addrs := testIterator{
|
||||
"/dns4/node1.neofs/tcp/8080",
|
||||
"/dns4/node2.neofs/tcp/1234/tls",
|
||||
}
|
||||
expected := make(AddressGroup, len(addrs))
|
||||
for i := range addrs {
|
||||
expected[i] = Address{buildMultiaddr(addrs[i], t)}
|
||||
}
|
||||
sort.Sort(expected)
|
||||
|
||||
var ag AddressGroup
|
||||
t.Run("empty", func(t *testing.T) {
|
||||
require.Error(t, ag.FromIterator(testIterator{}))
|
||||
})
|
||||
|
||||
require.NoError(t, ag.FromIterator(addrs))
|
||||
require.Equal(t, expected, ag)
|
||||
|
||||
t.Run("error is returned, group is unchanged", func(t *testing.T) {
|
||||
require.Error(t, ag.FromIterator(testIterator{"invalid"}))
|
||||
require.Equal(t, expected, ag)
|
||||
})
|
||||
}
|
||||
|
||||
type testIterator []string
|
||||
|
||||
func (t testIterator) IterateAddresses(f func(string) bool) {
|
||||
for i := range t {
|
||||
f(t[i])
|
||||
}
|
||||
}
|
||||
|
||||
func (t testIterator) NumberOfAddresses() int {
|
||||
return len(t)
|
||||
}
|
|
@ -19,6 +19,9 @@ type ServerInfo interface {
|
|||
|
||||
// Returns number of server's network addresses.
|
||||
NumberOfAddresses() int
|
||||
|
||||
// ExternalAddresses returns external node's addresses.
|
||||
ExternalAddresses() []string
|
||||
}
|
||||
|
||||
// Builder groups methods to route values in the network.
|
||||
|
|
|
@ -126,6 +126,8 @@ func flatNodes(ns [][]netmap.NodeInfo) [][]netmap.NodeInfo {
|
|||
type Node struct {
|
||||
addresses network.AddressGroup
|
||||
|
||||
externalAddresses network.AddressGroup
|
||||
|
||||
key []byte
|
||||
}
|
||||
|
||||
|
@ -134,6 +136,11 @@ func (x Node) Addresses() network.AddressGroup {
|
|||
return x.addresses
|
||||
}
|
||||
|
||||
// ExternalAddresses returns group of network addresses.
|
||||
func (x Node) ExternalAddresses() network.AddressGroup {
|
||||
return x.externalAddresses
|
||||
}
|
||||
|
||||
// PublicKey returns public key in a binary format. Should not be mutated.
|
||||
func (x Node) PublicKey() []byte {
|
||||
return x.key
|
||||
|
@ -167,6 +174,12 @@ func (t *Traverser) Next() []Node {
|
|||
return nil
|
||||
}
|
||||
|
||||
ext := t.vectors[0][i].ExternalAddresses()
|
||||
if len(ext) > 0 {
|
||||
// Ignore the error if this field is incorrectly formed.
|
||||
_ = nodes[i].externalAddresses.FromStringSlice(ext)
|
||||
}
|
||||
|
||||
nodes[i].key = t.vectors[0][i].PublicKey()
|
||||
}
|
||||
|
||||
|
|
|
@ -77,4 +77,7 @@ type ServerInfo interface {
|
|||
|
||||
// Returns number of server's network addresses.
|
||||
NumberOfAddresses() int
|
||||
|
||||
// ExternalAddresses returns external addresses of a node.
|
||||
ExternalAddresses() []string
|
||||
}
|
||||
|
|
|
@ -69,6 +69,10 @@ func (x nodeServer) NumberOfAddresses() int {
|
|||
return (apiNetmap.NodeInfo)(x).NumberOfNetworkEndpoints()
|
||||
}
|
||||
|
||||
func (x nodeServer) ExternalAddresses() []string {
|
||||
return (apiNetmap.NodeInfo)(x).ExternalAddresses()
|
||||
}
|
||||
|
||||
// BuildManagers sorts nodes in NetMap with HRW algorithms and
|
||||
// takes the next node after the current one as the only manager.
|
||||
func (mb *managerBuilder) BuildManagers(epoch uint64, p apireputation.PeerID) ([]ServerInfo, error) {
|
||||
|
|
Loading…
Reference in a new issue