forked from TrueCloudLab/frostfs-node
[#607] network: Do not work with Address pointers
`network.Address` structure in most cases created once and used read-only. Replace `AddressFromString` function with `Address.FromString` method with the same purpose and implementation. Make all libraries to work with value. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
5de074f24c
commit
adbbad0beb
35 changed files with 128 additions and 97 deletions
|
@ -238,15 +238,15 @@ func getKeyFromWallet(w *wallet.Wallet, addrStr string) (*ecdsa.PrivateKey, erro
|
||||||
|
|
||||||
// getEndpointAddress returns network address structure that stores multiaddr
|
// getEndpointAddress returns network address structure that stores multiaddr
|
||||||
// inside, parsed from global arguments.
|
// inside, parsed from global arguments.
|
||||||
func getEndpointAddress() (*network.Address, error) {
|
func getEndpointAddress() (addr network.Address, err error) {
|
||||||
endpoint := viper.GetString("rpc")
|
endpoint := viper.GetString("rpc")
|
||||||
|
|
||||||
addr, err := network.AddressFromString(endpoint)
|
err = addr.FromString(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errInvalidEndpoint
|
err = errInvalidEndpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
return addr, nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// getSDKClient returns default neofs-api-go sdk client. Consider using
|
// getSDKClient returns default neofs-api-go sdk client. Consider using
|
||||||
|
|
|
@ -90,7 +90,7 @@ type cfg struct {
|
||||||
|
|
||||||
cfgNodeInfo cfgNodeInfo
|
cfgNodeInfo cfgNodeInfo
|
||||||
|
|
||||||
localAddr *network.Address
|
localAddr network.Address
|
||||||
|
|
||||||
cfgObject cfgObject
|
cfgObject cfgObject
|
||||||
|
|
||||||
|
@ -308,7 +308,7 @@ func initCfg(path string) *cfg {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cfg) LocalAddress() *network.Address {
|
func (c *cfg) LocalAddress() network.Address {
|
||||||
return c.localAddr
|
return c.localAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,13 +67,13 @@ func Wallet(c *config.Config) *keys.PrivateKey {
|
||||||
// from "node" section as network.Address.
|
// from "node" section as network.Address.
|
||||||
//
|
//
|
||||||
// Panics if value is not a valid NeoFS network address
|
// Panics if value is not a valid NeoFS network address
|
||||||
func BootstrapAddress(c *config.Config) *network.Address {
|
func BootstrapAddress(c *config.Config) (addr network.Address) {
|
||||||
v := config.StringSafe(c.Sub(subsection), "address")
|
v := config.StringSafe(c.Sub(subsection), "address")
|
||||||
if v == "" {
|
if v == "" {
|
||||||
panic(errAddressNotSet)
|
panic(errAddressNotSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
addr, err := network.AddressFromString(v)
|
err := addr.FromString(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("could not convert bootstrap address %s to %T: %w", v, addr, err))
|
panic(fmt.Errorf("could not convert bootstrap address %s to %T: %w", v, addr, err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,11 +46,13 @@ func TestNodeSection(t *testing.T) {
|
||||||
relay := Relay(c)
|
relay := Relay(c)
|
||||||
wKey := Wallet(c)
|
wKey := Wallet(c)
|
||||||
|
|
||||||
expectedAddr, err := network.AddressFromString("s01.neofs.devenv:8080")
|
var expectedAddr network.Address
|
||||||
|
|
||||||
|
err := expectedAddr.FromString("s01.neofs.devenv:8080")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "NbUgTSFvPmsRxmGeWpuuGeJUoRoi6PErcM", key.Address())
|
require.Equal(t, "NbUgTSFvPmsRxmGeWpuuGeJUoRoi6PErcM", key.Address())
|
||||||
require.Equal(t, true, addr.Equal(*expectedAddr))
|
require.Equal(t, true, addr.Equal(expectedAddr))
|
||||||
require.Equal(t, true, relay)
|
require.Equal(t, true, relay)
|
||||||
|
|
||||||
require.Len(t, attributes, 2)
|
require.Len(t, attributes, 2)
|
||||||
|
|
|
@ -205,7 +205,7 @@ type remoteLoadAnnounceProvider struct {
|
||||||
loadAddrSrc network.LocalAddressSource
|
loadAddrSrc network.LocalAddressSource
|
||||||
|
|
||||||
clientCache interface {
|
clientCache interface {
|
||||||
Get(*network.Address) (apiClient.Client, error)
|
Get(network.Address) (apiClient.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
deadEndProvider loadcontroller.WriterProvider
|
deadEndProvider loadcontroller.WriterProvider
|
||||||
|
@ -218,12 +218,14 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc
|
||||||
|
|
||||||
addr := srv.Address()
|
addr := srv.Address()
|
||||||
|
|
||||||
netAddr, err := network.AddressFromString(addr)
|
var netAddr network.Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if network.IsLocalAddress(r.loadAddrSrc, *netAddr) {
|
if network.IsLocalAddress(r.loadAddrSrc, netAddr) {
|
||||||
// if local => return no-op writer
|
// if local => return no-op writer
|
||||||
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
|
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -408,7 +408,7 @@ type reputationClientConstructor struct {
|
||||||
trustStorage *truststorage.Storage
|
trustStorage *truststorage.Storage
|
||||||
|
|
||||||
basicConstructor interface {
|
basicConstructor interface {
|
||||||
Get(*network.Address) (client.Client, error)
|
Get(network.Address) (client.Client, error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -492,7 +492,7 @@ func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchO
|
||||||
return ids, err
|
return ids, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *reputationClientConstructor) Get(addr *network.Address) (client.Client, error) {
|
func (c *reputationClientConstructor) Get(addr network.Address) (client.Client, error) {
|
||||||
cl, err := c.basicConstructor.Get(addr)
|
cl, err := c.basicConstructor.Get(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -501,9 +501,11 @@ func (c *reputationClientConstructor) Get(addr *network.Address) (client.Client,
|
||||||
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
|
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for i := range nm.Nodes {
|
for i := range nm.Nodes {
|
||||||
netAddr, err := network.AddressFromString(nm.Nodes[i].Address())
|
var netAddr network.Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(nm.Nodes[i].Address())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if netAddr.Equal(*addr) {
|
if netAddr.Equal(addr) {
|
||||||
prm := truststorage.UpdatePrm{}
|
prm := truststorage.UpdatePrm{}
|
||||||
prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey()))
|
prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey()))
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type clientCache interface {
|
type clientCache interface {
|
||||||
Get(*network.Address) (apiClient.Client, error)
|
Get(network.Address) (apiClient.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// clientKeyRemoteProvider must provide remote writer and take into account
|
// clientKeyRemoteProvider must provide remote writer and take into account
|
||||||
|
@ -72,12 +72,14 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep
|
||||||
|
|
||||||
addr := srv.Address()
|
addr := srv.Address()
|
||||||
|
|
||||||
netAddr, err := network.AddressFromString(addr)
|
var netAddr network.Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if network.IsLocalAddress(rtp.localAddrSrc, *netAddr) {
|
if network.IsLocalAddress(rtp.localAddrSrc, netAddr) {
|
||||||
// if local => return no-op writer
|
// if local => return no-op writer
|
||||||
return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil
|
return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,9 @@ func (ap *Processor) findStorageGroups(cid *cid.ID, shuffled netmap.Nodes) []*ob
|
||||||
zap.Int("total_tries", ln),
|
zap.Int("total_tries", ln),
|
||||||
)
|
)
|
||||||
|
|
||||||
netAddr, err := network.AddressFromString(shuffled[i].Address())
|
var netAddr network.Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(shuffled[i].Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("can't parse remote address", zap.String("error", err.Error()))
|
log.Warn("can't parse remote address", zap.String("error", err.Error()))
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ type (
|
||||||
|
|
||||||
// NeoFSClientCache is an interface for cache of neofs RPC clients
|
// NeoFSClientCache is an interface for cache of neofs RPC clients
|
||||||
NeoFSClientCache interface {
|
NeoFSClientCache interface {
|
||||||
Get(address *network.Address) (SDKClient.Client, error)
|
Get(address network.Address) (SDKClient.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
TaskManager interface {
|
TaskManager interface {
|
||||||
|
|
|
@ -22,7 +22,7 @@ type (
|
||||||
ClientCache struct {
|
ClientCache struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
cache interface {
|
cache interface {
|
||||||
Get(address *network.Address) (client.Client, error)
|
Get(address network.Address) (client.Client, error)
|
||||||
CloseAll()
|
CloseAll()
|
||||||
}
|
}
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
|
@ -49,7 +49,7 @@ func newClientCache(p *clientCacheParams) *ClientCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientCache) Get(address *network.Address) (client.Client, error) {
|
func (c *ClientCache) Get(address network.Address) (client.Client, error) {
|
||||||
// Because cache is used by `ClientCache` exclusively,
|
// Because cache is used by `ClientCache` exclusively,
|
||||||
// client will always have valid key.
|
// client will always have valid key.
|
||||||
return c.cache.Get(address)
|
return c.cache.Get(address)
|
||||||
|
@ -75,7 +75,9 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma
|
||||||
getParams.WithAddress(addr)
|
getParams.WithAddress(addr)
|
||||||
|
|
||||||
for _, node := range placement.FlattenNodes(nodes) {
|
for _, node := range placement.FlattenNodes(nodes) {
|
||||||
netAddr, err := network.AddressFromString(node.Address())
|
var netAddr network.Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(node.Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Warn("can't parse remote address",
|
c.log.Warn("can't parse remote address",
|
||||||
zap.String("address", node.Address()),
|
zap.String("address", node.Address()),
|
||||||
|
@ -137,7 +139,9 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.
|
||||||
headParams.WithMainFields()
|
headParams.WithMainFields()
|
||||||
headParams.WithAddress(objAddress)
|
headParams.WithAddress(objAddress)
|
||||||
|
|
||||||
netAddr, err := network.AddressFromString(node.Address())
|
var netAddr network.Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(node.Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err)
|
return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err)
|
||||||
}
|
}
|
||||||
|
@ -173,7 +177,9 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje
|
||||||
rangeParams.WithRangeList(rng)
|
rangeParams.WithRangeList(rng)
|
||||||
rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game
|
rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game
|
||||||
|
|
||||||
netAddr, err := network.AddressFromString(node.Address())
|
var netAddr network.Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(node.Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err)
|
return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ type Address struct {
|
||||||
// LocalAddressSource is an interface of local
|
// LocalAddressSource is an interface of local
|
||||||
// network address container with read access.
|
// network address container with read access.
|
||||||
type LocalAddressSource interface {
|
type LocalAddressSource interface {
|
||||||
LocalAddress() *Address
|
LocalAddress() Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// String returns multiaddr string.
|
// String returns multiaddr string.
|
||||||
|
@ -57,26 +57,21 @@ func (a Address) HostAddr() string {
|
||||||
return host
|
return host
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddressFromString restores address from a string representation.
|
// FromString restores Address from a string representation.
|
||||||
//
|
//
|
||||||
// Supports MultiAddr and HostAddr strings.
|
// Supports MultiAddr and HostAddr strings.
|
||||||
func AddressFromString(s string) (*Address, error) {
|
func (a *Address) FromString(s string) error {
|
||||||
ma, err := multiaddr.NewMultiaddr(s)
|
var err error
|
||||||
|
|
||||||
|
a.ma, err = multiaddr.NewMultiaddr(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s, err = multiaddrStringFromHostAddr(s)
|
s, err = multiaddrStringFromHostAddr(s)
|
||||||
if err != nil {
|
if err == nil {
|
||||||
return nil, err
|
a.ma, err = multiaddr.NewMultiaddr(s)
|
||||||
}
|
|
||||||
|
|
||||||
ma, err = multiaddr.NewMultiaddr(s) // don't want recursion there
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Address{
|
return err
|
||||||
ma: ma,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// multiaddrStringFromHostAddr converts "localhost:8080" to "/dns4/localhost/tcp/8080"
|
// multiaddrStringFromHostAddr converts "localhost:8080" to "/dns4/localhost/tcp/8080"
|
||||||
|
|
|
@ -20,8 +20,10 @@ func TestAddressFromString(t *testing.T) {
|
||||||
{"[2004:eb1::1]:8080", buildMultiaddr("/ip6/2004:eb1::1/tcp/8080", t)},
|
{"[2004:eb1::1]:8080", buildMultiaddr("/ip6/2004:eb1::1/tcp/8080", t)},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var addr Address
|
||||||
|
|
||||||
for _, testcase := range testcases {
|
for _, testcase := range testcases {
|
||||||
addr, err := AddressFromString(testcase.inp)
|
err := addr.FromString(testcase.inp)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, testcase.exp, addr.ma, testcase.inp)
|
require.Equal(t, testcase.exp, addr.ma, testcase.inp)
|
||||||
}
|
}
|
||||||
|
@ -68,14 +70,18 @@ func buildMultiaddr(s string, t *testing.T) multiaddr.Multiaddr {
|
||||||
func TestAddress_WriteToNodeInfo(t *testing.T) {
|
func TestAddress_WriteToNodeInfo(t *testing.T) {
|
||||||
a := "127.0.0.1:8080"
|
a := "127.0.0.1:8080"
|
||||||
|
|
||||||
addr, err := AddressFromString(a)
|
var addr Address
|
||||||
|
|
||||||
|
err := addr.FromString(a)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var ni netmap.NodeInfo
|
var ni netmap.NodeInfo
|
||||||
|
|
||||||
addr.WriteToNodeInfo(&ni)
|
addr.WriteToNodeInfo(&ni)
|
||||||
|
|
||||||
restored, err := AddressFromString(ni.Address())
|
var restored Address
|
||||||
|
|
||||||
|
err = restored.FromString(ni.Address())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, restored.Equal(*addr))
|
require.True(t, restored.Equal(addr))
|
||||||
}
|
}
|
||||||
|
|
2
pkg/network/cache/client.go
vendored
2
pkg/network/cache/client.go
vendored
|
@ -29,7 +29,7 @@ func NewSDKClientCache(opts ...client.Option) *ClientCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get function returns existing client or creates a new one.
|
// Get function returns existing client or creates a new one.
|
||||||
func (c *ClientCache) Get(netAddr *network.Address) (client.Client, error) {
|
func (c *ClientCache) Get(netAddr network.Address) (client.Client, error) {
|
||||||
// multiaddr is used as a key in client cache since
|
// multiaddr is used as a key in client cache since
|
||||||
// same host may have different connections(with tls or not),
|
// same host may have different connections(with tls or not),
|
||||||
// therefore, host+port pair is not unique
|
// therefore, host+port pair is not unique
|
||||||
|
|
|
@ -43,7 +43,9 @@ func TestAddress_AddTLS(t *testing.T) {
|
||||||
|
|
||||||
addr.AddTLS()
|
addr.AddTLS()
|
||||||
|
|
||||||
netAddr, err := AddressFromString(test.want)
|
var netAddr Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(test.want)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.True(t, netAddr.Equal(addr), test.input)
|
require.True(t, netAddr.Equal(addr), test.input)
|
||||||
|
|
|
@ -270,7 +270,7 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec execCtx) remoteClient(node *network.Address) (getClient, bool) {
|
func (exec execCtx) remoteClient(node network.Address) (getClient, bool) {
|
||||||
log := exec.log.With(zap.Stringer("node", node))
|
log := exec.log.With(zap.Stringer("node", node))
|
||||||
|
|
||||||
c, err := exec.svc.clientCache.get(node)
|
c, err := exec.svc.clientCache.get(node)
|
||||||
|
|
|
@ -82,7 +82,7 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
|
||||||
return vs, nil
|
return vs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClientCache) get(mAddr *network.Address) (getClient, error) {
|
func (c *testClientCache) get(mAddr network.Address) (getClient, error) {
|
||||||
v, ok := c.clients[mAddr.HostAddr()]
|
v, ok := c.clients[mAddr.HostAddr()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("could not construct client")
|
return nil, errors.New("could not construct client")
|
||||||
|
@ -406,8 +406,9 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) {
|
||||||
strconv.Itoa(60000+j),
|
strconv.Itoa(60000+j),
|
||||||
)
|
)
|
||||||
|
|
||||||
var err error
|
var na network.Address
|
||||||
na, err := network.AddressFromString(a)
|
|
||||||
|
err := na.FromString(a)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
as[j] = na.HostAddr()
|
as[j] = na.HostAddr()
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) processNode(ctx context.Context, addr *network.Address) bool {
|
func (exec *execCtx) processNode(ctx context.Context, addr network.Address) bool {
|
||||||
log := exec.log.With(zap.Stringer("remote node", addr))
|
log := exec.log.With(zap.Stringer("remote node", addr))
|
||||||
|
|
||||||
log.Debug("processing node...")
|
log.Debug("processing node...")
|
||||||
|
|
|
@ -35,7 +35,7 @@ type cfg struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
clientCache interface {
|
clientCache interface {
|
||||||
get(*network.Address) (getClient, error)
|
get(network.Address) (getClient, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
traverserGenerator interface {
|
traverserGenerator interface {
|
||||||
|
@ -93,7 +93,7 @@ func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(*network.Address) (client.Client, error)
|
Get(network.Address) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithClientConstructor returns option to set constructor of remote node clients.
|
// WithClientConstructor returns option to set constructor of remote node clients.
|
||||||
|
|
|
@ -72,7 +72,7 @@ func (s *SimpleObjectWriter) Object() *object.Object {
|
||||||
return s.obj.Object()
|
return s.obj.Object()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientCacheWrapper) get(addr *network.Address) (getClient, error) {
|
func (c *clientCacheWrapper) get(addr network.Address) (getClient, error) {
|
||||||
clt, err := c.cache.Get(addr)
|
clt, err := c.cache.Get(addr)
|
||||||
|
|
||||||
return &clientWrapper{
|
return &clientWrapper{
|
||||||
|
|
|
@ -13,7 +13,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(*network.Address) (client.Client, error)
|
Get(network.Address) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteHeader represents utility for getting
|
// RemoteHeader represents utility for getting
|
||||||
|
@ -28,7 +28,7 @@ type RemoteHeader struct {
|
||||||
type RemoteHeadPrm struct {
|
type RemoteHeadPrm struct {
|
||||||
commonHeadPrm *Prm
|
commonHeadPrm *Prm
|
||||||
|
|
||||||
node *network.Address
|
node network.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrNotFound = errors.New("object header not found")
|
var ErrNotFound = errors.New("object header not found")
|
||||||
|
@ -42,7 +42,7 @@ func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *Remo
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeAddress sets network address of the remote node.
|
// WithNodeAddress sets network address of the remote node.
|
||||||
func (p *RemoteHeadPrm) WithNodeAddress(v *network.Address) *RemoteHeadPrm {
|
func (p *RemoteHeadPrm) WithNodeAddress(v network.Address) *RemoteHeadPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.node = v
|
p.node = v
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,9 +23,9 @@ type distributedTarget struct {
|
||||||
|
|
||||||
chunks [][]byte
|
chunks [][]byte
|
||||||
|
|
||||||
nodeTargetInitializer func(*network.Address) transformer.ObjectTarget
|
nodeTargetInitializer func(network.Address) transformer.ObjectTarget
|
||||||
|
|
||||||
relay func(*network.Address) error
|
relay func(network.Address) error
|
||||||
|
|
||||||
fmt *object.FormatValidator
|
fmt *object.FormatValidator
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
return t.iteratePlacement(t.sendObject)
|
return t.iteratePlacement(t.sendObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) sendObject(addr *network.Address) error {
|
func (t *distributedTarget) sendObject(addr network.Address) error {
|
||||||
if t.relay != nil {
|
if t.relay != nil {
|
||||||
err := t.relay(addr)
|
err := t.relay(addr)
|
||||||
if err == nil || !errors.Is(err, errLocalAddress) {
|
if err == nil || !errors.Is(err, errLocalAddress) {
|
||||||
|
@ -86,7 +86,7 @@ func (t *distributedTarget) sendObject(addr *network.Address) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *distributedTarget) iteratePlacement(f func(*network.Address) error) (*transformer.AccessIdentifiers, error) {
|
func (t *distributedTarget) iteratePlacement(f func(network.Address) error) (*transformer.AccessIdentifiers, error) {
|
||||||
traverser, err := placement.NewTraverser(
|
traverser, err := placement.NewTraverser(
|
||||||
append(t.traverseOpts, placement.ForObject(t.obj.ID()))...,
|
append(t.traverseOpts, placement.ForObject(t.obj.ID()))...,
|
||||||
)
|
)
|
||||||
|
|
|
@ -20,7 +20,7 @@ type remoteTarget struct {
|
||||||
|
|
||||||
commonPrm *util.CommonPrm
|
commonPrm *util.CommonPrm
|
||||||
|
|
||||||
addr *network.Address
|
addr network.Address
|
||||||
|
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ type RemoteSender struct {
|
||||||
|
|
||||||
// RemotePutPrm groups remote put operation parameters.
|
// RemotePutPrm groups remote put operation parameters.
|
||||||
type RemotePutPrm struct {
|
type RemotePutPrm struct {
|
||||||
node *network.Address
|
node network.Address
|
||||||
|
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *Remot
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeAddress sets network address of the remote node.
|
// WithNodeAddress sets network address of the remote node.
|
||||||
func (p *RemotePutPrm) WithNodeAddress(v *network.Address) *RemotePutPrm {
|
func (p *RemotePutPrm) WithNodeAddress(v network.Address) *RemotePutPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.node = v
|
p.node = v
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ type Service struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(*network.Address) (client.Client, error)
|
Get(network.Address) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
|
|
|
@ -147,10 +147,10 @@ func (p *Streamer) preparePrm(prm *PutInitPrm) error {
|
||||||
var errLocalAddress = errors.New("can't relay to local address")
|
var errLocalAddress = errors.New("can't relay to local address")
|
||||||
|
|
||||||
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
var relay func(*network.Address) error
|
var relay func(network.Address) error
|
||||||
if p.relay != nil {
|
if p.relay != nil {
|
||||||
relay = func(addr *network.Address) error {
|
relay = func(addr network.Address) error {
|
||||||
if network.IsLocalAddress(p.localAddrSrc, *addr) {
|
if network.IsLocalAddress(p.localAddrSrc, addr) {
|
||||||
return errLocalAddress
|
return errLocalAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,8 +166,8 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
return &distributedTarget{
|
return &distributedTarget{
|
||||||
traverseOpts: prm.traverseOpts,
|
traverseOpts: prm.traverseOpts,
|
||||||
workerPool: p.workerPool,
|
workerPool: p.workerPool,
|
||||||
nodeTargetInitializer: func(addr *network.Address) transformer.ObjectTarget {
|
nodeTargetInitializer: func(addr network.Address) transformer.ObjectTarget {
|
||||||
if network.IsLocalAddress(p.localAddrSrc, *addr) {
|
if network.IsLocalAddress(p.localAddrSrc, addr) {
|
||||||
return &localTarget{
|
return &localTarget{
|
||||||
storage: p.localStore,
|
storage: p.localStore,
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,7 +117,7 @@ func (exec *execCtx) generateTraverser(cid *cid.ID) (*placement.Traverser, bool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec execCtx) remoteClient(node *network.Address) (searchClient, bool) {
|
func (exec execCtx) remoteClient(node network.Address) (searchClient, bool) {
|
||||||
log := exec.log.With(zap.Stringer("node", node))
|
log := exec.log.With(zap.Stringer("node", node))
|
||||||
|
|
||||||
c, err := exec.svc.clientConstructor.get(node)
|
c, err := exec.svc.clientConstructor.get(node)
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) processNode(ctx context.Context, addr *network.Address) {
|
func (exec *execCtx) processNode(ctx context.Context, addr network.Address) {
|
||||||
log := exec.log.With(zap.Stringer("remote node", addr))
|
log := exec.log.With(zap.Stringer("remote node", addr))
|
||||||
|
|
||||||
log.Debug("processing node...")
|
log.Debug("processing node...")
|
||||||
|
|
|
@ -84,7 +84,7 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClientCache) get(mAddr *network.Address) (searchClient, error) {
|
func (c *testClientCache) get(mAddr network.Address) (searchClient, error) {
|
||||||
v, ok := c.clients[mAddr.HostAddr()]
|
v, ok := c.clients[mAddr.HostAddr()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("could not construct client")
|
return nil, errors.New("could not construct client")
|
||||||
|
@ -200,8 +200,9 @@ func testNodeMatrix(t testing.TB, dim []int) ([]netmap.Nodes, [][]string) {
|
||||||
strconv.Itoa(60000+j),
|
strconv.Itoa(60000+j),
|
||||||
)
|
)
|
||||||
|
|
||||||
var err error
|
var na network.Address
|
||||||
na, err := network.AddressFromString(a)
|
|
||||||
|
err := na.FromString(a)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
as[j] = na.HostAddr()
|
as[j] = na.HostAddr()
|
||||||
|
|
|
@ -27,7 +27,7 @@ type searchClient interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(*network.Address) (client.Client, error)
|
Get(network.Address) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
|
@ -38,7 +38,7 @@ type cfg struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
clientConstructor interface {
|
clientConstructor interface {
|
||||||
get(*network.Address) (searchClient, error)
|
get(network.Address) (searchClient, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
traverserGenerator interface {
|
traverserGenerator interface {
|
||||||
|
|
|
@ -68,7 +68,7 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error {
|
||||||
return w.writer.WriteIDs(list)
|
return w.writer.WriteIDs(list)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientConstructorWrapper) get(addr *network.Address) (searchClient, error) {
|
func (c *clientConstructorWrapper) get(addr network.Address) (searchClient, error) {
|
||||||
clt, err := c.constructor.Get(addr)
|
clt, err := c.constructor.Get(addr)
|
||||||
|
|
||||||
return &clientWrapper{
|
return &clientWrapper{
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// LogServiceError writes debug error message of object service to provided logger.
|
// LogServiceError writes debug error message of object service to provided logger.
|
||||||
func LogServiceError(l *logger.Logger, req string, node *network.Address, err error) {
|
func LogServiceError(l *logger.Logger, req string, node network.Address, err error) {
|
||||||
l.Debug("object service error",
|
l.Debug("object service error",
|
||||||
zap.Stringer("node", node),
|
zap.Stringer("node", node),
|
||||||
zap.String("request", req),
|
zap.String("request", req),
|
||||||
|
|
|
@ -50,13 +50,15 @@ func (p *localPlacement) BuildPlacement(addr *object.Address, policy *netmapSDK.
|
||||||
|
|
||||||
for i := range vs {
|
for i := range vs {
|
||||||
for j := range vs[i] {
|
for j := range vs[i] {
|
||||||
addr, err := network.AddressFromString(vs[i][j].Address())
|
var addr network.Address
|
||||||
|
|
||||||
|
err := addr.FromString(vs[i][j].Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: log error
|
// TODO: log error
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if network.IsLocalAddress(p.localAddrSrc, *addr) {
|
if network.IsLocalAddress(p.localAddrSrc, addr) {
|
||||||
return []netmapSDK.Nodes{{vs[i][j]}}, nil
|
return []netmapSDK.Nodes{{vs[i][j]}}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,13 +84,15 @@ func (p *remotePlacement) BuildPlacement(addr *object.Address, policy *netmapSDK
|
||||||
|
|
||||||
for i := range vs {
|
for i := range vs {
|
||||||
for j := 0; j < len(vs[i]); j++ {
|
for j := 0; j < len(vs[i]); j++ {
|
||||||
addr, err := network.AddressFromString(vs[i][j].Address())
|
var addr network.Address
|
||||||
|
|
||||||
|
err := addr.FromString(vs[i][j].Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: log error
|
// TODO: log error
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if network.IsLocalAddress(p.localAddrSrc, *addr) {
|
if network.IsLocalAddress(p.localAddrSrc, addr) {
|
||||||
vs[i] = append(vs[i][:j], vs[i][j+1:]...)
|
vs[i] = append(vs[i][:j], vs[i][j+1:]...)
|
||||||
j--
|
j--
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,7 @@ func flatNodes(ns []netmap.Nodes) []netmap.Nodes {
|
||||||
// Next returns next unprocessed address of the object placement.
|
// Next returns next unprocessed address of the object placement.
|
||||||
//
|
//
|
||||||
// Returns nil if no nodes left or traversal operation succeeded.
|
// Returns nil if no nodes left or traversal operation succeeded.
|
||||||
func (t *Traverser) Next() []*network.Address {
|
func (t *Traverser) Next() []network.Address {
|
||||||
t.mtx.Lock()
|
t.mtx.Lock()
|
||||||
defer t.mtx.Unlock()
|
defer t.mtx.Unlock()
|
||||||
|
|
||||||
|
@ -139,16 +139,14 @@ func (t *Traverser) Next() []*network.Address {
|
||||||
count = len(t.vectors[0])
|
count = len(t.vectors[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
addrs := make([]*network.Address, 0, count)
|
addrs := make([]network.Address, count)
|
||||||
|
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
addr, err := network.AddressFromString(t.vectors[0][i].Address())
|
err := addrs[i].FromString(t.vectors[0][i].Address())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: log error
|
// TODO: log error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
addrs = append(addrs, addr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
t.vectors[0] = t.vectors[0][count:]
|
t.vectors[0] = t.vectors[0][count:]
|
||||||
|
|
|
@ -65,10 +65,12 @@ func testPlacement(t *testing.T, ss, rs []int) ([]netmap.Nodes, *container.Conta
|
||||||
return nodes, container.New(container.WithPolicy(policy))
|
return nodes, container.New(container.WithPolicy(policy))
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertSameAddress(t *testing.T, ni *netmap.NodeInfo, addr *network.Address) {
|
func assertSameAddress(t *testing.T, ni *netmap.NodeInfo, addr network.Address) {
|
||||||
netAddr, err := network.AddressFromString(ni.Address())
|
var netAddr network.Address
|
||||||
|
|
||||||
|
err := netAddr.FromString(ni.Address())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, netAddr.Equal(*addr))
|
require.True(t, netAddr.Equal(addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTraverserObjectScenarios(t *testing.T) {
|
func TestTraverserObjectScenarios(t *testing.T) {
|
||||||
|
@ -122,10 +124,12 @@ func TestTraverserObjectScenarios(t *testing.T) {
|
||||||
require.NotNil(t, tr.Next())
|
require.NotNil(t, tr.Next())
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := network.AddressFromString(nodes[1][0].Address())
|
var n network.Address
|
||||||
|
|
||||||
|
err = n.FromString(nodes[1][0].Address())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, []*network.Address{n}, tr.Next())
|
require.Equal(t, []network.Address{n}, tr.Next())
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("put scenario", func(t *testing.T) {
|
t.Run("put scenario", func(t *testing.T) {
|
||||||
|
|
|
@ -61,14 +61,16 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes
|
||||||
|
|
||||||
log := p.log.With(zap.String("node", netAddr))
|
log := p.log.With(zap.String("node", netAddr))
|
||||||
|
|
||||||
node, err := network.AddressFromString(netAddr)
|
var node network.Address
|
||||||
|
|
||||||
|
err := node.FromString(netAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not parse network address")
|
log.Error("could not parse network address")
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if network.IsLocalAddress(p.localAddrSrc, *node) {
|
if network.IsLocalAddress(p.localAddrSrc, node) {
|
||||||
if shortage == 0 {
|
if shortage == 0 {
|
||||||
// we can call the redundant copy callback
|
// we can call the redundant copy callback
|
||||||
// here to slightly improve the performance
|
// here to slightly improve the performance
|
||||||
|
|
|
@ -70,7 +70,9 @@ func (p *Replicator) handleTask(ctx context.Context, task *Task) {
|
||||||
|
|
||||||
log := p.log.With(zap.String("node", netAddr))
|
log := p.log.With(zap.String("node", netAddr))
|
||||||
|
|
||||||
node, err := network.AddressFromString(netAddr)
|
var node network.Address
|
||||||
|
|
||||||
|
err := node.FromString(netAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not parse network address")
|
log.Error("could not parse network address")
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue