diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index bd5805d9..b97e3b15 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -11,12 +11,14 @@ import ( cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" + client3 "github.com/nspcc-dev/neofs-api-go/rpc/client" "github.com/nspcc-dev/neofs-api-go/util/signature" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" apiclientconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/apiclient" policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer" replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator" + client2 "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" @@ -146,6 +148,27 @@ func (n *innerRingFetcher) InnerRingKeys() ([][]byte, error) { return result, nil } +type coreClientConstructor reputationClientConstructor + +func (x *coreClientConstructor) Get(addr network.Address) (client2.Client, error) { + c, err := (*reputationClientConstructor)(x).Get(addr) + if err != nil { + return nil, err + } + + return apiclient{ + Client: c, + }, nil +} + +type apiclient struct { + client.Client +} + +func (x apiclient) RawForAddress(network.Address) *client3.Client { + return x.Client.Raw() +} + func initObjectService(c *cfg) { ls := c.cfgObject.cfgLocalStorage.localStorage keyStorage := util.NewKeyStorage(&c.key.PrivateKey, c.privateTokenStore) @@ -169,6 +192,8 @@ func initObjectService(c *cfg) { basicConstructor: clientCache, } + coreConstructor := (*coreClientConstructor)(clientConstructor) + irFetcher := &innerRingFetcher{ sidechain: c.cfgMorph.client, } @@ -185,7 +210,7 @@ func initObjectService(c *cfg) { ), replicator.WithLocalStorage(ls), replicator.WithRemoteSender( - putsvc.NewRemoteSender(keyStorage, clientConstructor), + putsvc.NewRemoteSender(keyStorage, coreConstructor), ), ) @@ -237,7 +262,7 @@ func initObjectService(c *cfg) { sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), - putsvc.WithClientConstructor(clientConstructor), + putsvc.WithClientConstructor(coreConstructor), putsvc.WithMaxSizeSource(c), putsvc.WithLocalStorage(ls), putsvc.WithContainerSource(c.cfgObject.cnrStorage), @@ -259,7 +284,7 @@ func initObjectService(c *cfg) { sSearch := searchsvc.New( searchsvc.WithLogger(c.log), searchsvc.WithLocalStorageEngine(ls), - searchsvc.WithClientConstructor(clientConstructor), + searchsvc.WithClientConstructor(coreConstructor), searchsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.WithoutSuccessTracking(), @@ -276,7 +301,7 @@ func initObjectService(c *cfg) { sGet := getsvc.New( getsvc.WithLogger(c.log), getsvc.WithLocalStorageEngine(ls), - getsvc.WithClientConstructor(clientConstructor), + getsvc.WithClientConstructor(coreConstructor), getsvc.WithTraverserGenerator( traverseGen.WithTraverseOptions( placement.SuccessAfter(1), diff --git a/pkg/core/client/client.go b/pkg/core/client/client.go new file mode 100644 index 00000000..80b67653 --- /dev/null +++ b/pkg/core/client/client.go @@ -0,0 +1,17 @@ +package client + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/client" + rawclient "github.com/nspcc-dev/neofs-api-go/rpc/client" + "github.com/nspcc-dev/neofs-node/pkg/network" +) + +// Client is an interface of NeoFS storage +// node's client. +type Client interface { + client.Client + + // RawForAddress must return rawclient.Client + // for the passed network.Address. + RawForAddress(network.Address) *rawclient.Client +} diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 549afd61..e7ad1577 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -100,7 +100,7 @@ func newTestClient() *testClient { } } -func (c *testClient) getObject(exec *execCtx) (*objectSDK.Object, error) { +func (c *testClient) getObject(exec *execCtx, _ network.Address) (*objectSDK.Object, error) { v, ok := c.results[exec.address().String()] if !ok { return nil, object.ErrNotFound diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index a10fb300..e5c81db1 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -5,7 +5,9 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) @@ -32,7 +34,7 @@ type RangeHashPrm struct { salt []byte } -type RequestForwarder func(client.Client) (*objectSDK.Object, error) +type RequestForwarder func(network.Address, coreclient.Client) (*objectSDK.Object, error) // HeadPrm groups parameters of Head service call. type HeadPrm struct { diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index acea51f6..fe700d78 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -20,7 +20,7 @@ func (exec *execCtx) processNode(ctx context.Context, addr network.Address) bool return true } - obj, err := client.getObject(exec) + obj, err := client.getObject(exec, addr) var errSplitInfo *objectSDK.SplitInfoError diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index ab8d51e8..76e07333 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -1,8 +1,8 @@ package getsvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" @@ -22,7 +22,7 @@ type Service struct { type Option func(*cfg) type getClient interface { - getObject(*execCtx) (*objectSDK.Object, error) + getObject(*execCtx, network.Address) (*objectSDK.Object, error) } type cfg struct { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 72578458..098f94ca 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -5,6 +5,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" @@ -22,7 +23,7 @@ type clientCacheWrapper struct { } type clientWrapper struct { - client client.Client + client coreclient.Client } type storageEngineWrapper struct { @@ -80,9 +81,9 @@ func (c *clientCacheWrapper) get(addr network.Address) (getClient, error) { }, err } -func (c *clientWrapper) getObject(exec *execCtx) (*objectSDK.Object, error) { +func (c *clientWrapper) getObject(exec *execCtx, addr network.Address) (*objectSDK.Object, error) { if !exec.assembling && exec.prm.forwarder != nil { - return exec.prm.forwarder(c.client) + return exec.prm.forwarder(addr, c.client) } if exec.headOnly() { diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 5085b5eb..080a8eea 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -9,7 +9,6 @@ import ( "io" "sync" - "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" sessionsdk "github.com/nspcc-dev/neofs-api-go/pkg/session" rpcclient "github.com/nspcc-dev/neofs-api-go/rpc/client" @@ -19,7 +18,9 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -54,7 +55,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre if !commonPrm.LocalOnly() { var onceResign sync.Once - p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) { + p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { var err error // once compose and resign forwarding request @@ -78,7 +79,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre // perhaps it is worth highlighting the utility function in neofs-api-go // open stream - stream, err := rpc.GetObject(c.Raw(), req, rpcclient.WithContext(stream.Context())) + stream, err := rpc.GetObject(c.RawForAddress(addr), req, rpcclient.WithContext(stream.Context())) if err != nil { return nil, fmt.Errorf("stream opening failed: %w", err) } @@ -176,7 +177,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get if !commonPrm.LocalOnly() { var onceResign sync.Once - p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) { + p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { var err error // once compose and resign forwarding request @@ -200,7 +201,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get // perhaps it is worth highlighting the utility function in neofs-api-go // open stream - stream, err := rpc.GetObjectRange(c.Raw(), req, rpcclient.WithContext(stream.Context())) + stream, err := rpc.GetObjectRange(c.RawForAddress(addr), req, rpcclient.WithContext(stream.Context())) if err != nil { return nil, fmt.Errorf("could not create Get payload range stream: %w", err) } @@ -339,7 +340,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp if !commonPrm.LocalOnly() { var onceResign sync.Once - p.SetRequestForwarder(func(c client.Client) (*objectSDK.Object, error) { + p.SetRequestForwarder(func(addr network.Address, c client.Client) (*objectSDK.Object, error) { var err error // once compose and resign forwarding request @@ -363,7 +364,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp // perhaps it is worth highlighting the utility function in neofs-api-go // send Head request - resp, err := rpc.HeadObject(c.Raw(), req, rpcclient.WithContext(ctx)) + resp, err := rpc.HeadObject(c.RawForAddress(addr), req, rpcclient.WithContext(ctx)) if err != nil { return nil, fmt.Errorf("sending the request failed: %w", err) } diff --git a/pkg/services/object/put/prm.go b/pkg/services/object/put/prm.go index f7a766c5..00a4458c 100644 --- a/pkg/services/object/put/prm.go +++ b/pkg/services/object/put/prm.go @@ -1,8 +1,9 @@ package putsvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" ) @@ -14,7 +15,7 @@ type PutInitPrm struct { traverseOpts []placement.Option - relay func(client.Client) error + relay func(network.Address, client.Client) error } type PutChunkPrm struct { @@ -45,7 +46,7 @@ func (p *PutInitPrm) WithObject(v *object.RawObject) *PutInitPrm { return p } -func (p *PutInitPrm) WithRelay(f func(client.Client) error) *PutInitPrm { +func (p *PutInitPrm) WithRelay(f func(network.Address, client.Client) error) *PutInitPrm { if p != nil { p.relay = f } diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index d010226d..26611686 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -3,7 +3,7 @@ package putsvc import ( "context" - "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 459fdfca..3d607da3 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" - "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -20,7 +20,7 @@ type Streamer struct { target transformer.ObjectTarget - relay func(client.Client) error + relay func(network.Address, client.Client) error maxPayloadSz uint64 // network config } @@ -159,7 +159,7 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { return fmt.Errorf("could not create SDK client %s: %w", addr, err) } - return p.relay(c) + return p.relay(addr, c) } } diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index 9f07975f..e1dd3934 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -4,12 +4,13 @@ import ( "errors" "fmt" - "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/session" "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/rpc" sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-node/pkg/network" putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) @@ -124,11 +125,11 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { return fromPutResponse(resp), nil } -func (s *streamer) relayRequest(c client.Client) error { +func (s *streamer) relayRequest(addr network.Address, c client.Client) error { // open stream resp := new(object.PutResponse) - stream, err := rpc.PutObject(c.Raw(), resp) + stream, err := rpc.PutObject(c.RawForAddress(addr), resp) if err != nil { return fmt.Errorf("stream opening failed: %w", err) } diff --git a/pkg/services/object/search/prm.go b/pkg/services/object/search/prm.go index 11144cc1..3340cb40 100644 --- a/pkg/services/object/search/prm.go +++ b/pkg/services/object/search/prm.go @@ -3,6 +3,8 @@ package searchsvc import ( "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-node/pkg/network" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) @@ -25,7 +27,7 @@ type IDListWriter interface { // RequestForwarder is a callback for forwarding of the // original Search requests. -type RequestForwarder func(client.Client) ([]*objectSDK.ID, error) +type RequestForwarder func(network.Address, coreclient.Client) ([]*objectSDK.ID, error) // SetCommonParameters sets common parameters of the operation. func (p *Prm) SetCommonParameters(common *util.CommonPrm) { diff --git a/pkg/services/object/search/remote.go b/pkg/services/object/search/remote.go index cdf8ed70..d4646b3d 100644 --- a/pkg/services/object/search/remote.go +++ b/pkg/services/object/search/remote.go @@ -17,7 +17,7 @@ func (exec *execCtx) processNode(ctx context.Context, addr network.Address) { return } - ids, err := client.searchObjects(exec) + ids, err := client.searchObjects(exec, addr) if err != nil { exec.log.Debug("local operation failed", diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 827d9899..8e8c200c 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -102,7 +102,7 @@ func (s *testStorage) search(exec *execCtx) ([]*objectSDK.ID, error) { return v.ids, v.err } -func (c *testStorage) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) { +func (c *testStorage) searchObjects(exec *execCtx, _ network.Address) ([]*objectSDK.ID, error) { v, ok := c.items[exec.containerID().String()] if !ok { return nil, nil diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 5b03c105..b68fee63 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -1,9 +1,9 @@ package searchsvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/client" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network" @@ -23,7 +23,7 @@ type Service struct { type Option func(*cfg) type searchClient interface { - searchObjects(*execCtx) ([]*object.ID, error) + searchObjects(*execCtx, network.Address) ([]*object.ID, error) } type ClientConstructor interface { diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index c1f5c4e9..81cecbac 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -3,9 +3,9 @@ package searchsvc import ( "sync" - "github.com/nspcc-dev/neofs-api-go/pkg/client" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/network" @@ -76,9 +76,9 @@ func (c *clientConstructorWrapper) get(addr network.Address) (searchClient, erro }, err } -func (c *clientWrapper) searchObjects(exec *execCtx) ([]*objectSDK.ID, error) { +func (c *clientWrapper) searchObjects(exec *execCtx, addr network.Address) ([]*objectSDK.ID, error) { if exec.prm.forwarder != nil { - return exec.prm.forwarder(c.client) + return exec.prm.forwarder(addr, c.client) } return c.client.SearchObject(exec.context(), diff --git a/pkg/services/object/search/v2/util.go b/pkg/services/object/search/v2/util.go index bec092b4..b2d4c5d3 100644 --- a/pkg/services/object/search/v2/util.go +++ b/pkg/services/object/search/v2/util.go @@ -6,7 +6,6 @@ import ( "io" "sync" - "github.com/nspcc-dev/neofs-api-go/pkg/client" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" sessionsdk "github.com/nspcc-dev/neofs-api-go/pkg/session" @@ -15,6 +14,8 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/rpc" "github.com/nspcc-dev/neofs-api-go/v2/session" "github.com/nspcc-dev/neofs-api-go/v2/signature" + "github.com/nspcc-dev/neofs-node/pkg/core/client" + "github.com/nspcc-dev/neofs-node/pkg/network" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -45,7 +46,7 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre if !commonPrm.LocalOnly() { var onceResign sync.Once - p.SetRequestForwarder(func(c client.Client) ([]*objectSDK.ID, error) { + p.SetRequestForwarder(func(addr network.Address, c client.Client) ([]*objectSDK.ID, error) { var err error // once compose and resign forwarding request @@ -65,7 +66,7 @@ func (s *Service) toPrm(req *objectV2.SearchRequest, stream objectSvc.SearchStre return nil, err } - stream, err := rpc.SearchObjects(c.Raw(), req, rpcclient.WithContext(stream.Context())) + stream, err := rpc.SearchObjects(c.RawForAddress(addr), req, rpcclient.WithContext(stream.Context())) if err != nil { return nil, err }