[#645] *: Construct clients from client.NodeInfo in API client cache
There is a need to have the ability to expand the data needed for client construction. Replace `network.AddressGroup` parameter of client cache interfaces with `client.NodeInfo`. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
91cc33bdb9
commit
7b228b7603
25 changed files with 114 additions and 72 deletions
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
|
||||||
containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container"
|
containerV2 "github.com/nspcc-dev/neofs-api-go/v2/container"
|
||||||
containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
|
containerGRPC "github.com/nspcc-dev/neofs-api-go/v2/container/grpc"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container"
|
containerCore "github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||||
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
|
@ -244,7 +245,7 @@ type remoteLoadAnnounceProvider struct {
|
||||||
netmapKeys netmapCore.AnnouncedKeys
|
netmapKeys netmapCore.AnnouncedKeys
|
||||||
|
|
||||||
clientCache interface {
|
clientCache interface {
|
||||||
Get(network.AddressGroup) (apiClient.Client, error)
|
Get(client.NodeInfo) (apiClient.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
deadEndProvider loadcontroller.WriterProvider
|
deadEndProvider loadcontroller.WriterProvider
|
||||||
|
@ -267,7 +268,11 @@ func (r *remoteLoadAnnounceProvider) InitRemote(srv loadroute.ServerInfo) (loadc
|
||||||
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := r.clientCache.Get(netAddr)
|
var info client.NodeInfo
|
||||||
|
|
||||||
|
info.SetAddressGroup(netAddr)
|
||||||
|
|
||||||
|
c, err := r.clientCache.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -166,8 +166,8 @@ func (f *innerRingFetcherWithoutNotary) InnerRingKeys() ([][]byte, error) {
|
||||||
|
|
||||||
type coreClientConstructor reputationClientConstructor
|
type coreClientConstructor reputationClientConstructor
|
||||||
|
|
||||||
func (x *coreClientConstructor) Get(addrGroup network.AddressGroup) (coreclient.Client, error) {
|
func (x *coreClientConstructor) Get(info coreclient.NodeInfo) (coreclient.Client, error) {
|
||||||
c, err := (*reputationClientConstructor)(x).Get(addrGroup)
|
c, err := (*reputationClientConstructor)(x).Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -442,7 +442,7 @@ type reputationClientConstructor struct {
|
||||||
trustStorage *truststorage.Storage
|
trustStorage *truststorage.Storage
|
||||||
|
|
||||||
basicConstructor interface {
|
basicConstructor interface {
|
||||||
Get(network.AddressGroup) (client.Client, error)
|
Get(coreclient.NodeInfo) (client.Client, error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -526,14 +526,16 @@ func (c *reputationClient) SearchObject(ctx context.Context, prm *client.SearchO
|
||||||
return ids, err
|
return ids, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *reputationClientConstructor) Get(addr network.AddressGroup) (client.Client, error) {
|
func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (client.Client, error) {
|
||||||
cl, err := c.basicConstructor.Get(addr)
|
cl, err := c.basicConstructor.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
|
nm, err := netmap.GetLatestNetworkMap(c.nmSrc)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
addr := info.AddressGroup()
|
||||||
|
|
||||||
for i := range nm.Nodes {
|
for i := range nm.Nodes {
|
||||||
var netAddr network.AddressGroup
|
var netAddr network.AddressGroup
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client"
|
apiClient "github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
|
||||||
|
@ -12,7 +13,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type clientCache interface {
|
type clientCache interface {
|
||||||
Get(network.AddressGroup) (apiClient.Client, error)
|
Get(client.NodeInfo) (apiClient.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// clientKeyRemoteProvider must provide remote writer and take into account
|
// clientKeyRemoteProvider must provide remote writer and take into account
|
||||||
|
@ -83,7 +84,11 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep
|
||||||
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
return nil, fmt.Errorf("could not convert address to IP format: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := rtp.clientCache.Get(netAddr)
|
var info client.NodeInfo
|
||||||
|
|
||||||
|
info.SetAddressGroup(netAddr)
|
||||||
|
|
||||||
|
c, err := rtp.clientCache.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
return nil, fmt.Errorf("could not initialize API client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/storagegroup"
|
"github.com/nspcc-dev/neofs-api-go/pkg/storagegroup"
|
||||||
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
coreObject "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
coreObject "github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
|
||||||
|
@ -22,10 +23,7 @@ import (
|
||||||
type (
|
type (
|
||||||
ClientCache struct {
|
ClientCache struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
cache interface {
|
cache *cache.ClientCache
|
||||||
Get(address network.AddressGroup) (client.Client, error)
|
|
||||||
CloseAll()
|
|
||||||
}
|
|
||||||
key *ecdsa.PrivateKey
|
key *ecdsa.PrivateKey
|
||||||
|
|
||||||
sgTimeout, headTimeout, rangeTimeout time.Duration
|
sgTimeout, headTimeout, rangeTimeout time.Duration
|
||||||
|
@ -51,9 +49,13 @@ func newClientCache(p *clientCacheParams) *ClientCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ClientCache) Get(address network.AddressGroup) (client.Client, error) {
|
func (c *ClientCache) Get(address network.AddressGroup) (client.Client, error) {
|
||||||
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
|
info.SetAddressGroup(address)
|
||||||
|
|
||||||
// Because cache is used by `ClientCache` exclusively,
|
// Because cache is used by `ClientCache` exclusively,
|
||||||
// client will always have valid key.
|
// client will always have valid key.
|
||||||
return c.cache.Get(address)
|
return c.cache.Get(info)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetSG polls the container from audit task to get the object by id.
|
// GetSG polls the container from audit task to get the object by id.
|
||||||
|
|
5
pkg/network/cache/client.go
vendored
5
pkg/network/cache/client.go
vendored
|
@ -4,6 +4,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -28,7 +29,9 @@ func NewSDKClientCache(opts ...client.Option) *ClientCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get function returns existing client or creates a new one.
|
// Get function returns existing client or creates a new one.
|
||||||
func (c *ClientCache) Get(netAddr network.AddressGroup) (client.Client, error) {
|
func (c *ClientCache) Get(info clientcore.NodeInfo) (client.Client, error) {
|
||||||
|
netAddr := info.AddressGroup()
|
||||||
|
|
||||||
// multiaddr is used as a key in client cache since
|
// multiaddr is used as a key in client cache since
|
||||||
// same host may have different connections(with tls or not),
|
// same host may have different connections(with tls or not),
|
||||||
// therefore, host+port pair is not unique
|
// therefore, host+port pair is not unique
|
||||||
|
|
|
@ -3,6 +3,7 @@ package getsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -78,7 +79,11 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
// TODO: consider parallel execution
|
// TODO: consider parallel execution
|
||||||
// TODO: consider optimization: if status == SPLIT we can continue until
|
// TODO: consider optimization: if status == SPLIT we can continue until
|
||||||
// we reach the best result - split info with linking object ID.
|
// we reach the best result - split info with linking object ID.
|
||||||
if exec.processNode(ctx, addrs[i].Addresses()) {
|
var info client.NodeInfo
|
||||||
|
|
||||||
|
info.SetAddressGroup(addrs[i].Addresses())
|
||||||
|
|
||||||
|
if exec.processNode(ctx, info) {
|
||||||
exec.log.Debug("completing the operation")
|
exec.log.Debug("completing the operation")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,8 +8,8 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
@ -264,8 +264,8 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec execCtx) remoteClient(node network.AddressGroup) (getClient, bool) {
|
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
|
||||||
c, err := exec.svc.clientCache.get(node)
|
c, err := exec.svc.clientCache.get(info)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test"
|
cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
|
@ -82,8 +83,8 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
|
||||||
return vs, nil
|
return vs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClientCache) get(mAddr network.AddressGroup) (getClient, error) {
|
func (c *testClientCache) get(info client.NodeInfo) (getClient, error) {
|
||||||
v, ok := c.clients[network.StringifyGroup(mAddr)]
|
v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("could not construct client")
|
return nil, errors.New("could not construct client")
|
||||||
}
|
}
|
||||||
|
@ -100,7 +101,7 @@ func newTestClient() *testClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClient) getObject(exec *execCtx, _ network.AddressGroup) (*objectSDK.Object, error) {
|
func (c *testClient) getObject(exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
||||||
v, ok := c.results[exec.address().String()]
|
v, ok := c.results[exec.address().String()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, object.ErrNotFound
|
return nil, object.ErrNotFound
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -34,7 +33,7 @@ type RangeHashPrm struct {
|
||||||
salt []byte
|
salt []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type RequestForwarder func(network.AddressGroup, coreclient.Client) (*objectSDK.Object, error)
|
type RequestForwarder func(coreclient.NodeInfo, coreclient.Client) (*objectSDK.Object, error)
|
||||||
|
|
||||||
// HeadPrm groups parameters of Head service call.
|
// HeadPrm groups parameters of Head service call.
|
||||||
type HeadPrm struct {
|
type HeadPrm struct {
|
||||||
|
|
|
@ -5,20 +5,20 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) processNode(ctx context.Context, addr network.AddressGroup) bool {
|
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
exec.log.Debug("processing node...")
|
exec.log.Debug("processing node...")
|
||||||
|
|
||||||
client, ok := exec.remoteClient(addr)
|
client, ok := exec.remoteClient(info)
|
||||||
if !ok {
|
if !ok {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := client.getObject(exec, addr)
|
obj, err := client.getObject(exec, info)
|
||||||
|
|
||||||
var errSplitInfo *objectSDK.SplitInfoError
|
var errSplitInfo *objectSDK.SplitInfoError
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
@ -22,7 +21,7 @@ type Service struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type getClient interface {
|
type getClient interface {
|
||||||
getObject(*execCtx, network.AddressGroup) (*objectSDK.Object, error)
|
getObject(*execCtx, client.NodeInfo) (*objectSDK.Object, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
|
@ -35,7 +34,7 @@ type cfg struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
clientCache interface {
|
clientCache interface {
|
||||||
get(network.AddressGroup) (getClient, error)
|
get(client.NodeInfo) (getClient, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
traverserGenerator interface {
|
traverserGenerator interface {
|
||||||
|
@ -93,7 +92,7 @@ func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(network.AddressGroup) (client.Client, error)
|
Get(client.NodeInfo) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithClientConstructor returns option to set constructor of remote node clients.
|
// WithClientConstructor returns option to set constructor of remote node clients.
|
||||||
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type SimpleObjectWriter struct {
|
type SimpleObjectWriter struct {
|
||||||
|
@ -73,17 +72,20 @@ func (s *SimpleObjectWriter) Object() *object.Object {
|
||||||
return s.obj.Object()
|
return s.obj.Object()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientCacheWrapper) get(addr network.AddressGroup) (getClient, error) {
|
func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
|
||||||
clt, err := c.cache.Get(addr)
|
clt, err := c.cache.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &clientWrapper{
|
return &clientWrapper{
|
||||||
client: clt,
|
client: clt,
|
||||||
}, err
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) getObject(exec *execCtx, addr network.AddressGroup) (*objectSDK.Object, error) {
|
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*objectSDK.Object, error) {
|
||||||
if exec.isForwardingEnabled() {
|
if exec.isForwardingEnabled() {
|
||||||
return exec.prm.forwarder(addr, c.client)
|
return exec.prm.forwarder(info, c.client)
|
||||||
}
|
}
|
||||||
|
|
||||||
if exec.headOnly() {
|
if exec.headOnly() {
|
||||||
|
|
|
@ -509,13 +509,13 @@ func toShortObjectHeader(hdr *object.Object) objectV2.GetHeaderPart {
|
||||||
}
|
}
|
||||||
|
|
||||||
func groupAddressRequestForwarder(f func(network.Address, client.Client) (*objectSDK.Object, error)) getsvc.RequestForwarder {
|
func groupAddressRequestForwarder(f func(network.Address, client.Client) (*objectSDK.Object, error)) getsvc.RequestForwarder {
|
||||||
return func(addrGroup network.AddressGroup, c client.Client) (*objectSDK.Object, error) {
|
return func(info client.NodeInfo, c client.Client) (*objectSDK.Object, error) {
|
||||||
var (
|
var (
|
||||||
firstErr error
|
firstErr error
|
||||||
res *objectSDK.Object
|
res *objectSDK.Object
|
||||||
)
|
)
|
||||||
|
|
||||||
addrGroup.IterateAddresses(func(addr network.Address) (stop bool) {
|
info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -7,13 +7,14 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(network.AddressGroup) (client.Client, error)
|
Get(clientcore.NodeInfo) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteHeader represents utility for getting
|
// RemoteHeader represents utility for getting
|
||||||
|
@ -66,7 +67,11 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob
|
||||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := h.clientCache.Get(prm.node)
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
|
info.SetAddressGroup(prm.node)
|
||||||
|
|
||||||
|
c, err := h.clientCache.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, prm.node, err)
|
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, prm.node, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
|
@ -54,7 +55,11 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
|
||||||
return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
|
return nil, fmt.Errorf("(%T) could not receive private key: %w", t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c, err := t.clientConstructor.Get(t.addr)
|
var info clientcore.NodeInfo
|
||||||
|
|
||||||
|
info.SetAddressGroup(t.addr)
|
||||||
|
|
||||||
|
c, err := t.clientConstructor.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.addr, err)
|
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.addr, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util"
|
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
@ -30,7 +29,7 @@ type Service struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(network.AddressGroup) (client.Client, error)
|
Get(client.NodeInfo) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
|
|
|
@ -150,7 +150,11 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
|
||||||
relay = func(node nodeDesc) error {
|
relay = func(node nodeDesc) error {
|
||||||
addr := node.info.Addresses()
|
addr := node.info.Addresses()
|
||||||
|
|
||||||
c, err := p.clientConstructor.Get(addr)
|
var info client.NodeInfo
|
||||||
|
|
||||||
|
info.SetAddressGroup(addr)
|
||||||
|
|
||||||
|
c, err := p.clientConstructor.Get(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not create SDK client %s: %w", addr, err)
|
return fmt.Errorf("could not create SDK client %s: %w", addr, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package searchsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -76,7 +77,11 @@ func (exec *execCtx) processCurrentEpoch() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: consider parallel execution
|
// TODO: consider parallel execution
|
||||||
exec.processNode(ctx, addrs[i].Addresses())
|
var info client.NodeInfo
|
||||||
|
|
||||||
|
info.SetAddressGroup(addrs[i].Addresses())
|
||||||
|
|
||||||
|
exec.processNode(ctx, info)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ import (
|
||||||
|
|
||||||
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -117,8 +117,8 @@ func (exec *execCtx) generateTraverser(cid *cid.ID) (*placement.Traverser, bool)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec execCtx) remoteClient(node network.AddressGroup) (searchClient, bool) {
|
func (exec execCtx) remoteClient(info client.NodeInfo) (searchClient, bool) {
|
||||||
c, err := exec.svc.clientConstructor.get(node)
|
c, err := exec.svc.clientConstructor.get(info)
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
exec.status = statusUndefined
|
exec.status = statusUndefined
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -27,7 +26,7 @@ type IDListWriter interface {
|
||||||
|
|
||||||
// RequestForwarder is a callback for forwarding of the
|
// RequestForwarder is a callback for forwarding of the
|
||||||
// original Search requests.
|
// original Search requests.
|
||||||
type RequestForwarder func(network.AddressGroup, coreclient.Client) ([]*objectSDK.ID, error)
|
type RequestForwarder func(coreclient.NodeInfo, coreclient.Client) ([]*objectSDK.ID, error)
|
||||||
|
|
||||||
// SetCommonParameters sets common parameters of the operation.
|
// SetCommonParameters sets common parameters of the operation.
|
||||||
func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
|
func (p *Prm) SetCommonParameters(common *util.CommonPrm) {
|
||||||
|
|
|
@ -3,19 +3,19 @@ package searchsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) processNode(ctx context.Context, addr network.AddressGroup) {
|
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) {
|
||||||
exec.log.Debug("processing node...")
|
exec.log.Debug("processing node...")
|
||||||
|
|
||||||
client, ok := exec.remoteClient(addr)
|
client, ok := exec.remoteClient(info)
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ids, err := client.searchObjects(exec, addr)
|
ids, err := client.searchObjects(exec, info)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exec.log.Debug("local operation failed",
|
exec.log.Debug("local operation failed",
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test"
|
cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test"
|
||||||
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
|
clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
|
@ -84,8 +85,8 @@ func (p *testPlacementBuilder) BuildPlacement(addr *objectSDK.Address, _ *netmap
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClientCache) get(mAddr network.AddressGroup) (searchClient, error) {
|
func (c *testClientCache) get(info clientcore.NodeInfo) (searchClient, error) {
|
||||||
v, ok := c.clients[network.StringifyGroup(mAddr)]
|
v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("could not construct client")
|
return nil, errors.New("could not construct client")
|
||||||
}
|
}
|
||||||
|
@ -102,7 +103,7 @@ func (s *testStorage) search(exec *execCtx) ([]*objectSDK.ID, error) {
|
||||||
return v.ids, v.err
|
return v.ids, v.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testStorage) searchObjects(exec *execCtx, _ network.AddressGroup) ([]*objectSDK.ID, error) {
|
func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]*objectSDK.ID, error) {
|
||||||
v, ok := c.items[exec.containerID().String()]
|
v, ok := c.items[exec.containerID().String()]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||||
|
@ -23,11 +22,11 @@ type Service struct {
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type searchClient interface {
|
type searchClient interface {
|
||||||
searchObjects(*execCtx, network.AddressGroup) ([]*object.ID, error)
|
searchObjects(*execCtx, client.NodeInfo) ([]*object.ID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientConstructor interface {
|
type ClientConstructor interface {
|
||||||
Get(network.AddressGroup) (client.Client, error)
|
Get(client.NodeInfo) (client.Client, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
|
@ -38,7 +37,7 @@ type cfg struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
clientConstructor interface {
|
clientConstructor interface {
|
||||||
get(network.AddressGroup) (searchClient, error)
|
get(client.NodeInfo) (searchClient, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
traverserGenerator interface {
|
traverserGenerator interface {
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
"github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
|
||||||
)
|
)
|
||||||
|
@ -68,17 +67,20 @@ func (w *uniqueIDWriter) WriteIDs(list []*objectSDK.ID) error {
|
||||||
return w.writer.WriteIDs(list)
|
return w.writer.WriteIDs(list)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientConstructorWrapper) get(addr network.AddressGroup) (searchClient, error) {
|
func (c *clientConstructorWrapper) get(info client.NodeInfo) (searchClient, error) {
|
||||||
clt, err := c.constructor.Get(addr)
|
clt, err := c.constructor.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &clientWrapper{
|
return &clientWrapper{
|
||||||
client: clt,
|
client: clt,
|
||||||
}, err
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) searchObjects(exec *execCtx, addr network.AddressGroup) ([]*objectSDK.ID, error) {
|
func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]*objectSDK.ID, error) {
|
||||||
if exec.prm.forwarder != nil {
|
if exec.prm.forwarder != nil {
|
||||||
return exec.prm.forwarder(addr, c.client)
|
return exec.prm.forwarder(info, c.client)
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.client.SearchObject(exec.context(),
|
return c.client.SearchObject(exec.context(),
|
||||||
|
|
|
@ -112,13 +112,13 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre
|
||||||
}
|
}
|
||||||
|
|
||||||
func groupAddressRequestForwarder(f func(network.Address, client.Client) ([]*objectSDK.ID, error)) searchsvc.RequestForwarder {
|
func groupAddressRequestForwarder(f func(network.Address, client.Client) ([]*objectSDK.ID, error)) searchsvc.RequestForwarder {
|
||||||
return func(addrGroup network.AddressGroup, c client.Client) ([]*objectSDK.ID, error) {
|
return func(info client.NodeInfo, c client.Client) ([]*objectSDK.ID, error) {
|
||||||
var (
|
var (
|
||||||
firstErr error
|
firstErr error
|
||||||
res []*objectSDK.ID
|
res []*objectSDK.ID
|
||||||
)
|
)
|
||||||
|
|
||||||
addrGroup.IterateAddresses(func(addr network.Address) (stop bool) {
|
info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
Loading…
Reference in a new issue