From b3708fc530df81bea17412b4daa7642cf2a6b1ec Mon Sep 17 00:00:00 2001 From: Leonard Lyubich <leonard@nspcc.ru> Date: Mon, 1 Nov 2021 11:35:33 +0300 Subject: [PATCH] [#957] services/object: Refactor usage of NeoFS API client The client needs of the Object service are limited and change not often. Interface changes of the client library should not affect the operation of various service packages, if they do not change their requirements for the provided functionality. To localize the use of the base client and facilitate further support, an auxiliary package is implemented that will only be used by the Object service. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru> --- pkg/services/object/delete/exec.go | 6 +- pkg/services/object/delete/prm.go | 21 +- pkg/services/object/get/exec.go | 27 +- pkg/services/object/get/prm.go | 15 +- pkg/services/object/get/util.go | 94 ++++-- pkg/services/object/get/v2/util.go | 9 +- pkg/services/object/head/prm.go | 31 -- pkg/services/object/head/remote.go | 24 +- pkg/services/object/internal/client/client.go | 301 ++++++++++++++++++ pkg/services/object/internal/client/doc.go | 11 + pkg/services/object/put/remote.go | 25 +- pkg/services/object/search/exec.go | 4 +- pkg/services/object/search/prm.go | 16 +- pkg/services/object/search/util.go | 26 +- pkg/services/object/util/prm.go | 105 ++---- 15 files changed, 511 insertions(+), 204 deletions(-) create mode 100644 pkg/services/object/internal/client/client.go create mode 100644 pkg/services/object/internal/client/doc.go diff --git a/pkg/services/object/delete/exec.go b/pkg/services/object/delete/exec.go index 0b641ae9d..07bea7123 100644 --- a/pkg/services/object/delete/exec.go +++ b/pkg/services/object/delete/exec.go @@ -60,11 +60,11 @@ func (exec execCtx) isLocal() bool { } func (exec *execCtx) address() *objectSDK.Address { - return exec.prm.Address() + return exec.prm.addr } func (exec *execCtx) containerID() *cid.ID { - return exec.prm.Address().ContainerID() + return exec.prm.addr.ContainerID() } func (exec *execCtx) commonParameters() *util.CommonPrm { @@ -264,7 +264,7 @@ func (exec *execCtx) saveTombstone() bool { exec.status = statusOK exec.err = nil - exec.prm.TombstoneAddressTarget(). + exec.prm.tombAddrWriter. SetAddress(exec.newAddress(id)) } diff --git a/pkg/services/object/delete/prm.go b/pkg/services/object/delete/prm.go index 227fd345b..a75ca4608 100644 --- a/pkg/services/object/delete/prm.go +++ b/pkg/services/object/delete/prm.go @@ -1,18 +1,35 @@ package deletesvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) +// TombstoneAddressWriter is an interface of tombstone address setter. +type TombstoneAddressWriter interface { + SetAddress(*object.Address) +} + // Prm groups parameters of Delete service call. type Prm struct { common *util.CommonPrm - client.DeleteObjectParams + addr *object.Address + + tombAddrWriter TombstoneAddressWriter } // SetCommonParameters sets common parameters of the operation. func (p *Prm) SetCommonParameters(common *util.CommonPrm) { p.common = common } + +// WithAddress sets address of the object to be removed. +func (p *Prm) WithAddress(addr *object.Address) { + p.addr = addr +} + +// WithTombstoneAddressTarget sets tombstone address destination. +func (p *Prm) WithTombstoneAddressTarget(w TombstoneAddressWriter) { + p.tombAddrWriter = w +} diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 34855636b..e97d7f09c 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -5,12 +5,10 @@ import ( "crypto/ecdsa" "errors" - "github.com/nspcc-dev/neofs-api-go/pkg/client" cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/util/logger" "go.uber.org/zap" @@ -92,11 +90,11 @@ func (exec execCtx) isLocal() bool { } func (exec execCtx) isRaw() bool { - return exec.prm.RawFlag() + return exec.prm.raw } func (exec execCtx) address() *objectSDK.Address { - return exec.prm.Address() + return exec.prm.addr } func (exec execCtx) isChild(obj *object.Object) bool { @@ -108,23 +106,6 @@ func (exec execCtx) key() (*ecdsa.PrivateKey, error) { return exec.svc.keyStore.GetKey(exec.prm.common.SessionToken()) } -func (exec execCtx) callOptions() ([]client.CallOption, error) { - key, err := exec.key() - if err != nil { - return nil, err - } - - return exec.prm.common.RemoteCallOptions( - util.WithNetmapEpoch(exec.curProcEpoch), - util.WithKey(key)), nil -} - -func (exec execCtx) remotePrm() *client.GetObjectParams { - return new(client.GetObjectParams). - WithAddress(exec.prm.Address()). - WithRawFlag(exec.prm.RawFlag()) -} - func (exec *execCtx) canAssemble() bool { return exec.svc.assembly && !exec.isRaw() && !exec.headOnly() } @@ -207,7 +188,7 @@ func (exec *execCtx) getChild(id *objectSDK.ID, rng *objectSDK.Range, withHdr bo addr.SetContainerID(exec.address().ContainerID()) addr.SetObjectID(id) - p.WithAddress(addr) + p.addr = addr exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng)) @@ -231,7 +212,7 @@ func (exec *execCtx) headChild(id *objectSDK.ID) (*object.Object, bool) { p := exec.prm p.common = p.common.WithLocalOnly(false) - p.WithAddress(childAddr) + p.addr = childAddr prm := HeadPrm{ commonPrm: p.commonPrm, diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index 10c53d5c7..57aa81d1d 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -3,7 +3,6 @@ package getsvc import ( "hash" - "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -45,7 +44,9 @@ type commonPrm struct { common *util.CommonPrm - client.GetObjectParams + addr *objectSDK.Address + + raw bool forwarder RequestForwarder } @@ -109,6 +110,16 @@ func (p *commonPrm) SetRequestForwarder(f RequestForwarder) { p.forwarder = f } +// WithAddress sets object address to be read. +func (p *commonPrm) WithAddress(addr *objectSDK.Address) { + p.addr = addr +} + +// WithRawFlag sets flag of raw reading. +func (p *commonPrm) WithRawFlag(raw bool) { + p.raw = raw +} + // SetHeaderWriter sets target component to write the object header. func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) { p.objWriter = &partWriter{ diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index bc10f60b5..b1e858589 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -3,12 +3,13 @@ package getsvc import ( "io" - "github.com/nspcc-dev/neofs-api-go/pkg/client" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + internal "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" + internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" ) type SimpleObjectWriter struct { @@ -88,40 +89,85 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj return exec.prm.forwarder(info, c.client) } - opts, err := exec.callOptions() + key, err := exec.key() if err != nil { return nil, err } if exec.headOnly() { - return c.client.GetObjectHeader(exec.context(), - new(client.ObjectHeaderParams). - WithAddress(exec.address()). - WithRawFlag(exec.isRaw()), - opts..., - ) - } - // we don't specify payload writer because we accumulate - // the object locally (even huge). - if rng := exec.ctxRange(); rng != nil { - data, err := c.client.ObjectPayloadRangeData(exec.context(), - new(client.RangeDataParams). - WithAddress(exec.address()). - WithRange(rng). - WithRaw(exec.isRaw()), - opts..., - ) + var prm internalclient.HeadObjectPrm + + prm.SetContext(exec.context()) + prm.SetClient(c.client) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetAddress(exec.address()) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + + if exec.isRaw() { + prm.SetRawFlag() + } + + res, err := internalclient.HeadObject(prm) if err != nil { return nil, err } - return payloadOnlyObject(data), nil + return res.Header(), nil + } + // we don't specify payload writer because we accumulate + // the object locally (even huge). + if rng := exec.ctxRange(); rng != nil { + var prm internalclient.PayloadRangePrm + + prm.SetContext(exec.context()) + prm.SetClient(c.client) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetAddress(exec.address()) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + prm.SetRange(rng) + + if exec.isRaw() { + prm.SetRawFlag() + } + + res, err := internalclient.PayloadRange(prm) + if err != nil { + return nil, err + } + + return payloadOnlyObject(res.PayloadRange()), nil } - return c.client.GetObject(exec.context(), - exec.remotePrm(), - opts..., - ) + var prm internalclient.GetObjectPrm + + prm.SetContext(exec.context()) + prm.SetClient(c.client) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetAddress(exec.address()) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + + if exec.isRaw() { + prm.SetRawFlag() + } + + res, err := internal.GetObject(prm) + if err != nil { + return nil, err + } + + return res.Object(), nil } func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { diff --git a/pkg/services/object/get/v2/util.go b/pkg/services/object/get/v2/util.go index 9085d590c..a72365068 100644 --- a/pkg/services/object/get/v2/util.go +++ b/pkg/services/object/get/v2/util.go @@ -321,7 +321,10 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp p.SetCommonParameters(commonPrm) body := req.GetBody() - p.WithAddress(objectSDK.NewAddressFromV2(body.GetAddress())) + + objAddr := objectSDK.NewAddressFromV2(body.GetAddress()) + + p.WithAddress(objAddr) p.WithRawFlag(body.GetRaw()) p.SetHeaderWriter(&headResponseWriter{ mainOnly: body.GetMainOnly(), @@ -417,7 +420,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp if err := signature2.VerifyDataWithSource( signature.StableMarshalerWrapper{ - SM: p.Address().ObjectID().ToV2(), + SM: objAddr.ObjectID().ToV2(), }, func() (key, sig []byte) { return idSig.GetKey(), idSig.GetSign() @@ -436,7 +439,7 @@ func (s *Service) toHeadPrm(ctx context.Context, req *objectV2.HeadRequest, resp obj.SetSignature(idSig) raw := object.NewRawFromV2(obj) - raw.SetID(p.Address().ObjectID()) + raw.SetID(objAddr.ObjectID()) // convert the object return raw.Object().SDK(), nil diff --git a/pkg/services/object/head/prm.go b/pkg/services/object/head/prm.go index 7a7e62d1f..6a0f75641 100644 --- a/pkg/services/object/head/prm.go +++ b/pkg/services/object/head/prm.go @@ -2,33 +2,10 @@ package headsvc import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) type Prm struct { - common *util.CommonPrm - - short bool - addr *object.Address - - raw bool -} - -func (p *Prm) WithCommonPrm(v *util.CommonPrm) *Prm { - if p != nil { - p.common = v - } - - return p -} - -func (p *Prm) Short(v bool) *Prm { - if p != nil { - p.short = v - } - - return p } func (p *Prm) WithAddress(v *object.Address) *Prm { @@ -38,11 +15,3 @@ func (p *Prm) WithAddress(v *object.Address) *Prm { return p } - -func (p *Prm) WithRaw(v bool) *Prm { - if p != nil { - p.raw = v - } - - return p -} diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index 390b175eb..6204c0023 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -10,6 +10,7 @@ import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" + internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" ) @@ -62,7 +63,7 @@ func (p *RemoteHeadPrm) WithObjectAddress(v *objectSDK.Address) *RemoteHeadPrm { // Head requests object header from the remote node. func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Object, error) { - key, err := h.keyStorage.GetKey(prm.commonHeadPrm.common.SessionToken()) + key, err := h.keyStorage.GetKey(nil) if err != nil { return nil, fmt.Errorf("(%T) could not receive private key: %w", h, err) } @@ -79,23 +80,18 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", h, info.AddressGroup(), err) } - p := new(client.ObjectHeaderParams). - WithAddress(prm.commonHeadPrm.addr). - WithRawFlag(prm.commonHeadPrm.raw) + var headPrm internalclient.HeadObjectPrm - if prm.commonHeadPrm.short { - p = p.WithMainFields() - } + headPrm.SetContext(ctx) + headPrm.SetClient(c) + headPrm.SetPrivateKey(key) + headPrm.SetAddress(prm.commonHeadPrm.addr) + headPrm.SetTTL(1) // FIXME: use constant - hdr, err := c.GetObjectHeader(ctx, p, - client.WithTTL(1), // FIXME: use constant - client.WithSession(prm.commonHeadPrm.common.SessionToken()), - client.WithBearer(prm.commonHeadPrm.common.BearerToken()), - client.WithKey(key), - ) + res, err := internalclient.HeadObject(headPrm) if err != nil { return nil, fmt.Errorf("(%T) could not head object in %s: %w", h, info.AddressGroup(), err) } - return object.NewFromSDK(hdr), nil + return object.NewFromSDK(res.Header()), nil } diff --git a/pkg/services/object/internal/client/client.go b/pkg/services/object/internal/client/client.go new file mode 100644 index 000000000..f750735fa --- /dev/null +++ b/pkg/services/object/internal/client/client.go @@ -0,0 +1,301 @@ +package internal + +import ( + "context" + "crypto/ecdsa" + "strconv" + + "github.com/nspcc-dev/neofs-api-go/pkg" + "github.com/nspcc-dev/neofs-api-go/pkg/client" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/session" + "github.com/nspcc-dev/neofs-api-go/pkg/token" + session2 "github.com/nspcc-dev/neofs-api-go/v2/session" +) + +type commonPrm struct { + cli client.Client + + ctx context.Context + + opts []client.CallOption +} + +// SetClient sets base client for NeoFS API communication. +// +// Required parameter. +func (x *commonPrm) SetClient(cli client.Client) { + x.cli = cli +} + +// SetContext sets context.Context for network communication. +// +// Required parameter. +func (x *commonPrm) SetContext(ctx context.Context) { + x.ctx = ctx +} + +// SetPrivateKey sets private key to sign the request(s). +// +// Required parameter. +func (x *commonPrm) SetPrivateKey(key *ecdsa.PrivateKey) { + x.opts = append(x.opts, client.WithKey(key)) +} + +// SetSessionToken sets token of the session within which request should be sent. +// +// By default the request will be sent outside the session. +func (x *commonPrm) SetSessionToken(tok *session.Token) { + x.opts = append(x.opts, client.WithSession(tok)) +} + +// SetBearerToken sets bearer token to be attached to the request. +// +// By default token is not attached to the request. +func (x *commonPrm) SetBearerToken(tok *token.BearerToken) { + x.opts = append(x.opts, client.WithBearer(tok)) +} + +// SetXHeaders sets request X-Headers. +// +// By default X-Headers will not be attached to the request. +func (x *commonPrm) SetXHeaders(xhdrs []*pkg.XHeader) { + for _, xhdr := range xhdrs { + x.opts = append(x.opts, client.WithXHeader(xhdr)) + } +} + +type readPrmCommon struct { + commonPrm + + ttl uint32 +} + +// SetNetmapEpoch sets the epoch number to be used to locate the object. +// +// By default current epoch on the server will be used. +func (x *readPrmCommon) SetNetmapEpoch(epoch uint64) { + xNetmapEpoch := pkg.NewXHeader() + xNetmapEpoch.SetKey(session2.XHeaderNetmapEpoch) + xNetmapEpoch.SetValue(strconv.FormatUint(epoch, 10)) + + x.opts = append(x.opts, client.WithXHeader(xNetmapEpoch)) +} + +// SetTTL sets request TTL value. +// +// Required parameter. +func (x *readPrmCommon) SetTTL(ttl uint32) { + x.opts = append(x.opts, client.WithTTL(ttl)) +} + +// GetObjectPrm groups parameters of GetObject operation. +type GetObjectPrm struct { + readPrmCommon + + cliPrm client.GetObjectParams +} + +// SetRawFlag sets raw flag of the request. +// +// By default request will not be raw. +func (x *GetObjectPrm) SetRawFlag() { + x.cliPrm.WithRawFlag(true) +} + +// SetAddress sets object address. +// +// Required parameter. +func (x *GetObjectPrm) SetAddress(addr *object.Address) { + x.cliPrm.WithAddress(addr) +} + +// GetObjectRes groups resulting values of GetObject operation. +type GetObjectRes struct { + cliRes *object.Object +} + +// Object returns requested object. +func (x GetObjectRes) Object() *object.Object { + return x.cliRes +} + +// GetObject reads the object by address. +// +// Client, context and key must be set. +// +// Returns: +// error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual; +// object.ErrAlreadyRemoved error if requested object is marked to be removed. +func GetObject(prm GetObjectPrm) (res GetObjectRes, err error) { + res.cliRes, err = prm.cli.GetObject(prm.ctx, &prm.cliPrm, prm.opts...) + + return +} + +// HeadObjectPrm groups parameters of HeadObject operation. +type HeadObjectPrm struct { + readPrmCommon + + cliPrm client.ObjectHeaderParams +} + +// SetRawFlag sets raw flag of the request. +// +// By default request will not be raw. +func (x *HeadObjectPrm) SetRawFlag() { + x.cliPrm.WithRawFlag(true) +} + +// SetAddress sets object address. +// +// Required parameter. +func (x *HeadObjectPrm) SetAddress(addr *object.Address) { + x.cliPrm.WithAddress(addr) +} + +// GetObjectRes groups resulting values of GetObject operation. +type HeadObjectRes struct { + cliRes *object.Object +} + +// Header returns requested object header. +func (x HeadObjectRes) Header() *object.Object { + return x.cliRes +} + +// HeadObject reads object header by address. +// +// Client, context and key must be set. +// +// Returns: +// error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual; +// object.ErrAlreadyRemoved error if requested object is marked to be removed. +func HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) { + res.cliRes, err = prm.cli.GetObjectHeader(prm.ctx, &prm.cliPrm, prm.opts...) + + return +} + +// PayloadRangePrm groups parameters of PayloadRange operation. +type PayloadRangePrm struct { + readPrmCommon + + cliPrm client.RangeDataParams +} + +// SetRawFlag sets raw flag of the request. +// +// By default request will not be raw. +func (x *PayloadRangePrm) SetRawFlag() { + x.cliPrm.WithRaw(true) +} + +// SetAddress sets object address. +// +// Required parameter. +func (x *PayloadRangePrm) SetAddress(addr *object.Address) { + x.cliPrm.WithAddress(addr) +} + +// SetRange range of the object payload to be read. +// +// Required parameter. +func (x *PayloadRangePrm) SetRange(rng *object.Range) { + x.cliPrm.WithRange(rng) +} + +// PayloadRangeRes groups resulting values of GetObject operation. +type PayloadRangeRes struct { + cliRes []byte +} + +// PayloadRange returns data of the requested payload range. +func (x PayloadRangeRes) PayloadRange() []byte { + return x.cliRes +} + +// PayloadRange reads object payload range by address. +// +// Client, context and key must be set. +// +// Returns: +// error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual; +// object.ErrAlreadyRemoved error if requested object is marked to be removed. +func PayloadRange(prm PayloadRangePrm) (res PayloadRangeRes, err error) { + res.cliRes, err = prm.cli.ObjectPayloadRangeData(prm.ctx, &prm.cliPrm, prm.opts...) + + return +} + +// PutObjectPrm groups parameters of PutObject operation. +type PutObjectPrm struct { + commonPrm + + cliPrm client.PutObjectParams +} + +// SetObject sets object to be stored. +// +// Required parameter. +func (x *PutObjectPrm) SetObject(obj *object.Object) { + x.cliPrm.WithObject(obj) +} + +// PutObjectRes groups resulting values of PutObject operation. +type PutObjectRes struct { + cliRes *object.ID +} + +// ID returns identifier of the stored object. +func (x PutObjectRes) ID() *object.ID { + return x.cliRes +} + +// PutObject saves the object in local storage of the remote node. +// +// Client, context and key must be set. +func PutObject(prm PutObjectPrm) (res PutObjectRes, err error) { + res.cliRes, err = prm.cli.PutObject(prm.ctx, &prm.cliPrm, + append(prm.opts, client.WithTTL(1))..., + ) + + return +} + +// SearchObjectsPrm groups parameters of SearchObjects operation. +type SearchObjectsPrm struct { + readPrmCommon + + cliPrm client.SearchObjectParams +} + +// SetContainerID sets identifier of the container to search the objects. +// +// Required parameter. +func (x *SearchObjectsPrm) SetContainerID(id *cid.ID) { + x.cliPrm.WithContainerID(id) +} + +// SetFilters sets search filters. +func (x *SearchObjectsPrm) SetFilters(fs object.SearchFilters) { + x.cliPrm.WithSearchFilters(fs) +} + +// SearchObjectsRes groups resulting values of SearchObjects operation. +type SearchObjectsRes struct { + cliRes []*object.ID +} + +// IDList returns identifiers of the matched objects. +func (x SearchObjectsRes) IDList() []*object.ID { + return x.cliRes +} + +// SearchObjects selects objects from container which match the filters. +func SearchObjects(prm SearchObjectsPrm) (res SearchObjectsRes, err error) { + res.cliRes, err = prm.cli.SearchObject(prm.ctx, &prm.cliPrm, prm.opts...) + + return +} diff --git a/pkg/services/object/internal/client/doc.go b/pkg/services/object/internal/client/doc.go new file mode 100644 index 000000000..f7871771d --- /dev/null +++ b/pkg/services/object/internal/client/doc.go @@ -0,0 +1,11 @@ +// Package internal provides functionality for NeoFS Node Object service communication with NeoFS network. +// The base client for accessing remote nodes via NeoFS API is a NeoFS SDK Go API client. +// However, although it encapsulates a useful piece of business logic (e.g. the signature mechanism), +// the Object service does not fully use the client's flexible interface. +// +// In this regard, this package provides functions over base API client necessary for the application. +// This allows you to concentrate the entire spectrum of the client's use in one place (this will be convenient +// both when updating the base client and for evaluating the UX of SDK library). So it is expected that all +// Object service packages will be limited to this package for the development of functionality requiring +// NeoFS API communication. +package internal diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index fa4e7bcac..adaac9b7d 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -4,10 +4,10 @@ import ( "context" "fmt" - "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/netmap" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" + internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer" ) @@ -60,22 +60,23 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err) } - id, err := c.PutObject(t.ctx, new(client.PutObjectParams). - WithObject( - t.obj.SDK(), - ), - append( - t.commonPrm.RemoteCallOptions(), - client.WithTTL(1), // FIXME: use constant - client.WithKey(key), - )..., - ) + var prm internalclient.PutObjectPrm + + prm.SetContext(t.ctx) + prm.SetClient(c) + prm.SetPrivateKey(key) + prm.SetSessionToken(t.commonPrm.SessionToken()) + prm.SetBearerToken(t.commonPrm.BearerToken()) + prm.SetXHeaders(t.commonPrm.XHeaders()) + prm.SetObject(t.obj.SDK()) + + res, err := internalclient.PutObject(prm) if err != nil { return nil, fmt.Errorf("(%T) could not put object to %s: %w", t, t.nodeInfo.AddressGroup(), err) } return new(transformer.AccessIdentifiers). - WithSelfID(id), nil + WithSelfID(res.ID()), nil } // NewRemoteSender creates, initializes and returns new RemoteSender instance. diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go index 4697f7ec3..830739e83 100644 --- a/pkg/services/object/search/exec.go +++ b/pkg/services/object/search/exec.go @@ -60,11 +60,11 @@ func (exec execCtx) isLocal() bool { } func (exec *execCtx) containerID() *cid.ID { - return exec.prm.ContainerID() + return exec.prm.cid } func (exec *execCtx) searchFilters() objectSDK.SearchFilters { - return exec.prm.SearchFilters() + return exec.prm.filters } func (exec *execCtx) netmapEpoch() uint64 { diff --git a/pkg/services/object/search/prm.go b/pkg/services/object/search/prm.go index 55146c29d..49e19c211 100644 --- a/pkg/services/object/search/prm.go +++ b/pkg/services/object/search/prm.go @@ -1,7 +1,7 @@ package searchsvc import ( - "github.com/nspcc-dev/neofs-api-go/pkg/client" + cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" @@ -13,7 +13,9 @@ type Prm struct { common *util.CommonPrm - client.SearchObjectParams + cid *cid.ID + + filters objectSDK.SearchFilters forwarder RequestForwarder } @@ -43,3 +45,13 @@ func (p *Prm) SetWriter(w IDListWriter) { func (p *Prm) SetRequestForwarder(f RequestForwarder) { p.forwarder = f } + +// WithContainerID sets identifier of the container to search the objects. +func (p *Prm) WithContainerID(id *cid.ID) { + p.cid = id +} + +// WithSearchFilters sets search filters. +func (p *Prm) WithSearchFilters(fs objectSDK.SearchFilters) { + p.filters = fs +} diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index 1513f912a..133e46469 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" + internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" ) @@ -88,12 +89,25 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]*o return nil, err } - return c.client.SearchObject(exec.context(), - &exec.prm.SearchObjectParams, - exec.prm.common.RemoteCallOptions( - util.WithNetmapEpoch(exec.curProcEpoch), - util.WithKey(key), - )...) + var prm internalclient.SearchObjectsPrm + + prm.SetContext(exec.context()) + prm.SetClient(c.client) + prm.SetPrivateKey(key) + prm.SetSessionToken(exec.prm.common.SessionToken()) + prm.SetBearerToken(exec.prm.common.BearerToken()) + prm.SetTTL(exec.prm.common.TTL()) + prm.SetXHeaders(exec.prm.common.XHeaders()) + prm.SetNetmapEpoch(exec.curProcEpoch) + prm.SetContainerID(exec.containerID()) + prm.SetFilters(exec.searchFilters()) + + res, err := internalclient.SearchObjects(prm) + if err != nil { + return nil, err + } + + return res.IDList(), nil } func (e *storageEngineWrapper) search(exec *execCtx) ([]*objectSDK.ID, error) { diff --git a/pkg/services/object/util/prm.go b/pkg/services/object/util/prm.go index cd0fb95e9..bb250e845 100644 --- a/pkg/services/object/util/prm.go +++ b/pkg/services/object/util/prm.go @@ -1,11 +1,9 @@ package util import ( - "crypto/ecdsa" "strconv" "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-api-go/pkg/client" sessionsdk "github.com/nspcc-dev/neofs-api-go/pkg/session" "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-api-go/v2/session" @@ -20,14 +18,28 @@ type CommonPrm struct { bearer *token.BearerToken - callOpts []client.CallOption + ttl uint32 + + xhdrs []*pkg.XHeader } -type remoteCallOpts struct { - opts []client.CallOption +// TTL returns TTL for new requests. +func (p *CommonPrm) TTL() uint32 { + if p != nil { + return p.ttl + } + + return 0 } -type DynamicCallOption func(*remoteCallOpts) +// Returns X-Headers for new requests. +func (p *CommonPrm) XHeaders() []*pkg.XHeader { + if p != nil { + return p.xhdrs + } + + return nil +} func (p *CommonPrm) WithLocalOnly(v bool) *CommonPrm { if p != nil { @@ -45,65 +57,6 @@ func (p *CommonPrm) LocalOnly() bool { return false } -func (p *CommonPrm) WithSessionToken(token *sessionsdk.Token) *CommonPrm { - if p != nil { - p.token = token - } - - return p -} - -func (p *CommonPrm) WithBearerToken(token *token.BearerToken) *CommonPrm { - if p != nil { - p.bearer = token - } - - return p -} - -// WithRemoteCallOptions sets call options remote remote client calls. -func (p *CommonPrm) WithRemoteCallOptions(opts ...client.CallOption) *CommonPrm { - if p != nil { - p.callOpts = opts - } - - return p -} - -// RemoteCallOptions return call options for remote client calls. -func (p *CommonPrm) RemoteCallOptions(dynamic ...DynamicCallOption) []client.CallOption { - if p != nil { - o := &remoteCallOpts{ - opts: p.callOpts, - } - - for _, applier := range dynamic { - applier(o) - } - - return o.opts - } - - return nil -} - -func WithNetmapEpoch(v uint64) DynamicCallOption { - return func(o *remoteCallOpts) { - xHdr := pkg.NewXHeader() - xHdr.SetKey(session.XHeaderNetmapEpoch) - xHdr.SetValue(strconv.FormatUint(v, 10)) - - o.opts = append(o.opts, client.WithXHeader(xHdr)) - } -} - -// WithKey sets key to use for the request. -func WithKey(key *ecdsa.PrivateKey) DynamicCallOption { - return func(o *remoteCallOpts) { - o.opts = append(o.opts, client.WithKey(key)) - } -} - func (p *CommonPrm) SessionToken() *sessionsdk.Token { if p != nil { return p.token @@ -148,26 +101,20 @@ func CommonPrmFromV2(req interface { meta := req.GetMetaHeader() xHdrs := meta.GetXHeaders() - - const staticOptNum = 3 + ttl := meta.GetTTL() prm := &CommonPrm{ - local: meta.GetTTL() <= 1, // FIXME: use constant - token: nil, - bearer: nil, - callOpts: make([]client.CallOption, 0, staticOptNum+len(xHdrs)), + local: ttl <= 1, // FIXME: use constant + xhdrs: make([]*pkg.XHeader, 0, len(xHdrs)), + ttl: ttl - 1, // decrease TTL for new requests } - prm.callOpts = append(prm.callOpts, client.WithTTL(meta.GetTTL()-1)) - if tok := meta.GetSessionToken(); tok != nil { prm.token = sessionsdk.NewTokenFromV2(tok) - prm.callOpts = append(prm.callOpts, client.WithSession(prm.token)) } if tok := meta.GetBearerToken(); tok != nil { prm.bearer = token.NewBearerTokenFromV2(tok) - prm.callOpts = append(prm.callOpts, client.WithBearer(prm.bearer)) } for i := range xHdrs { @@ -187,11 +134,9 @@ func CommonPrmFromV2(req interface { return nil, err } default: - prm.callOpts = append(prm.callOpts, - client.WithXHeader( - pkg.NewXHeaderFromV2(xHdrs[i]), - ), - ) + xhdr := pkg.NewXHeaderFromV2(xHdrs[i]) + + prm.xhdrs = append(prm.xhdrs, xhdr) } }