forked from TrueCloudLab/frostfs-node
[#549] network/cache: Change Get
signature
Make network cache's `Get` method accept `network.Address` argument instead of string. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
e10981a7d3
commit
33bef46f31
19 changed files with 83 additions and 85 deletions
|
@ -201,7 +201,7 @@ type remoteLoadAnnounceProvider struct {
|
|||
loadAddrSrc network.LocalAddressSource
|
||||
|
||||
clientCache interface {
|
||||
Get(string) (apiClient.Client, error)
|
||||
Get(*network.Address) (apiClient.Client, error)
|
||||
}
|
||||
|
||||
deadEndProvider loadcontroller.WriterProvider
|
||||
|
@ -219,12 +219,12 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc
|
|||
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
|
||||
}
|
||||
|
||||
hostAddr, err := network.HostAddrFromMultiaddr(addr)
|
||||
netAddr, err := network.AddressFromString(addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
||||
}
|
||||
|
||||
c, err := r.clientCache.Get(hostAddr)
|
||||
c, err := r.clientCache.Get(netAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
||||
}
|
||||
|
|
|
@ -401,7 +401,7 @@ type reputationClientConstructor struct {
|
|||
trustStorage *truststorage.Storage
|
||||
|
||||
basicConstructor interface {
|
||||
Get(string) (client.Client, error)
|
||||
Get(*network.Address) (client.Client, error)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -485,7 +485,7 @@ func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchO
|
|||
return ids, err
|
||||
}
|
||||
|
||||
func (c *reputationClientConstructor) Get(addr string) (client.Client, error) {
|
||||
func (c *reputationClientConstructor) Get(addr *network.Address) (client.Client, error) {
|
||||
cl, err := c.basicConstructor.Get(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -494,9 +494,9 @@ func (c *reputationClientConstructor) Get(addr string) (client.Client, error) {
|
|||
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
|
||||
if err == nil {
|
||||
for i := range nm.Nodes {
|
||||
hostAddr, err := network.HostAddrFromMultiaddr(nm.Nodes[i].Address())
|
||||
netAddr, err := network.AddressFromString(nm.Nodes[i].Address())
|
||||
if err == nil {
|
||||
if hostAddr == addr {
|
||||
if netAddr.Equal(addr) {
|
||||
prm := truststorage.UpdatePrm{}
|
||||
prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey()))
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
type clientCache interface {
|
||||
Get(string) (apiClient.Client, error)
|
||||
Get(*network.Address) (apiClient.Client, error)
|
||||
}
|
||||
|
||||
// clientKeyRemoteProvider must provide remote writer and take into account
|
||||
|
@ -77,12 +77,12 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep
|
|||
return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil
|
||||
}
|
||||
|
||||
hostAddr, err := network.HostAddrFromMultiaddr(addr)
|
||||
netAddr, err := network.AddressFromString(addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
||||
}
|
||||
|
||||
c, err := rtp.clientCache.Get(hostAddr)
|
||||
c, err := rtp.clientCache.Get(netAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
||||
}
|
||||
|
|
|
@ -118,14 +118,14 @@ func (ap *Processor) findStorageGroups(cid *container.ID, shuffled netmap.Nodes)
|
|||
zap.Int("total_tries", ln),
|
||||
)
|
||||
|
||||
addr, err := network.HostAddrFromMultiaddr(shuffled[i].Address())
|
||||
netAddr, err := network.AddressFromString(shuffled[i].Address())
|
||||
if err != nil {
|
||||
log.Warn("can't parse remote address", zap.String("error", err.Error()))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
cli, err := ap.clientCache.Get(addr)
|
||||
cli, err := ap.clientCache.Get(netAddr)
|
||||
if err != nil {
|
||||
log.Warn("can't setup remote connection", zap.String("error", err.Error()))
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
||||
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"go.uber.org/zap"
|
||||
|
@ -28,7 +29,7 @@ type (
|
|||
|
||||
// NeoFSClientCache is an interface for cache of neofs RPC clients
|
||||
NeoFSClientCache interface {
|
||||
Get(address string) (SDKClient.Client, error)
|
||||
Get(address *network.Address) (SDKClient.Client, error)
|
||||
}
|
||||
|
||||
TaskManager interface {
|
||||
|
|
|
@ -22,7 +22,7 @@ type (
|
|||
ClientCache struct {
|
||||
log *zap.Logger
|
||||
cache interface {
|
||||
Get(string) (client.Client, error)
|
||||
Get(address *network.Address) (client.Client, error)
|
||||
}
|
||||
key *ecdsa.PrivateKey
|
||||
|
||||
|
@ -48,7 +48,7 @@ func newClientCache(p *clientCacheParams) *ClientCache {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *ClientCache) Get(address string) (client.Client, error) {
|
||||
func (c *ClientCache) Get(address *network.Address) (client.Client, error) {
|
||||
// Because cache is used by `ClientCache` exclusively,
|
||||
// client will always have valid key.
|
||||
return c.cache.Get(address)
|
||||
|
@ -74,7 +74,7 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma
|
|||
getParams.WithAddress(addr)
|
||||
|
||||
for _, node := range placement.FlattenNodes(nodes) {
|
||||
addr, err := network.HostAddrFromMultiaddr(node.Address())
|
||||
netAddr, err := network.AddressFromString(node.Address())
|
||||
if err != nil {
|
||||
c.log.Warn("can't parse remote address",
|
||||
zap.String("address", node.Address()),
|
||||
|
@ -83,10 +83,10 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma
|
|||
continue
|
||||
}
|
||||
|
||||
cli, err := c.Get(addr)
|
||||
cli, err := c.Get(netAddr)
|
||||
if err != nil {
|
||||
c.log.Warn("can't setup remote connection",
|
||||
zap.String("address", addr),
|
||||
zap.String("address", netAddr.String()),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
continue
|
||||
|
@ -136,14 +136,14 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.
|
|||
headParams.WithMainFields()
|
||||
headParams.WithAddress(objAddress)
|
||||
|
||||
addr, err := network.HostAddrFromMultiaddr(node.Address())
|
||||
netAddr, err := network.AddressFromString(node.Address())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err)
|
||||
}
|
||||
|
||||
cli, err := c.Get(addr)
|
||||
cli, err := c.Get(netAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't setup remote connection with %s: %w", addr, err)
|
||||
return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err)
|
||||
}
|
||||
|
||||
cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
|
||||
|
@ -172,14 +172,14 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje
|
|||
rangeParams.WithRangeList(rng)
|
||||
rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game
|
||||
|
||||
addr, err := network.HostAddrFromMultiaddr(node.Address())
|
||||
netAddr, err := network.AddressFromString(node.Address())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err)
|
||||
}
|
||||
|
||||
cli, err := c.Get(addr)
|
||||
cli, err := c.Get(netAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't setup remote connection with %s: %w", addr, err)
|
||||
return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err)
|
||||
}
|
||||
|
||||
cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout)
|
||||
|
|
|
@ -36,6 +36,11 @@ func (a Address) String() string {
|
|||
return a.ma.String()
|
||||
}
|
||||
|
||||
// Equal compares Address's.
|
||||
func (a Address) Equal(addr *Address) bool {
|
||||
return a.ma.Equal(addr.ma)
|
||||
}
|
||||
|
||||
// IPAddrString returns network endpoint address in string format.
|
||||
func (a Address) IPAddrString() (string, error) {
|
||||
ip, err := manet.ToNetAddr(a.ma)
|
||||
|
|
19
pkg/network/cache/client.go
vendored
19
pkg/network/cache/client.go
vendored
|
@ -1,9 +1,11 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -27,9 +29,14 @@ func NewSDKClientCache(opts ...client.Option) *ClientCache {
|
|||
}
|
||||
|
||||
// Get function returns existing client or creates a new one.
|
||||
func (c *ClientCache) Get(address string) (client.Client, error) {
|
||||
func (c *ClientCache) Get(netAddr *network.Address) (client.Client, error) {
|
||||
hostAddr, err := netAddr.HostAddrString()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not parse address as a string: %w", err)
|
||||
}
|
||||
|
||||
c.mu.RLock()
|
||||
if cli, ok := c.clients[address]; ok {
|
||||
if cli, ok := c.clients[hostAddr]; ok {
|
||||
// todo: check underlying connection neofs-api-go#196
|
||||
c.mu.RUnlock()
|
||||
|
||||
|
@ -43,16 +50,18 @@ func (c *ClientCache) Get(address string) (client.Client, error) {
|
|||
|
||||
// check once again if client is missing in cache, concurrent routine could
|
||||
// create client while this routine was locked on `c.mu.Lock()`.
|
||||
if cli, ok := c.clients[address]; ok {
|
||||
if cli, ok := c.clients[hostAddr]; ok {
|
||||
return cli, nil
|
||||
}
|
||||
|
||||
cli, err := client.New(append(c.opts, client.WithAddress(address))...)
|
||||
opts := append(c.opts, client.WithAddress(hostAddr))
|
||||
|
||||
cli, err := client.New(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.clients[address] = cli
|
||||
c.clients[hostAddr] = cli
|
||||
|
||||
return cli, nil
|
||||
}
|
||||
|
|
|
@ -271,18 +271,9 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
|
|||
}
|
||||
|
||||
func (exec execCtx) remoteClient(node *network.Address) (getClient, bool) {
|
||||
hostAddr, err := node.HostAddrString()
|
||||
|
||||
log := exec.log.With(zap.Stringer("node", node))
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
log.Debug("could not calculate node IP address")
|
||||
case err == nil:
|
||||
c, err := exec.svc.clientCache.get(hostAddr)
|
||||
c, err := exec.svc.clientCache.get(node)
|
||||
|
||||
switch {
|
||||
default:
|
||||
|
@ -293,7 +284,6 @@ func (exec execCtx) remoteClient(node *network.Address) (getClient, bool) {
|
|||
case err == nil:
|
||||
return c, true
|
||||
}
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
|
|
@ -80,8 +80,13 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
|
|||
return vs, nil
|
||||
}
|
||||
|
||||
func (c *testClientCache) get(addr string) (getClient, error) {
|
||||
v, ok := c.clients[addr]
|
||||
func (c *testClientCache) get(mAddr *network.Address) (getClient, error) {
|
||||
hostAddr, err := mAddr.HostAddrString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
v, ok := c.clients[hostAddr]
|
||||
if !ok {
|
||||
return nil, errors.New("could not construct client")
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
|
@ -34,7 +35,7 @@ type cfg struct {
|
|||
}
|
||||
|
||||
clientCache interface {
|
||||
get(string) (getClient, error)
|
||||
get(*network.Address) (getClient, error)
|
||||
}
|
||||
|
||||
traverserGenerator interface {
|
||||
|
@ -92,7 +93,7 @@ func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
|||
}
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(string) (client.Client, error)
|
||||
Get(*network.Address) (client.Client, error)
|
||||
}
|
||||
|
||||
// WithClientConstructor returns option to set constructor of remote node clients.
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
)
|
||||
|
||||
type SimpleObjectWriter struct {
|
||||
|
@ -71,7 +72,7 @@ func (s *SimpleObjectWriter) Object() *object.Object {
|
|||
return s.obj.Object()
|
||||
}
|
||||
|
||||
func (c *clientCacheWrapper) get(addr string) (getClient, error) {
|
||||
func (c *clientCacheWrapper) get(addr *network.Address) (getClient, error) {
|
||||
clt, err := c.cache.Get(addr)
|
||||
|
||||
return &clientWrapper{
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
)
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(string) (client.Client, error)
|
||||
Get(*network.Address) (client.Client, error)
|
||||
}
|
||||
|
||||
// RemoteHeader represents utility for getting
|
||||
|
@ -66,14 +66,9 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
|
|||
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||
}
|
||||
|
||||
addr, err := prm.node.HostAddrString()
|
||||
c, err := h.clientCache.Get(prm.node)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := h.clientCache.Get(addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, addr, err)
|
||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, prm.node, err)
|
||||
}
|
||||
|
||||
p := new(client.ObjectHeaderParams).
|
||||
|
@ -91,7 +86,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
|
|||
client.WithKey(key),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, addr, err)
|
||||
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, prm.node, err)
|
||||
}
|
||||
|
||||
return object.NewFromSDK(hdr), nil
|
||||
|
|
|
@ -54,14 +54,9 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
|||
return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
|
||||
}
|
||||
|
||||
addr, err := t.addr.HostAddrString()
|
||||
c, err := t.clientConstructor.Get(t.addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c, err := t.clientConstructor.Get(addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, addr, err)
|
||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.addr, err)
|
||||
}
|
||||
|
||||
id, err := c.PutObject(t.ctx, new(client.PutObjectParams).
|
||||
|
@ -75,7 +70,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
|||
)...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, addr, err)
|
||||
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.addr, err)
|
||||
}
|
||||
|
||||
return new(transformer.AccessIdentifiers).
|
||||
|
|
|
@ -30,7 +30,7 @@ type Service struct {
|
|||
type Option func(*cfg)
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(string) (client.Client, error)
|
||||
Get(*network.Address) (client.Client, error)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
|
|
|
@ -118,19 +118,9 @@ func (exec *execCtx) generateTraverser(cid *container.ID) (*placement.Traverser,
|
|||
}
|
||||
|
||||
func (exec execCtx) remoteClient(node *network.Address) (searchClient, bool) {
|
||||
hostAddr, err := node.HostAddrString()
|
||||
|
||||
log := exec.log.With(zap.Stringer("node", node))
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
log.Debug("could not calculate node IP address")
|
||||
case err == nil:
|
||||
c, err := exec.svc.clientConstructor.get(hostAddr)
|
||||
|
||||
c, err := exec.svc.clientConstructor.get(node)
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
|
@ -140,7 +130,6 @@ func (exec execCtx) remoteClient(node *network.Address) (searchClient, bool) {
|
|||
case err == nil:
|
||||
return c, true
|
||||
}
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
|
|
@ -82,8 +82,13 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
|
|||
return res, nil
|
||||
}
|
||||
|
||||
func (c *testClientCache) get(addr string) (searchClient, error) {
|
||||
v, ok := c.clients[addr]
|
||||
func (c *testClientCache) get(mAddr *network.Address) (searchClient, error) {
|
||||
hostAddr, err := mAddr.HostAddrString()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
v, ok := c.clients[hostAddr]
|
||||
if !ok {
|
||||
return nil, errors.New("could not construct client")
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
|
@ -26,7 +27,7 @@ type searchClient interface {
|
|||
}
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(string) (client.Client, error)
|
||||
Get(*network.Address) (client.Client, error)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
|
@ -37,7 +38,7 @@ type cfg struct {
|
|||
}
|
||||
|
||||
clientConstructor interface {
|
||||
get(string) (searchClient, error)
|
||||
get(*network.Address) (searchClient, error)
|
||||
}
|
||||
|
||||
traverserGenerator interface {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||
)
|
||||
|
@ -67,7 +68,7 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error {
|
|||
return w.writer.WriteIDs(list)
|
||||
}
|
||||
|
||||
func (c *clientConstructorWrapper) get(addr string) (searchClient, error) {
|
||||
func (c *clientConstructorWrapper) get(addr *network.Address) (searchClient, error) {
|
||||
clt, err := c.constructor.Get(addr)
|
||||
|
||||
return &clientWrapper{
|
||||
|
|
Loading…
Reference in a new issue