From e473f3ac9117f097d3845c6bc17afe0531e94f14 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 28 Sep 2021 08:32:30 +0300 Subject: [PATCH] [#645] *: Use helper functions to build client.NodeInfo structures Helper functions from core/client package allow to set public keys of storage nodes. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/container.go | 13 ++----- cmd/neofs-node/reputation/common/remote.go | 13 ++----- pkg/innerring/processors/audit/process.go | 12 +++--- pkg/innerring/processors/audit/processor.go | 4 +- pkg/innerring/rpc.go | 42 ++++++++------------- pkg/services/object/get/container.go | 2 +- pkg/services/object/head/remote.go | 17 +++++---- pkg/services/object/put/remote.go | 26 ++++++------- pkg/services/object/put/streamer.go | 15 ++++---- pkg/services/object/search/container.go | 2 +- pkg/services/policer/check.go | 14 +------ pkg/services/replicator/process.go | 14 +------ 12 files changed, 67 insertions(+), 107 deletions(-) diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 0114af53..b57b983e 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -23,7 +23,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" - "github.com/nspcc-dev/neofs-node/pkg/network" containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc" containerService "github.com/nspcc-dev/neofs-node/pkg/services/container" loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller" @@ -261,16 +260,12 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil } - var netAddr network.AddressGroup - - err := netAddr.FromIterator(srv) - if err != nil { - return nil, fmt.Errorf("could not convert address to IP format: %w", err) - } - var info client.NodeInfo - info.SetAddressGroup(netAddr) + err := client.NodeInfoFromRawNetmapElement(&info, srv) + if err != nil { + return nil, fmt.Errorf("parse client node info: %w", err) + } c, err := r.clientCache.Get(info) if err != nil { diff --git a/cmd/neofs-node/reputation/common/remote.go b/cmd/neofs-node/reputation/common/remote.go index b1ddd49a..1fa8e09d 100644 --- a/cmd/neofs-node/reputation/common/remote.go +++ b/cmd/neofs-node/reputation/common/remote.go @@ -6,7 +6,6 @@ import ( apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" - "github.com/nspcc-dev/neofs-node/pkg/network" reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common" reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router" trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller" @@ -77,16 +76,12 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil } - var netAddr network.AddressGroup - - err := netAddr.FromIterator(srv) - if err != nil { - return nil, fmt.Errorf("could not convert address to IP format: %w", err) - } - var info client.NodeInfo - info.SetAddressGroup(netAddr) + err := client.NodeInfoFromRawNetmapElement(&info, srv) + if err != nil { + return nil, fmt.Errorf("parse client node info: %w", err) + } c, err := rtp.clientCache.Get(info) if err != nil { diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 933b86a0..90392ee3 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -8,8 +8,8 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/object" + clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" - "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup" "github.com/nspcc-dev/neofs-node/pkg/util/rand" @@ -117,6 +117,8 @@ func (ap *Processor) findStorageGroups(cid *cid.ID, shuffled netmap.Nodes) []*ob ln := len(shuffled) + var info clientcore.NodeInfo + for i := range shuffled { // consider iterating over some part of container log := ap.log.With( zap.Stringer("cid", cid), @@ -125,16 +127,14 @@ func (ap *Processor) findStorageGroups(cid *cid.ID, shuffled netmap.Nodes) []*ob zap.Int("total_tries", ln), ) - var netAddr network.AddressGroup - - err := netAddr.FromIterator(shuffled[i]) + err := clientcore.NodeInfoFromRawNetmapElement(&info, shuffled[i]) if err != nil { - log.Warn("can't parse remote address", zap.String("error", err.Error())) + log.Warn("parse client node info", zap.String("error", err.Error())) continue } - cli, err := ap.clientCache.Get(netAddr) + cli, err := ap.clientCache.Get(info) if err != nil { log.Warn("can't setup remote connection", zap.String("error", err.Error())) diff --git a/pkg/innerring/processors/audit/processor.go b/pkg/innerring/processors/audit/processor.go index 6bc5d1cd..4163aea2 100644 --- a/pkg/innerring/processors/audit/processor.go +++ b/pkg/innerring/processors/audit/processor.go @@ -8,10 +8,10 @@ import ( "time" SDKClient "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/core/client" wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" - "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/panjf2000/ants/v2" "go.uber.org/zap" @@ -26,7 +26,7 @@ type ( // NeoFSClientCache is an interface for cache of neofs RPC clients NeoFSClientCache interface { - Get(network.AddressGroup) (SDKClient.Client, error) + Get(client.NodeInfo) (SDKClient.Client, error) } TaskManager interface { diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 68f3742e..d51f4844 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -3,7 +3,6 @@ package innerring import ( "context" "crypto/ecdsa" - "encoding/hex" "fmt" "time" @@ -13,7 +12,6 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/storagegroup" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" coreObject "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/audit" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" @@ -48,11 +46,7 @@ func newClientCache(p *clientCacheParams) *ClientCache { } } -func (c *ClientCache) Get(address network.AddressGroup) (client.Client, error) { - var info clientcore.NodeInfo - - info.SetAddressGroup(address) - +func (c *ClientCache) Get(info clientcore.NodeInfo) (client.Client, error) { // Because cache is used by `ClientCache` exclusively, // client will always have valid key. return c.cache.Get(info) @@ -77,19 +71,15 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma getParams := new(client.GetObjectParams) getParams.WithAddress(addr) + var info clientcore.NodeInfo + for _, node := range placement.FlattenNodes(nodes) { - var netAddr network.AddressGroup - - err := netAddr.FromIterator(node) + err := clientcore.NodeInfoFromRawNetmapElement(&info, node) if err != nil { - c.log.Warn("can't parse remote address", - zap.String("key", hex.EncodeToString(node.PublicKey())), - zap.String("error", err.Error())) - - continue + return nil, fmt.Errorf("parse client node info: %w", err) } - cli, err := c.Get(netAddr) + cli, err := c.Get(info) if err != nil { c.log.Warn("can't setup remote connection", zap.String("error", err.Error())) @@ -141,16 +131,16 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object. headParams.WithMainFields() headParams.WithAddress(objAddress) - var netAddr network.AddressGroup + var info clientcore.NodeInfo - err := netAddr.FromIterator(node) + err := clientcore.NodeInfoFromRawNetmapElement(&info, node) if err != nil { - return nil, fmt.Errorf("can't parse remote address: %w", err) + return nil, fmt.Errorf("parse client node info: %w", err) } - cli, err := c.Get(netAddr) + cli, err := c.Get(info) if err != nil { - return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err) + return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err) } cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout) @@ -179,16 +169,16 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje rangeParams.WithRangeList(rng) rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game - var netAddr network.AddressGroup + var info clientcore.NodeInfo - err := netAddr.FromIterator(node) + err := clientcore.NodeInfoFromRawNetmapElement(&info, node) if err != nil { - return nil, fmt.Errorf("can't parse remote address: %w", err) + return nil, fmt.Errorf("parse client node info: %w", err) } - cli, err := c.Get(netAddr) + cli, err := c.Get(info) if err != nil { - return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err) + return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err) } cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout) diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 6381bdb7..9fda7996 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -81,7 +81,7 @@ func (exec *execCtx) processCurrentEpoch() bool { // we reach the best result - split info with linking object ID. var info client.NodeInfo - info.SetAddressGroup(addrs[i].Addresses()) + client.NodeInfoFromNetmapElement(&info, addrs[i]) if exec.processNode(ctx, info) { exec.log.Debug("completing the operation") diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index 7f1db333..390b175e 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -6,10 +6,10 @@ import ( "fmt" "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) @@ -29,7 +29,7 @@ type RemoteHeader struct { type RemoteHeadPrm struct { commonHeadPrm *Prm - node network.AddressGroup + node *netmap.NodeInfo } var ErrNotFound = errors.New("object header not found") @@ -42,8 +42,8 @@ func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *Remo } } -// WithNodeAddress sets network address group of the remote node. -func (p *RemoteHeadPrm) WithNodeAddress(v network.AddressGroup) *RemoteHeadPrm { +// WithNodeInfo sets information about the remote node. +func (p *RemoteHeadPrm) WithNodeInfo(v *netmap.NodeInfo) *RemoteHeadPrm { if p != nil { p.node = v } @@ -69,11 +69,14 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob var info clientcore.NodeInfo - info.SetAddressGroup(prm.node) + err = clientcore.NodeInfoFromRawNetmapElement(&info, prm.node) + if err != nil { + return nil, fmt.Errorf("parse client node info: %w", err) + } c, err := h.clientCache.Get(info) if err != nil { - return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, prm.node, err) + return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err) } p := new(client.ObjectHeaderParams). @@ -91,7 +94,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob client.WithKey(key), ) if err != nil { - return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, prm.node, err) + return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err) } return object.NewFromSDK(hdr), nil diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 6ba9e0d9..fa4e7bca 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -5,9 +5,9 @@ import ( "fmt" "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" ) @@ -21,7 +21,7 @@ type remoteTarget struct { commonPrm *util.CommonPrm - addr network.AddressGroup + nodeInfo clientcore.NodeInfo obj *object.Object @@ -38,7 +38,7 @@ type RemoteSender struct { // RemotePutPrm groups remote put operation parameters. type RemotePutPrm struct { - node network.AddressGroup + node *netmap.NodeInfo obj *object.Object } @@ -55,13 +55,9 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err) } - var info clientcore.NodeInfo - - info.SetAddressGroup(t.addr) - - c, err := t.clientConstructor.Get(info) + c, err := t.clientConstructor.Get(t.nodeInfo) if err != nil { - return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.addr, err) + return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err) } id, err := c.PutObject(t.ctx, new(client.PutObjectParams). @@ -75,7 +71,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { )..., ) if err != nil { - return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.addr, err) + return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) } return new(transformer.AccessIdentifiers). @@ -90,8 +86,8 @@ func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *Remot } } -// WithNodeAddress sets network address of the remote node. -func (p *RemotePutPrm) WithNodeAddress(v network.AddressGroup) *RemotePutPrm { +// WithNodeInfo sets information about the remote node. +func (p *RemotePutPrm) WithNodeInfo(v *netmap.NodeInfo) *RemotePutPrm { if p != nil { p.node = v } @@ -113,10 +109,14 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { t := &remoteTarget{ ctx: ctx, keyStorage: s.keyStorage, - addr: p.node, clientConstructor: s.clientConstructor, } + err := clientcore.NodeInfoFromRawNetmapElement(&t.nodeInfo, p.node) + if err != nil { + return fmt.Errorf("parse client node info: %w", err) + } + if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil { return fmt.Errorf("(%T) could not send object header: %w", s, err) } else if _, err := t.Close(); err != nil { diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 93fe270b..2734e512 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -148,18 +148,16 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { var relay func(nodeDesc) error if p.relay != nil { relay = func(node nodeDesc) error { - addr := node.info.Addresses() - var info client.NodeInfo - info.SetAddressGroup(addr) + client.NodeInfoFromNetmapElement(&info, node.info) c, err := p.clientConstructor.Get(info) if err != nil { - return fmt.Errorf("could not create SDK client %s: %w", addr, err) + return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err) } - return p.relay(addr, c) + return p.relay(info.AddressGroup(), c) } } @@ -174,13 +172,16 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { } } - return &remoteTarget{ + rt := &remoteTarget{ ctx: p.ctx, keyStorage: p.keyStorage, commonPrm: prm.common, - addr: node.info.Addresses(), clientConstructor: p.clientConstructor, } + + client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info) + + return rt }, relay: relay, fmt: p.fmtValidator, diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index 434ddcd2..4a9230e8 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -79,7 +79,7 @@ func (exec *execCtx) processCurrentEpoch() bool { // TODO: consider parallel execution var info client.NodeInfo - info.SetAddressGroup(addrs[i].Addresses()) + client.NodeInfoFromNetmapElement(&info, addrs[i]) exec.processNode(ctx, info) } diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 2d11e030..730a461a 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -6,7 +6,6 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/netmap" "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/network" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" "go.uber.org/zap" @@ -61,17 +60,6 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes default: } - var node network.AddressGroup - - err := node.FromIterator(nodes[i]) - if err != nil { - log.Error("could not parse network address", - zap.String("error", err.Error()), - ) - - continue - } - if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) { if shortage == 0 { // we can call the redundant copy callback @@ -85,7 +73,7 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes } else if shortage > 0 { callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) - _, err = p.remoteHeader.Head(callCtx, prm.WithNodeAddress(node)) + _, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i].NodeInfo)) cancel() diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 3786698f..fae8fea1 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -5,7 +5,6 @@ import ( "encoding/hex" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" - "github.com/nspcc-dev/neofs-node/pkg/network" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" "go.uber.org/zap" ) @@ -69,20 +68,9 @@ func (p *Replicator) handleTask(ctx context.Context, task *Task) { log := p.log.With(zap.String("node", hex.EncodeToString(task.nodes[i].PublicKey()))) - var node network.AddressGroup - - err := node.FromIterator(task.nodes[i]) - if err != nil { - log.Error("could not parse network address", - zap.String("error", err.Error()), - ) - - continue - } - callCtx, cancel := context.WithTimeout(ctx, p.putTimeout) - err = p.remoteSender.PutObject(callCtx, prm.WithNodeAddress(node)) + err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i].NodeInfo)) cancel()