forked from TrueCloudLab/frostfs-node
[#1195] Adopt recent changes in NeoFS SDK
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
a8d10704d5
commit
e0dce1043a
25 changed files with 885 additions and 424 deletions
|
@ -3,9 +3,10 @@ package internal
|
|||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"strconv"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
session2 "github.com/nspcc-dev/neofs-api-go/v2/session"
|
||||
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/client"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
|
@ -22,7 +23,13 @@ type commonPrm struct {
|
|||
|
||||
ctx context.Context
|
||||
|
||||
opts []client.CallOption
|
||||
key *ecdsa.PrivateKey
|
||||
|
||||
tokenSession *session.Token
|
||||
|
||||
tokenBearer *token.BearerToken
|
||||
|
||||
local bool
|
||||
}
|
||||
|
||||
// SetClient sets base client for NeoFS API communication.
|
||||
|
@ -43,35 +50,33 @@ func (x *commonPrm) SetContext(ctx context.Context) {
|
|||
//
|
||||
// Required parameter.
|
||||
func (x *commonPrm) SetPrivateKey(key *ecdsa.PrivateKey) {
|
||||
x.opts = append(x.opts, client.WithKey(key))
|
||||
x.key = key
|
||||
}
|
||||
|
||||
// SetSessionToken sets token of the session within which request should be sent.
|
||||
//
|
||||
// By default the request will be sent outside the session.
|
||||
func (x *commonPrm) SetSessionToken(tok *session.Token) {
|
||||
x.opts = append(x.opts, client.WithSession(tok))
|
||||
x.tokenSession = tok
|
||||
}
|
||||
|
||||
// SetBearerToken sets bearer token to be attached to the request.
|
||||
//
|
||||
// By default token is not attached to the request.
|
||||
func (x *commonPrm) SetBearerToken(tok *token.BearerToken) {
|
||||
x.opts = append(x.opts, client.WithBearer(tok))
|
||||
x.tokenBearer = tok
|
||||
}
|
||||
|
||||
// SetTTL sets time-to-live call option.
|
||||
func (x *commonPrm) SetTTL(ttl uint32) {
|
||||
x.opts = append(x.opts, client.WithTTL(ttl))
|
||||
x.local = ttl < 2
|
||||
}
|
||||
|
||||
// SetXHeaders sets request X-Headers.
|
||||
//
|
||||
// By default X-Headers will not be attached to the request.
|
||||
func (x *commonPrm) SetXHeaders(xhdrs []*session.XHeader) {
|
||||
for _, xhdr := range xhdrs {
|
||||
x.opts = append(x.opts, client.WithXHeader(xhdr))
|
||||
}
|
||||
func (x *commonPrm) SetXHeaders(_ []*session.XHeader) {
|
||||
// FIXME: (neofs-node#1194) not supported by client
|
||||
}
|
||||
|
||||
type readPrmCommon struct {
|
||||
|
@ -81,43 +86,45 @@ type readPrmCommon struct {
|
|||
// SetNetmapEpoch sets the epoch number to be used to locate the object.
|
||||
//
|
||||
// By default current epoch on the server will be used.
|
||||
func (x *readPrmCommon) SetNetmapEpoch(epoch uint64) {
|
||||
xNetmapEpoch := session.NewXHeader()
|
||||
xNetmapEpoch.SetKey(session2.XHeaderNetmapEpoch)
|
||||
xNetmapEpoch.SetValue(strconv.FormatUint(epoch, 10))
|
||||
|
||||
x.opts = append(x.opts, client.WithXHeader(xNetmapEpoch))
|
||||
func (x *readPrmCommon) SetNetmapEpoch(_ uint64) {
|
||||
// FIXME: (neofs-node#1194) not supported by client
|
||||
}
|
||||
|
||||
// GetObjectPrm groups parameters of GetObject operation.
|
||||
type GetObjectPrm struct {
|
||||
readPrmCommon
|
||||
|
||||
cliPrm client.GetObjectParams
|
||||
cliPrm client.PrmObjectGet
|
||||
}
|
||||
|
||||
// SetRawFlag sets raw flag of the request.
|
||||
//
|
||||
// By default request will not be raw.
|
||||
func (x *GetObjectPrm) SetRawFlag() {
|
||||
x.cliPrm.WithRawFlag(true)
|
||||
x.cliPrm.MarkRaw()
|
||||
}
|
||||
|
||||
// SetAddress sets object address.
|
||||
//
|
||||
// Required parameter.
|
||||
func (x *GetObjectPrm) SetAddress(addr *addressSDK.Address) {
|
||||
x.cliPrm.WithAddress(addr)
|
||||
if id := addr.ContainerID(); id != nil {
|
||||
x.cliPrm.FromContainer(*id)
|
||||
}
|
||||
|
||||
if id := addr.ObjectID(); id != nil {
|
||||
x.cliPrm.ByID(*id)
|
||||
}
|
||||
}
|
||||
|
||||
// GetObjectRes groups resulting values of GetObject operation.
|
||||
type GetObjectRes struct {
|
||||
cliRes *client.ObjectGetRes
|
||||
obj *object.Object
|
||||
}
|
||||
|
||||
// Object returns requested object.
|
||||
func (x GetObjectRes) Object() *object.Object {
|
||||
return x.cliRes.Object()
|
||||
return x.obj
|
||||
}
|
||||
|
||||
// GetObject reads the object by address.
|
||||
|
@ -128,47 +135,91 @@ func (x GetObjectRes) Object() *object.Object {
|
|||
// Returns:
|
||||
// error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual;
|
||||
// object.ErrAlreadyRemoved error if requested object is marked to be removed.
|
||||
func GetObject(prm GetObjectPrm) (res GetObjectRes, err error) {
|
||||
res.cliRes, err = prm.cli.GetObject(prm.ctx, &prm.cliPrm, prm.opts...)
|
||||
func GetObject(prm GetObjectPrm) (*GetObjectRes, error) {
|
||||
if prm.tokenSession != nil {
|
||||
prm.cliPrm.WithinSession(*prm.tokenSession)
|
||||
}
|
||||
|
||||
if prm.tokenBearer != nil {
|
||||
prm.cliPrm.WithBearerToken(*prm.tokenBearer)
|
||||
}
|
||||
|
||||
if prm.local {
|
||||
prm.cliPrm.MarkLocal()
|
||||
}
|
||||
|
||||
rdr, err := prm.cli.ObjectGetInit(prm.ctx, prm.cliPrm)
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(res.cliRes.Status())
|
||||
return nil, fmt.Errorf("init object reading: %w", err)
|
||||
}
|
||||
|
||||
if prm.key != nil {
|
||||
rdr.UseKey(*prm.key)
|
||||
}
|
||||
|
||||
var obj object.Object
|
||||
|
||||
if !rdr.ReadHeader(&obj) {
|
||||
res, err := rdr.Close()
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(res.Status())
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("read object header: %w", err)
|
||||
}
|
||||
|
||||
buf := make([]byte, obj.PayloadSize())
|
||||
|
||||
_, err = rdr.Read(buf)
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
return nil, fmt.Errorf("read payload: %w", err)
|
||||
}
|
||||
|
||||
// FIXME: #1158 object.ErrAlreadyRemoved never returns
|
||||
|
||||
return
|
||||
object.NewRawFrom(&obj).SetPayload(buf)
|
||||
|
||||
return &GetObjectRes{
|
||||
obj: &obj,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// HeadObjectPrm groups parameters of HeadObject operation.
|
||||
type HeadObjectPrm struct {
|
||||
readPrmCommon
|
||||
|
||||
cliPrm client.ObjectHeaderParams
|
||||
cliPrm client.PrmObjectHead
|
||||
}
|
||||
|
||||
// SetRawFlag sets raw flag of the request.
|
||||
//
|
||||
// By default request will not be raw.
|
||||
func (x *HeadObjectPrm) SetRawFlag() {
|
||||
x.cliPrm.WithRawFlag(true)
|
||||
x.cliPrm.MarkRaw()
|
||||
}
|
||||
|
||||
// SetAddress sets object address.
|
||||
//
|
||||
// Required parameter.
|
||||
func (x *HeadObjectPrm) SetAddress(addr *addressSDK.Address) {
|
||||
x.cliPrm.WithAddress(addr)
|
||||
if id := addr.ContainerID(); id != nil {
|
||||
x.cliPrm.FromContainer(*id)
|
||||
}
|
||||
|
||||
if id := addr.ObjectID(); id != nil {
|
||||
x.cliPrm.ByID(*id)
|
||||
}
|
||||
}
|
||||
|
||||
// GetObjectRes groups resulting values of GetObject operation.
|
||||
// HeadObjectRes groups resulting values of GetObject operation.
|
||||
type HeadObjectRes struct {
|
||||
cliRes *client.ObjectHeadRes
|
||||
hdr *object.Object
|
||||
}
|
||||
|
||||
// Header returns requested object header.
|
||||
func (x HeadObjectRes) Header() *object.Object {
|
||||
return x.cliRes.Object()
|
||||
return x.hdr
|
||||
}
|
||||
|
||||
// HeadObject reads object header by address.
|
||||
|
@ -179,54 +230,87 @@ func (x HeadObjectRes) Header() *object.Object {
|
|||
// Returns:
|
||||
// error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual;
|
||||
// object.ErrAlreadyRemoved error if requested object is marked to be removed.
|
||||
func HeadObject(prm HeadObjectPrm) (res HeadObjectRes, err error) {
|
||||
res.cliRes, err = prm.cli.HeadObject(prm.ctx, &prm.cliPrm, prm.opts...)
|
||||
func HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) {
|
||||
if prm.local {
|
||||
prm.cliPrm.MarkLocal()
|
||||
}
|
||||
|
||||
if prm.tokenSession != nil {
|
||||
prm.cliPrm.WithinSession(*prm.tokenSession)
|
||||
}
|
||||
|
||||
if prm.tokenBearer != nil {
|
||||
prm.cliPrm.WithBearerToken(*prm.tokenBearer)
|
||||
}
|
||||
|
||||
cliRes, err := prm.cli.ObjectHead(prm.ctx, prm.cliPrm)
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(res.cliRes.Status())
|
||||
err = apistatus.ErrFromStatus(cliRes.Status())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read object header from NeoFS: %w", err)
|
||||
}
|
||||
|
||||
// FIXME: #1158 object.ErrAlreadyRemoved never returns
|
||||
|
||||
return
|
||||
var hdr object.Object
|
||||
|
||||
if !cliRes.ReadHeader(&hdr) {
|
||||
return nil, errors.New("missing object header in the response")
|
||||
}
|
||||
|
||||
return &HeadObjectRes{
|
||||
hdr: &hdr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PayloadRangePrm groups parameters of PayloadRange operation.
|
||||
type PayloadRangePrm struct {
|
||||
readPrmCommon
|
||||
|
||||
cliPrm client.RangeDataParams
|
||||
ln uint64
|
||||
|
||||
cliPrm client.PrmObjectRange
|
||||
}
|
||||
|
||||
// SetRawFlag sets raw flag of the request.
|
||||
//
|
||||
// By default request will not be raw.
|
||||
func (x *PayloadRangePrm) SetRawFlag() {
|
||||
x.cliPrm.WithRaw(true)
|
||||
x.cliPrm.MarkRaw()
|
||||
}
|
||||
|
||||
// SetAddress sets object address.
|
||||
//
|
||||
// Required parameter.
|
||||
func (x *PayloadRangePrm) SetAddress(addr *addressSDK.Address) {
|
||||
x.cliPrm.WithAddress(addr)
|
||||
if id := addr.ContainerID(); id != nil {
|
||||
x.cliPrm.FromContainer(*id)
|
||||
}
|
||||
|
||||
if id := addr.ObjectID(); id != nil {
|
||||
x.cliPrm.ByID(*id)
|
||||
}
|
||||
}
|
||||
|
||||
// SetRange range of the object payload to be read.
|
||||
//
|
||||
// Required parameter.
|
||||
func (x *PayloadRangePrm) SetRange(rng *object.Range) {
|
||||
x.cliPrm.WithRange(rng)
|
||||
x.cliPrm.SetOffset(rng.GetOffset())
|
||||
x.ln = rng.GetLength()
|
||||
}
|
||||
|
||||
// PayloadRangeRes groups resulting values of GetObject operation.
|
||||
type PayloadRangeRes struct {
|
||||
cliRes *client.ObjectRangeRes
|
||||
data []byte
|
||||
}
|
||||
|
||||
// PayloadRange returns data of the requested payload range.
|
||||
func (x PayloadRangeRes) PayloadRange() []byte {
|
||||
return x.cliRes.Data()
|
||||
return x.data
|
||||
}
|
||||
|
||||
// PayloadRange reads object payload range by address.
|
||||
|
@ -237,40 +321,62 @@ func (x PayloadRangeRes) PayloadRange() []byte {
|
|||
// Returns:
|
||||
// error of type *object.SplitInfoError if object if raw flag is set and requested object is virtual;
|
||||
// object.ErrAlreadyRemoved error if requested object is marked to be removed.
|
||||
func PayloadRange(prm PayloadRangePrm) (res PayloadRangeRes, err error) {
|
||||
res.cliRes, err = prm.cli.ObjectPayloadRangeData(prm.ctx, &prm.cliPrm, prm.opts...)
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(res.cliRes.Status())
|
||||
func PayloadRange(prm PayloadRangePrm) (*PayloadRangeRes, error) {
|
||||
if prm.local {
|
||||
prm.cliPrm.MarkLocal()
|
||||
}
|
||||
|
||||
if prm.tokenSession != nil {
|
||||
prm.cliPrm.WithinSession(*prm.tokenSession)
|
||||
}
|
||||
|
||||
if prm.tokenBearer != nil {
|
||||
prm.cliPrm.WithBearerToken(*prm.tokenBearer)
|
||||
}
|
||||
|
||||
prm.cliPrm.SetLength(prm.ln)
|
||||
|
||||
rdr, err := prm.cli.ObjectRangeInit(prm.ctx, prm.cliPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init payload reading: %w", err)
|
||||
}
|
||||
|
||||
data := make([]byte, prm.ln)
|
||||
|
||||
_, err = io.ReadFull(rdr, data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read payload: %w", err)
|
||||
}
|
||||
|
||||
// FIXME: #1158 object.ErrAlreadyRemoved never returns
|
||||
|
||||
return
|
||||
return &PayloadRangeRes{
|
||||
data: data,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PutObjectPrm groups parameters of PutObject operation.
|
||||
type PutObjectPrm struct {
|
||||
commonPrm
|
||||
|
||||
cliPrm client.PutObjectParams
|
||||
obj *object.Object
|
||||
}
|
||||
|
||||
// SetObject sets object to be stored.
|
||||
//
|
||||
// Required parameter.
|
||||
func (x *PutObjectPrm) SetObject(obj *object.Object) {
|
||||
x.cliPrm.WithObject(obj)
|
||||
x.obj = obj
|
||||
}
|
||||
|
||||
// PutObjectRes groups resulting values of PutObject operation.
|
||||
type PutObjectRes struct {
|
||||
cliRes *client.ObjectPutRes
|
||||
id *oidSDK.ID
|
||||
}
|
||||
|
||||
// ID returns identifier of the stored object.
|
||||
func (x PutObjectRes) ID() *oidSDK.ID {
|
||||
return x.cliRes.ID()
|
||||
return x.id
|
||||
}
|
||||
|
||||
// PutObject saves the object in local storage of the remote node.
|
||||
|
@ -278,56 +384,137 @@ func (x PutObjectRes) ID() *oidSDK.ID {
|
|||
// Client, context and key must be set.
|
||||
//
|
||||
// Returns any error prevented the operation from completing correctly in error return.
|
||||
func PutObject(prm PutObjectPrm) (res PutObjectRes, err error) {
|
||||
res.cliRes, err = prm.cli.PutObject(prm.ctx, &prm.cliPrm,
|
||||
append(prm.opts, client.WithTTL(1))...,
|
||||
)
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(res.cliRes.Status())
|
||||
func PutObject(prm PutObjectPrm) (*PutObjectRes, error) {
|
||||
var prmCli client.PrmObjectPutInit
|
||||
|
||||
w, err := prm.cli.ObjectPutInit(prm.ctx, prmCli)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init object writing on client: %w", err)
|
||||
}
|
||||
|
||||
return
|
||||
w.MarkLocal()
|
||||
|
||||
if prm.key != nil {
|
||||
w.UseKey(*prm.key)
|
||||
}
|
||||
|
||||
if prm.tokenSession != nil {
|
||||
w.WithinSession(*prm.tokenSession)
|
||||
}
|
||||
|
||||
if prm.tokenBearer != nil {
|
||||
w.WithBearerToken(*prm.tokenBearer)
|
||||
}
|
||||
|
||||
if w.WriteHeader(*prm.obj) {
|
||||
w.WritePayloadChunk(prm.obj.Payload())
|
||||
}
|
||||
|
||||
res, err := w.Close()
|
||||
if err == nil {
|
||||
err = apistatus.ErrFromStatus(res.Status())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("write object via client: %w", err)
|
||||
}
|
||||
|
||||
var id oidSDK.ID
|
||||
if !res.ReadStoredObjectID(&id) {
|
||||
return nil, errors.New("missing identifier in the response")
|
||||
}
|
||||
|
||||
return &PutObjectRes{
|
||||
id: &id,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SearchObjectsPrm groups parameters of SearchObjects operation.
|
||||
type SearchObjectsPrm struct {
|
||||
readPrmCommon
|
||||
|
||||
cliPrm client.SearchObjectParams
|
||||
cliPrm client.PrmObjectSearch
|
||||
}
|
||||
|
||||
// SetContainerID sets identifier of the container to search the objects.
|
||||
//
|
||||
// Required parameter.
|
||||
func (x *SearchObjectsPrm) SetContainerID(id *cid.ID) {
|
||||
x.cliPrm.WithContainerID(id)
|
||||
if id != nil {
|
||||
x.cliPrm.InContainer(*id)
|
||||
}
|
||||
}
|
||||
|
||||
// SetFilters sets search filters.
|
||||
func (x *SearchObjectsPrm) SetFilters(fs object.SearchFilters) {
|
||||
x.cliPrm.WithSearchFilters(fs)
|
||||
x.cliPrm.SetFilters(fs)
|
||||
}
|
||||
|
||||
// SearchObjectsRes groups resulting values of SearchObjects operation.
|
||||
type SearchObjectsRes struct {
|
||||
cliRes *client.ObjectSearchRes
|
||||
ids []*oidSDK.ID
|
||||
}
|
||||
|
||||
// IDList returns identifiers of the matched objects.
|
||||
func (x SearchObjectsRes) IDList() []*oidSDK.ID {
|
||||
return x.cliRes.IDList()
|
||||
return x.ids
|
||||
}
|
||||
|
||||
// SearchObjects selects objects from container which match the filters.
|
||||
//
|
||||
// Returns any error prevented the operation from completing correctly in error return.
|
||||
func SearchObjects(prm SearchObjectsPrm) (res SearchObjectsRes, err error) {
|
||||
res.cliRes, err = prm.cli.SearchObjects(prm.ctx, &prm.cliPrm, prm.opts...)
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(res.cliRes.Status())
|
||||
func SearchObjects(prm SearchObjectsPrm) (*SearchObjectsRes, error) {
|
||||
if prm.local {
|
||||
prm.cliPrm.MarkLocal()
|
||||
}
|
||||
|
||||
return
|
||||
if prm.tokenSession != nil {
|
||||
prm.cliPrm.WithinSession(*prm.tokenSession)
|
||||
}
|
||||
|
||||
if prm.tokenBearer != nil {
|
||||
prm.cliPrm.WithBearerToken(*prm.tokenBearer)
|
||||
}
|
||||
|
||||
rdr, err := prm.cli.ObjectSearchInit(prm.ctx, prm.cliPrm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init object searching in client: %w", err)
|
||||
}
|
||||
|
||||
if prm.key != nil {
|
||||
rdr.UseKey(*prm.key)
|
||||
}
|
||||
|
||||
buf := make([]oidSDK.ID, 10)
|
||||
var ids []*oidSDK.ID
|
||||
var n int
|
||||
var ok bool
|
||||
|
||||
for {
|
||||
n, ok = rdr.Read(buf)
|
||||
if n > 0 {
|
||||
for i := range buf[:n] {
|
||||
v := buf[i]
|
||||
ids = append(ids, &v)
|
||||
}
|
||||
}
|
||||
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
res, err := rdr.Close()
|
||||
if err == nil {
|
||||
// pull out an error from status
|
||||
err = apistatus.ErrFromStatus(res.Status())
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read object list: %w", err)
|
||||
}
|
||||
|
||||
return &SearchObjectsRes{
|
||||
ids: ids,
|
||||
}, nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue