[#645] *: Use helper functions to build client.NodeInfo structures
Helper functions from core/client package allow to set public keys of storage nodes. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
2d441a4cc6
commit
e473f3ac91
12 changed files with 67 additions and 107 deletions
|
@ -23,7 +23,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
|
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
|
containerTransportGRPC "github.com/nspcc-dev/neofs-node/pkg/network/transport/container/grpc"
|
||||||
containerService "github.com/nspcc-dev/neofs-node/pkg/services/container"
|
containerService "github.com/nspcc-dev/neofs-node/pkg/services/container"
|
||||||
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
|
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
|
||||||
|
@ -261,16 +260,12 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc
|
||||||
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
|
return loadcontroller.SimpleWriterProvider(new(nopLoadWriter)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var netAddr network.AddressGroup
|
|
||||||
|
|
||||||
err := netAddr.FromIterator(srv)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
|
|
||||||
info.SetAddressGroup(netAddr)
|
err := client.NodeInfoFromRawNetmapElement(&info, srv)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
c, err := r.clientCache.Get(info)
|
c, err := r.clientCache.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client"
|
apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
||||||
reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router"
|
reputationrouter "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common/router"
|
||||||
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
|
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
|
||||||
|
@ -77,16 +76,12 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep
|
||||||
return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil
|
return trustcontroller.SimpleWriterProvider(new(NopReputationWriter)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var netAddr network.AddressGroup
|
|
||||||
|
|
||||||
err := netAddr.FromIterator(srv)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
|
|
||||||
info.SetAddressGroup(netAddr)
|
err := client.NodeInfoFromRawNetmapElement(&info, srv)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
c, err := rtp.clientCache.Get(info)
|
c, err := rtp.clientCache.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -8,8 +8,8 @@ import (
|
||||||
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/storagegroup"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/rand"
|
"github.com/nspcc-dev/neofs-node/pkg/util/rand"
|
||||||
|
@ -117,6 +117,8 @@ func (ap *Processor) findStorageGroups(cid *cid.ID, shuffled netmap.Nodes) []*ob
|
||||||
|
|
||||||
ln := len(shuffled)
|
ln := len(shuffled)
|
||||||
|
|
||||||
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
for i := range shuffled { // consider iterating over some part of container
|
for i := range shuffled { // consider iterating over some part of container
|
||||||
log := ap.log.With(
|
log := ap.log.With(
|
||||||
zap.Stringer("cid", cid),
|
zap.Stringer("cid", cid),
|
||||||
|
@ -125,16 +127,14 @@ func (ap *Processor) findStorageGroups(cid *cid.ID, shuffled netmap.Nodes) []*ob
|
||||||
zap.Int("total_tries", ln),
|
zap.Int("total_tries", ln),
|
||||||
)
|
)
|
||||||
|
|
||||||
var netAddr network.AddressGroup
|
err := clientcore.NodeInfoFromRawNetmapElement(&info, shuffled[i])
|
||||||
|
|
||||||
err := netAddr.FromIterator(shuffled[i])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("can't parse remote address", zap.String("error", err.Error()))
|
log.Warn("parse client node info", zap.String("error", err.Error()))
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
cli, err := ap.clientCache.Get(netAddr)
|
cli, err := ap.clientCache.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("can't setup remote connection", zap.String("error", err.Error()))
|
log.Warn("can't setup remote connection", zap.String("error", err.Error()))
|
||||||
|
|
||||||
|
|
|
@ -8,10 +8,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
SDKClient "github.com/nspcc-dev/neofs-api-go/pkg/client"
|
SDKClient "github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
wrapContainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
||||||
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
wrapNetmap "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -26,7 +26,7 @@ type (
|
||||||
|
|
||||||
// NeoFSClientCache is an interface for cache of neofs RPC clients
|
// NeoFSClientCache is an interface for cache of neofs RPC clients
|
||||||
NeoFSClientCache interface {
|
NeoFSClientCache interface {
|
||||||
Get(network.AddressGroup) (SDKClient.Client, error)
|
Get(client.NodeInfo) (SDKClient.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
TaskManager interface {
|
TaskManager interface {
|
||||||
|
|
|
@ -3,7 +3,6 @@ package innerring
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -13,7 +12,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/storagegroup"
|
"github.com/nspcc-dev/neofs-api-go/pkg/storagegroup"
|
||||||
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
coreObject "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
coreObject "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
"github.com/nspcc-dev/neofs-node/pkg/services/audit"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
|
@ -48,11 +46,7 @@ func newClientCache(p *clientCacheParams) *ClientCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientCache) Get(address network.AddressGroup) (client.Client, error) {
|
func (c *ClientCache) Get(info clientcore.NodeInfo) (client.Client, error) {
|
||||||
var info clientcore.NodeInfo
|
|
||||||
|
|
||||||
info.SetAddressGroup(address)
|
|
||||||
|
|
||||||
// Because cache is used by `ClientCache` exclusively,
|
// Because cache is used by `ClientCache` exclusively,
|
||||||
// client will always have valid key.
|
// client will always have valid key.
|
||||||
return c.cache.Get(info)
|
return c.cache.Get(info)
|
||||||
|
@ -77,19 +71,15 @@ func (c *ClientCache) getSG(ctx context.Context, addr *object.Address, nm *netma
|
||||||
getParams := new(client.GetObjectParams)
|
getParams := new(client.GetObjectParams)
|
||||||
getParams.WithAddress(addr)
|
getParams.WithAddress(addr)
|
||||||
|
|
||||||
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
for _, node := range placement.FlattenNodes(nodes) {
|
for _, node := range placement.FlattenNodes(nodes) {
|
||||||
var netAddr network.AddressGroup
|
err := clientcore.NodeInfoFromRawNetmapElement(&info, node)
|
||||||
|
|
||||||
err := netAddr.FromIterator(node)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Warn("can't parse remote address",
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||||
zap.String("key", hex.EncodeToString(node.PublicKey())),
|
|
||||||
zap.String("error", err.Error()))
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cli, err := c.Get(netAddr)
|
cli, err := c.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Warn("can't setup remote connection",
|
c.log.Warn("can't setup remote connection",
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
|
@ -141,16 +131,16 @@ func (c *ClientCache) GetHeader(task *audit.Task, node *netmap.Node, id *object.
|
||||||
headParams.WithMainFields()
|
headParams.WithMainFields()
|
||||||
headParams.WithAddress(objAddress)
|
headParams.WithAddress(objAddress)
|
||||||
|
|
||||||
var netAddr network.AddressGroup
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
err := netAddr.FromIterator(node)
|
err := clientcore.NodeInfoFromRawNetmapElement(&info, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't parse remote address: %w", err)
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cli, err := c.Get(netAddr)
|
cli, err := c.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err)
|
return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
|
cctx, cancel := context.WithTimeout(task.AuditContext(), c.headTimeout)
|
||||||
|
@ -179,16 +169,16 @@ func (c *ClientCache) GetRangeHash(task *audit.Task, node *netmap.Node, id *obje
|
||||||
rangeParams.WithRangeList(rng)
|
rangeParams.WithRangeList(rng)
|
||||||
rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game
|
rangeParams.WithSalt(nil) // it MUST be nil for correct hash concatenation in PDP game
|
||||||
|
|
||||||
var netAddr network.AddressGroup
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
err := netAddr.FromIterator(node)
|
err := clientcore.NodeInfoFromRawNetmapElement(&info, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't parse remote address: %w", err)
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cli, err := c.Get(netAddr)
|
cli, err := c.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("can't setup remote connection with %s: %w", netAddr, err)
|
return nil, fmt.Errorf("can't setup remote connection with %s: %w", info.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout)
|
cctx, cancel := context.WithTimeout(task.AuditContext(), c.rangeTimeout)
|
||||||
|
|
|
@ -81,7 +81,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
// we reach the best result - split info with linking object ID.
|
// we reach the best result - split info with linking object ID.
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
|
|
||||||
info.SetAddressGroup(addrs[i].Addresses())
|
client.NodeInfoFromNetmapElement(&info, addrs[i])
|
||||||
|
|
||||||
if exec.processNode(ctx, info) {
|
if exec.processNode(ctx, info) {
|
||||||
exec.log.Debug("completing the operation")
|
exec.log.Debug("completing the operation")
|
||||||
|
|
|
@ -6,10 +6,10 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -29,7 +29,7 @@ type RemoteHeader struct {
|
||||||
type RemoteHeadPrm struct {
|
type RemoteHeadPrm struct {
|
||||||
commonHeadPrm *Prm
|
commonHeadPrm *Prm
|
||||||
|
|
||||||
node network.AddressGroup
|
node *netmap.NodeInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
var ErrNotFound = errors.New("object header not found")
|
var ErrNotFound = errors.New("object header not found")
|
||||||
|
@ -42,8 +42,8 @@ func NewRemoteHeader(keyStorage *util.KeyStorage, cache ClientConstructor) *Remo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeAddress sets network address group of the remote node.
|
// WithNodeInfo sets information about the remote node.
|
||||||
func (p *RemoteHeadPrm) WithNodeAddress(v network.AddressGroup) *RemoteHeadPrm {
|
func (p *RemoteHeadPrm) WithNodeInfo(v *netmap.NodeInfo) *RemoteHeadPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.node = v
|
p.node = v
|
||||||
}
|
}
|
||||||
|
@ -69,11 +69,14 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
|
||||||
|
|
||||||
var info clientcore.NodeInfo
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
info.SetAddressGroup(prm.node)
|
err = clientcore.NodeInfoFromRawNetmapElement(&info, prm.node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse client node info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
c, err := h.clientCache.Get(info)
|
c, err := h.clientCache.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, prm.node, err)
|
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p := new(client.ObjectHeaderParams).
|
p := new(client.ObjectHeaderParams).
|
||||||
|
@ -91,7 +94,7 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
|
||||||
client.WithKey(key),
|
client.WithKey(key),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, prm.node, err)
|
return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return object.NewFromSDK(hdr), nil
|
return object.NewFromSDK(hdr), nil
|
||||||
|
|
|
@ -5,9 +5,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
|
||||||
)
|
)
|
||||||
|
@ -21,7 +21,7 @@ type remoteTarget struct {
|
||||||
|
|
||||||
commonPrm *util.CommonPrm
|
commonPrm *util.CommonPrm
|
||||||
|
|
||||||
addr network.AddressGroup
|
nodeInfo clientcore.NodeInfo
|
||||||
|
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ type RemoteSender struct {
|
||||||
|
|
||||||
// RemotePutPrm groups remote put operation parameters.
|
// RemotePutPrm groups remote put operation parameters.
|
||||||
type RemotePutPrm struct {
|
type RemotePutPrm struct {
|
||||||
node network.AddressGroup
|
node *netmap.NodeInfo
|
||||||
|
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
}
|
}
|
||||||
|
@ -55,13 +55,9 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
|
return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var info clientcore.NodeInfo
|
c, err := t.clientConstructor.Get(t.nodeInfo)
|
||||||
|
|
||||||
info.SetAddressGroup(t.addr)
|
|
||||||
|
|
||||||
c, err := t.clientConstructor.Get(info)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.addr, err)
|
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := c.PutObject(t.ctx, new(client.PutObjectParams).
|
id, err := c.PutObject(t.ctx, new(client.PutObjectParams).
|
||||||
|
@ -75,7 +71,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
)...,
|
)...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.addr, err)
|
return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return new(transformer.AccessIdentifiers).
|
return new(transformer.AccessIdentifiers).
|
||||||
|
@ -90,8 +86,8 @@ func NewRemoteSender(keyStorage *util.KeyStorage, cons ClientConstructor) *Remot
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeAddress sets network address of the remote node.
|
// WithNodeInfo sets information about the remote node.
|
||||||
func (p *RemotePutPrm) WithNodeAddress(v network.AddressGroup) *RemotePutPrm {
|
func (p *RemotePutPrm) WithNodeInfo(v *netmap.NodeInfo) *RemotePutPrm {
|
||||||
if p != nil {
|
if p != nil {
|
||||||
p.node = v
|
p.node = v
|
||||||
}
|
}
|
||||||
|
@ -113,10 +109,14 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
|
||||||
t := &remoteTarget{
|
t := &remoteTarget{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
keyStorage: s.keyStorage,
|
keyStorage: s.keyStorage,
|
||||||
addr: p.node,
|
|
||||||
clientConstructor: s.clientConstructor,
|
clientConstructor: s.clientConstructor,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err := clientcore.NodeInfoFromRawNetmapElement(&t.nodeInfo, p.node)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse client node info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil {
|
if err := t.WriteHeader(object.NewRawFromObject(p.obj)); err != nil {
|
||||||
return fmt.Errorf("(%T) could not send object header: %w", s, err)
|
return fmt.Errorf("(%T) could not send object header: %w", s, err)
|
||||||
} else if _, err := t.Close(); err != nil {
|
} else if _, err := t.Close(); err != nil {
|
||||||
|
|
|
@ -148,18 +148,16 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
var relay func(nodeDesc) error
|
var relay func(nodeDesc) error
|
||||||
if p.relay != nil {
|
if p.relay != nil {
|
||||||
relay = func(node nodeDesc) error {
|
relay = func(node nodeDesc) error {
|
||||||
addr := node.info.Addresses()
|
|
||||||
|
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
|
|
||||||
info.SetAddressGroup(addr)
|
client.NodeInfoFromNetmapElement(&info, node.info)
|
||||||
|
|
||||||
c, err := p.clientConstructor.Get(info)
|
c, err := p.clientConstructor.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not create SDK client %s: %w", addr, err)
|
return fmt.Errorf("could not create SDK client %s: %w", info.AddressGroup(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.relay(addr, c)
|
return p.relay(info.AddressGroup(), c)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,13 +172,16 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &remoteTarget{
|
rt := &remoteTarget{
|
||||||
ctx: p.ctx,
|
ctx: p.ctx,
|
||||||
keyStorage: p.keyStorage,
|
keyStorage: p.keyStorage,
|
||||||
commonPrm: prm.common,
|
commonPrm: prm.common,
|
||||||
addr: node.info.Addresses(),
|
|
||||||
clientConstructor: p.clientConstructor,
|
clientConstructor: p.clientConstructor,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client.NodeInfoFromNetmapElement(&rt.nodeInfo, node.info)
|
||||||
|
|
||||||
|
return rt
|
||||||
},
|
},
|
||||||
relay: relay,
|
relay: relay,
|
||||||
fmt: p.fmtValidator,
|
fmt: p.fmtValidator,
|
||||||
|
|
|
@ -79,7 +79,7 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
// TODO: consider parallel execution
|
// TODO: consider parallel execution
|
||||||
var info client.NodeInfo
|
var info client.NodeInfo
|
||||||
|
|
||||||
info.SetAddressGroup(addrs[i].Addresses())
|
client.NodeInfoFromNetmapElement(&info, addrs[i])
|
||||||
|
|
||||||
exec.processNode(ctx, info)
|
exec.processNode(ctx, info)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
|
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -61,17 +60,6 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
var node network.AddressGroup
|
|
||||||
|
|
||||||
err := node.FromIterator(nodes[i])
|
|
||||||
if err != nil {
|
|
||||||
log.Error("could not parse network address",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
|
if p.netmapKeys.IsLocalKey(nodes[i].PublicKey()) {
|
||||||
if shortage == 0 {
|
if shortage == 0 {
|
||||||
// we can call the redundant copy callback
|
// we can call the redundant copy callback
|
||||||
|
@ -85,7 +73,7 @@ func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes
|
||||||
} else if shortage > 0 {
|
} else if shortage > 0 {
|
||||||
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
callCtx, cancel := context.WithTimeout(ctx, p.headTimeout)
|
||||||
|
|
||||||
_, err = p.remoteHeader.Head(callCtx, prm.WithNodeAddress(node))
|
_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i].NodeInfo))
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -69,20 +68,9 @@ func (p *Replicator) handleTask(ctx context.Context, task *Task) {
|
||||||
|
|
||||||
log := p.log.With(zap.String("node", hex.EncodeToString(task.nodes[i].PublicKey())))
|
log := p.log.With(zap.String("node", hex.EncodeToString(task.nodes[i].PublicKey())))
|
||||||
|
|
||||||
var node network.AddressGroup
|
|
||||||
|
|
||||||
err := node.FromIterator(task.nodes[i])
|
|
||||||
if err != nil {
|
|
||||||
log.Error("could not parse network address",
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
|
callCtx, cancel := context.WithTimeout(ctx, p.putTimeout)
|
||||||
|
|
||||||
err = p.remoteSender.PutObject(callCtx, prm.WithNodeAddress(node))
|
err = p.remoteSender.PutObject(callCtx, prm.WithNodeInfo(task.nodes[i].NodeInfo))
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue