From 33bef46f31b4c12874a7e3d6e5eaf92c2306ef93 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 20 May 2021 18:17:16 +0300 Subject: [PATCH] [#549] network/cache: Change `Get` signature Make network cache's `Get` method accept `network.Address` argument instead of string. Signed-off-by: Pavel Karpy --- cmd/neofs-node/container.go | 6 +++--- cmd/neofs-node/object.go | 8 ++++---- cmd/neofs-node/reputation/common/remote.go | 6 +++--- pkg/innerring/processors/audit/process.go | 4 ++-- pkg/innerring/processors/audit/processor.go | 3 ++- pkg/innerring/rpc.go | 22 ++++++++++----------- pkg/network/address.go | 5 +++++ pkg/network/cache/client.go | 19 +++++++++++++----- pkg/services/object/get/exec.go | 18 ++++------------- pkg/services/object/get/get_test.go | 9 +++++++-- pkg/services/object/get/service.go | 5 +++-- pkg/services/object/get/util.go | 3 ++- pkg/services/object/head/remote.go | 13 ++++-------- pkg/services/object/put/remote.go | 11 +++-------- pkg/services/object/put/service.go | 2 +- pkg/services/object/search/exec.go | 17 +++------------- pkg/services/object/search/search_test.go | 9 +++++++-- pkg/services/object/search/service.go | 5 +++-- pkg/services/object/search/util.go | 3 ++- 19 files changed, 83 insertions(+), 85 deletions(-) diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 23384a2cb..05ddc0b2e 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -201,7 +201,7 @@ type remoteLoadAnnounceProvider struct { loadAddrSrc network.LocalAddressSource clientCache interface { - Get(string) (apiClient.Client, error) + Get(*network.Address) (apiClient.Client, error) } deadEndProvider loadcontroller.WriterProvider @@ -219,12 +219,12 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil } - hostAddr, err := network.HostAddrFromMultiaddr(addr) + netAddr, err := network.AddressFromString(addr) if err != nil { return nil, fmt.Errorf("could not convert address to IP format: %w", err) } - c, err := r.clientCache.Get(hostAddr) + c, err := r.clientCache.Get(netAddr) if err != nil { return nil, fmt.Errorf("could not initialize API client: %w", err) } diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index d10c3bdbf..0ed2b339c 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -401,7 +401,7 @@ type reputationClientConstructor struct { trustStorage *truststorage.Storage basicConstructor interface { - Get(string) (client.Client, error) + Get(*network.Address) (client.Client, error) } } @@ -485,7 +485,7 @@ func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchO return ids, err } -func (c *reputationClientConstructor) Get(addr string) (client.Client, error) { +func (c *reputationClientConstructor) Get(addr *network.Address) (client.Client, error) { cl, err := c.basicConstructor.Get(addr) if err != nil { return nil, err @@ -494,9 +494,9 @@ func (c *reputationClientConstructor) Get(addr string) (client.Client, error) { nm, err := netmap.GetLatestNetworkMap(c.nmSrc) if err == nil { for i := range nm.Nodes { - hostAddr, err := network.HostAddrFromMultiaddr(nm.Nodes[i].Address()) + netAddr, err := network.AddressFromString(nm.Nodes[i].Address()) if err == nil { - if hostAddr == addr { + if netAddr.Equal(addr) { prm := truststorage.UpdatePrm{} prm.SetPeer(reputation.PeerIDFromBytes(nm.Nodes[i].PublicKey())) diff --git a/cmd/neofs-node/reputation/common/remote.go b/cmd/neofs-node/reputation/common/remote.go index 9ffb69535..bb4d88e63 100644 --- a/cmd/neofs-node/reputation/common/remote.go +++ b/cmd/neofs-node/reputation/common/remote.go @@ -11,7 +11,7 @@ import ( ) type clientCache interface { - Get(string) (apiClient.Client, error) + Get(*network.Address) (apiClient.Client, error) } // clientKeyRemoteProvider must provide remote writer and take into account @@ -77,12 +77,12 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil } - hostAddr, err := network.HostAddrFromMultiaddr(addr) + netAddr, err := network.AddressFromString(addr) if err != nil { return nil, fmt.Errorf("could not convert address to IP format: %w", err) } - c, err := rtp.clientCache.Get(hostAddr) + c, err := rtp.clientCache.Get(netAddr) if err != nil { return nil, fmt.Errorf("could not initialize API client: %w", err) } diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 7932594a5..d3b1a597c 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -118,14 +118,14 @@ func (ap *Processor) findStorageGroups(cid *container.ID, shuffled netmap.Nodes) zap.Int("total_tries", ln), ) - addr, err := network.HostAddrFromMultiaddr(shuffled[i].Address()) + netAddr, err := network.AddressFromString(shuffled[i].Address()) if err != nil { log.Warn("can't parse remote address", zap.String("error", err.Error())) continue } - cli, err := ap.clientCache.Get(addr) + cli, err := ap.clientCache.Get(netAddr) if err != nil { log.Warn("can't setup remote connection", zap.String("error", err.Error())) diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index 06b92e239..a5d41b86e 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -14,6 +14,7 @@ import ( wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/panjf2000/ants/v2" "go.uber.org/zap" @@ -28,7 +29,7 @@ type ( // NeoFSClientCache is an interface for cache of neofs RPC clients NeoFSClientCache interface { - Get(address string) (SDKClient.Client, error) + Get(address *network.Address) (SDKClient.Client, error) } TaskManager interface { diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 5f0b98370..45130612a 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -22,7 +22,7 @@ type ( ClientCache struct { log *zap.Logger cache interface { - Get(string) (client.Client, error) + Get(address *network.Address) (client.Client, error) } key *ecdsa.PrivateKey @@ -48,7 +48,7 @@ func newClientCache(p *clientCacheParams) *ClientCache { } } -func (c *ClientCache) Get(address string) (client.Client, error) { +func (c *ClientCache) Get(address *network.Address) (client.Client, error) { // Because cache is used by `ClientCache` exclusively, // client will always have valid key. return c.cache.Get(address) @@ -74,7 +74,7 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma getParams.WithAddress(addr) for _, node := range placement.FlattenNodes(nodes) { - addr, err := network.HostAddrFromMultiaddr(node.Address()) + netAddr, err := network.AddressFromString(node.Address()) if err != nil { c.log.Warn("can't parse remote address", zap.String("address", node.Address()), @@ -83,10 +83,10 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma continue } - cli, err := c.Get(addr) + cli, err := c.Get(netAddr) if err != nil { c.log.Warn("can't setup remote connection", - zap.String("address", addr), + zap.String("address", netAddr.String()), zap.String("error", err.Error())) continue @@ -136,14 +136,14 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object. headParams.WithMainFields() headParams.WithAddress(objAddress) - addr, err := network.HostAddrFromMultiaddr(node.Address()) + netAddr, err := network.AddressFromString(node.Address()) if err != nil { return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err) } - cli, err := c.Get(addr) + cli, err := c.Get(netAddr) if err != nil { - return nil, fmt.Errorf("can't setup remote connection with %s: %w", addr, err) + return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err) } cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout) @@ -172,14 +172,14 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje rangeParams.WithRangeList(rng) rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game - addr, err := network.HostAddrFromMultiaddr(node.Address()) + netAddr, err := network.AddressFromString(node.Address()) if err != nil { return nil, fmt.Errorf("can't parse remote address %s: %w", node.Address(), err) } - cli, err := c.Get(addr) + cli, err := c.Get(netAddr) if err != nil { - return nil, fmt.Errorf("can't setup remote connection with %s: %w", addr, err) + return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err) } cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout) diff --git a/pkg/network/address.go b/pkg/network/address.go index bf47913f6..861340b01 100644 --- a/pkg/network/address.go +++ b/pkg/network/address.go @@ -36,6 +36,11 @@ func (a Address) String() string { return a.ma.String() } +// Equal compares Address's. +func (a Address) Equal(addr *Address) bool { + return a.ma.Equal(addr.ma) +} + // IPAddrString returns network endpoint address in string format. func (a Address) IPAddrString() (string, error) { ip, err := manet.ToNetAddr(a.ma) diff --git a/pkg/network/cache/client.go b/pkg/network/cache/client.go index 5a38a59f8..45fe702b6 100644 --- a/pkg/network/cache/client.go +++ b/pkg/network/cache/client.go @@ -1,9 +1,11 @@ package cache import ( + "fmt" "sync" "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/network" ) type ( @@ -27,9 +29,14 @@ func NewSDKClientCache(opts ...client.Option) *ClientCache { } // Get function returns existing client or creates a new one. -func (c *ClientCache) Get(address string) (client.Client, error) { +func (c *ClientCache) Get(netAddr *network.Address) (client.Client, error) { + hostAddr, err := netAddr.HostAddrString() + if err != nil { + return nil, fmt.Errorf("could not parse address as a string: %w", err) + } + c.mu.RLock() - if cli, ok := c.clients[address]; ok { + if cli, ok := c.clients[hostAddr]; ok { // todo: check underlying connection neofs-api-go#196 c.mu.RUnlock() @@ -43,16 +50,18 @@ func (c *ClientCache) Get(address string) (client.Client, error) { // check once again if client is missing in cache, concurrent routine could // create client while this routine was locked on `c.mu.Lock()`. - if cli, ok := c.clients[address]; ok { + if cli, ok := c.clients[hostAddr]; ok { return cli, nil } - cli, err := client.New(append(c.opts, client.WithAddress(address))...) + opts := append(c.opts, client.WithAddress(hostAddr)) + + cli, err := client.New(opts...) if err != nil { return nil, err } - c.clients[address] = cli + c.clients[hostAddr] = cli return cli, nil } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 4752e60b7..49b0fc62b 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -271,28 +271,18 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) { } func (exec execCtx) remoteClient(node *network.Address) (getClient, bool) { - hostAddr, err := node.HostAddrString() - log := exec.log.With(zap.Stringer("node", node)) + c, err := exec.svc.clientCache.get(node) + switch { default: exec.status = statusUndefined exec.err = err - log.Debug("could not calculate node IP address") + log.Debug("could not construct remote node client") case err == nil: - c, err := exec.svc.clientCache.get(hostAddr) - - switch { - default: - exec.status = statusUndefined - exec.err = err - - log.Debug("could not construct remote node client") - case err == nil: - return c, true - } + return c, true } return nil, false diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index bbae71d28..80416ae84 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -80,8 +80,13 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return vs, nil } -func (c *testClientCache) get(addr string) (getClient, error) { - v, ok := c.clients[addr] +func (c *testClientCache) get(mAddr *network.Address) (getClient, error) { + hostAddr, err := mAddr.HostAddrString() + if err != nil { + return nil, err + } + + v, ok := c.clients[hostAddr] if !ok { return nil, errors.New("could not construct client") } diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index 386747aaa..1244c0063 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" @@ -34,7 +35,7 @@ type cfg struct { } clientCache interface { - get(string) (getClient, error) + get(*network.Address) (getClient, error) } traverserGenerator interface { @@ -92,7 +93,7 @@ func WithLocalStorageEngine(e *engine.StorageEngine) Option { } type ClientConstructor interface { - Get(string) (client.Client, error) + Get(*network.Address) (client.Client, error) } // WithClientConstructor returns option to set constructor of remote node clients. diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index ba44e595b..a8dee7f78 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + "github.com/nspcc-dev/neofs-node/pkg/network" ) type SimpleObjectWriter struct { @@ -71,7 +72,7 @@ func (s *SimpleObjectWriter) Object() *object.Object { return s.obj.Object() } -func (c *clientCacheWrapper) get(addr string) (getClient, error) { +func (c *clientCacheWrapper) get(addr *network.Address) (getClient, error) { clt, err := c.cache.Get(addr) return &clientWrapper{ diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index 5e4e1c217..088e4f111 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -13,7 +13,7 @@ import ( ) type ClientConstructor interface { - Get(string) (client.Client, error) + Get(*network.Address) (client.Client, error) } // RemoteHeader represents utility for getting @@ -66,14 +66,9 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err) } - addr, err := prm.node.HostAddrString() + c, err := h.clientCache.Get(prm.node) if err != nil { - return nil, err - } - - c, err := h.clientCache.Get(addr) - if err != nil { - return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, addr, err) + return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, prm.node, err) } p := new(client.ObjectHeaderParams). @@ -91,7 +86,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob client.WithKey(key), ) if err != nil { - return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, addr, err) + return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, prm.node, err) } return object.NewFromSDK(hdr), nil diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index e9d3ebde2..2d1a710f2 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -54,14 +54,9 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err) } - addr, err := t.addr.HostAddrString() + c, err := t.clientConstructor.Get(t.addr) if err != nil { - return nil, err - } - - c, err := t.clientConstructor.Get(addr) - if err != nil { - return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, addr, err) + return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.addr, err) } id, err := c.PutObject(t.ctx, new(client.PutObjectParams). @@ -75,7 +70,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { )..., ) if err != nil { - return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, addr, err) + return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.addr, err) } return new(transformer.AccessIdentifiers). diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index b4ddd657e..a34d10d0c 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -30,7 +30,7 @@ type Service struct { type Option func(*cfg) type ClientConstructor interface { - Get(string) (client.Client, error) + Get(*network.Address) (client.Client, error) } type cfg struct { diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index 7a4e25a24..cbf9d6ebc 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -118,28 +118,17 @@ func (exec *execCtx) generateTraverser(cid *container.ID) (*placement.Traverser, } func (exec execCtx) remoteClient(node *network.Address) (searchClient, bool) { - hostAddr, err := node.HostAddrString() - log := exec.log.With(zap.Stringer("node", node)) + c, err := exec.svc.clientConstructor.get(node) switch { default: exec.status = statusUndefined exec.err = err - log.Debug("could not calculate node IP address") + log.Debug("could not construct remote node client") case err == nil: - c, err := exec.svc.clientConstructor.get(hostAddr) - - switch { - default: - exec.status = statusUndefined - exec.err = err - - log.Debug("could not construct remote node client") - case err == nil: - return c, true - } + return c, true } return nil, false diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 3438b6627..579449528 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -82,8 +82,13 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap return res, nil } -func (c *testClientCache) get(addr string) (searchClient, error) { - v, ok := c.clients[addr] +func (c *testClientCache) get(mAddr *network.Address) (searchClient, error) { + hostAddr, err := mAddr.HostAddrString() + if err != nil { + return nil, err + } + + v, ok := c.clients[hostAddr] if !ok { return nil, errors.New("could not construct client") } diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index b0097b348..4ab835f8f 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" @@ -26,7 +27,7 @@ type searchClient interface { } type ClientConstructor interface { - Get(string) (client.Client, error) + Get(*network.Address) (client.Client, error) } type cfg struct { @@ -37,7 +38,7 @@ type cfg struct { } clientConstructor interface { - get(string) (searchClient, error) + get(*network.Address) (searchClient, error) } traverserGenerator interface { diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index 5a8295645..b468ba912 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -8,6 +8,7 @@ import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" ) @@ -67,7 +68,7 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error { return w.writer.WriteIDs(list) } -func (c *clientConstructorWrapper) get(addr string) (searchClient, error) { +func (c *clientConstructorWrapper) get(addr *network.Address) (searchClient, error) { clt, err := c.constructor.Get(addr) return &clientWrapper{