forked from TrueCloudLab/frostfs-node
564 lines
14 KiB
Go
564 lines
14 KiB
Go
package internal
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
|
|
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
|
)
|
|
|
|
type commonPrm struct {
|
|
cli coreclient.Client
|
|
|
|
key *ecdsa.PrivateKey
|
|
|
|
tokenSession *session.Object
|
|
|
|
tokenBearer *bearer.Token
|
|
|
|
local bool
|
|
|
|
xHeaders []string
|
|
}
|
|
|
|
// SetClient sets base client for ForstFS API communication.
|
|
//
|
|
// Required parameter.
|
|
func (x *commonPrm) SetClient(cli coreclient.Client) {
|
|
x.cli = cli
|
|
}
|
|
|
|
// SetPrivateKey sets private key to sign the request(s).
|
|
//
|
|
// Required parameter.
|
|
func (x *commonPrm) SetPrivateKey(key *ecdsa.PrivateKey) {
|
|
x.key = key
|
|
}
|
|
|
|
// SetSessionToken sets token of the session within which request should be sent.
|
|
//
|
|
// By default the request will be sent outside the session.
|
|
func (x *commonPrm) SetSessionToken(tok *session.Object) {
|
|
x.tokenSession = tok
|
|
}
|
|
|
|
// SetBearerToken sets bearer token to be attached to the request.
|
|
//
|
|
// By default token is not attached to the request.
|
|
func (x *commonPrm) SetBearerToken(tok *bearer.Token) {
|
|
x.tokenBearer = tok
|
|
}
|
|
|
|
// SetTTL sets time-to-live call option.
|
|
func (x *commonPrm) SetTTL(ttl uint32) {
|
|
x.local = ttl < 2
|
|
}
|
|
|
|
// SetXHeaders sets request X-Headers.
|
|
//
|
|
// By default X-Headers will not be attached to the request.
|
|
func (x *commonPrm) SetXHeaders(hs []string) {
|
|
x.xHeaders = hs
|
|
}
|
|
|
|
type readPrmCommon struct {
|
|
commonPrm
|
|
}
|
|
|
|
// SetNetmapEpoch sets the epoch number to be used to locate the objectSDK.
|
|
//
|
|
// By default current epoch on the server will be used.
|
|
func (x *readPrmCommon) SetNetmapEpoch(_ uint64) {
|
|
// FIXME(@fyrchik): https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/465
|
|
}
|
|
|
|
// GetObjectPrm groups parameters of GetObject operation.
|
|
type GetObjectPrm struct {
|
|
readPrmCommon
|
|
|
|
ClientParams client.PrmObjectGet
|
|
|
|
obj oid.ID
|
|
}
|
|
|
|
// SetRawFlag sets raw flag of the request.
|
|
//
|
|
// By default request will not be raw.
|
|
func (x *GetObjectPrm) SetRawFlag() {
|
|
x.ClientParams.Raw = true
|
|
}
|
|
|
|
// SetAddress sets object address.
|
|
//
|
|
// Required parameter.
|
|
func (x *GetObjectPrm) SetAddress(addr oid.Address) {
|
|
x.obj = addr.Object()
|
|
cnr := addr.Container()
|
|
|
|
x.ClientParams.ContainerID = &cnr
|
|
x.ClientParams.ObjectID = &x.obj
|
|
}
|
|
|
|
// GetObjectRes groups the resulting values of GetObject operation.
|
|
type GetObjectRes struct {
|
|
obj *objectSDK.Object
|
|
}
|
|
|
|
// Object returns requested objectSDK.
|
|
func (x GetObjectRes) Object() *objectSDK.Object {
|
|
return x.obj
|
|
}
|
|
|
|
// GetObject reads the object by address.
|
|
//
|
|
// Client, context and key must be set.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
// Returns:
|
|
// - error of type *objectSDK.SplitInfoError if object raw flag is set and requested object is virtual;
|
|
// - error of type *apistatus.ObjectAlreadyRemoved if the requested object is marked to be removed.
|
|
//
|
|
// GetObject ignores the provided session if it is not related to the requested objectSDK.
|
|
func GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) {
|
|
// here we ignore session if it is opened for other object since such
|
|
// request will almost definitely fail. The case can occur, for example,
|
|
// when session is bound to the parent object and child object is requested.
|
|
if prm.tokenSession != nil && prm.tokenSession.AssertObject(prm.obj) {
|
|
prm.ClientParams.Session = prm.tokenSession
|
|
}
|
|
|
|
prm.ClientParams.XHeaders = prm.xHeaders
|
|
prm.ClientParams.BearerToken = prm.tokenBearer
|
|
prm.ClientParams.Local = prm.local
|
|
prm.ClientParams.Key = prm.key
|
|
|
|
rdr, err := prm.cli.ObjectGetInit(ctx, prm.ClientParams)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init object reading: %w", err)
|
|
}
|
|
|
|
var obj objectSDK.Object
|
|
|
|
if !rdr.ReadHeader(&obj) {
|
|
res, err := rdr.Close()
|
|
if err == nil {
|
|
// pull out an error from status
|
|
err = apistatus.ErrFromStatus(res.Status())
|
|
} else {
|
|
ReportError(prm.cli, err)
|
|
}
|
|
|
|
return nil, fmt.Errorf("read object header: %w", err)
|
|
}
|
|
|
|
buf := make([]byte, obj.PayloadSize())
|
|
|
|
_, err = rdr.Read(buf)
|
|
if err != nil && !errors.Is(err, io.EOF) {
|
|
return nil, fmt.Errorf("read payload: %w", err)
|
|
}
|
|
|
|
obj.SetPayload(buf)
|
|
|
|
return &GetObjectRes{
|
|
obj: &obj,
|
|
}, nil
|
|
}
|
|
|
|
// HeadObjectPrm groups parameters of HeadObject operation.
|
|
type HeadObjectPrm struct {
|
|
readPrmCommon
|
|
|
|
ClientParams client.PrmObjectHead
|
|
|
|
obj oid.ID
|
|
}
|
|
|
|
// SetRawFlag sets raw flag of the request.
|
|
//
|
|
// By default request will not be raw.
|
|
func (x *HeadObjectPrm) SetRawFlag() {
|
|
x.ClientParams.Raw = true
|
|
}
|
|
|
|
// SetAddress sets object address.
|
|
//
|
|
// Required parameter.
|
|
func (x *HeadObjectPrm) SetAddress(addr oid.Address) {
|
|
x.obj = addr.Object()
|
|
cnr := addr.Container()
|
|
|
|
x.ClientParams.ContainerID = &cnr
|
|
x.ClientParams.ObjectID = &x.obj
|
|
}
|
|
|
|
// HeadObjectRes groups the resulting values of GetObject operation.
|
|
type HeadObjectRes struct {
|
|
hdr *objectSDK.Object
|
|
}
|
|
|
|
// Header returns requested object header.
|
|
func (x HeadObjectRes) Header() *objectSDK.Object {
|
|
return x.hdr
|
|
}
|
|
|
|
// HeadObject reads object header by address.
|
|
//
|
|
// Client and key must be set.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
// Returns:
|
|
//
|
|
// error of type *objectSDK.SplitInfoError if object raw flag is set and requested object is virtual;
|
|
// error of type *apistatus.ObjectAlreadyRemoved if the requested object is marked to be removed.
|
|
//
|
|
// HeadObject ignores the provided session if it is not related to the requested objectSDK.
|
|
func HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error) {
|
|
// see details in same statement of GetObject
|
|
if prm.tokenSession != nil && prm.tokenSession.AssertObject(prm.obj) {
|
|
prm.ClientParams.Session = prm.tokenSession
|
|
}
|
|
|
|
prm.ClientParams.BearerToken = prm.tokenBearer
|
|
prm.ClientParams.Local = prm.local
|
|
prm.ClientParams.XHeaders = prm.xHeaders
|
|
|
|
cliRes, err := prm.cli.ObjectHead(ctx, prm.ClientParams)
|
|
if err == nil {
|
|
// pull out an error from status
|
|
err = apistatus.ErrFromStatus(cliRes.Status())
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read object header from FrostFS: %w", err)
|
|
}
|
|
|
|
var hdr objectSDK.Object
|
|
|
|
if !cliRes.ReadHeader(&hdr) {
|
|
return nil, errors.New("missing object header in the response")
|
|
}
|
|
|
|
return &HeadObjectRes{
|
|
hdr: &hdr,
|
|
}, nil
|
|
}
|
|
|
|
// PayloadRangePrm groups parameters of PayloadRange operation.
|
|
type PayloadRangePrm struct {
|
|
readPrmCommon
|
|
|
|
ln uint64
|
|
|
|
ClientParams client.PrmObjectRange
|
|
|
|
obj oid.ID
|
|
}
|
|
|
|
// SetRawFlag sets raw flag of the request.
|
|
//
|
|
// By default request will not be raw.
|
|
func (x *PayloadRangePrm) SetRawFlag() {
|
|
x.ClientParams.Raw = true
|
|
}
|
|
|
|
// SetAddress sets object address.
|
|
//
|
|
// Required parameter.
|
|
func (x *PayloadRangePrm) SetAddress(addr oid.Address) {
|
|
x.obj = addr.Object()
|
|
cnr := addr.Container()
|
|
|
|
x.ClientParams.ContainerID = &cnr
|
|
x.ClientParams.ObjectID = &x.obj
|
|
}
|
|
|
|
// SetRange range of the object payload to be read.
|
|
//
|
|
// Required parameter.
|
|
func (x *PayloadRangePrm) SetRange(rng *objectSDK.Range) {
|
|
x.ClientParams.Offset = rng.GetOffset()
|
|
x.ln = rng.GetLength()
|
|
}
|
|
|
|
// PayloadRangeRes groups the resulting values of GetObject operation.
|
|
type PayloadRangeRes struct {
|
|
data []byte
|
|
}
|
|
|
|
// PayloadRange returns data of the requested payload range.
|
|
func (x PayloadRangeRes) PayloadRange() []byte {
|
|
return x.data
|
|
}
|
|
|
|
// maxInitialBufferSize is the maximum initial buffer size for PayloadRange result.
|
|
// We don't want to allocate a lot of space in advance because a query can
|
|
// fail with apistatus.ObjectOutOfRange status.
|
|
const maxInitialBufferSize = 1024 * 1024 // 1 MiB
|
|
|
|
// PayloadRange reads object payload range by address.
|
|
//
|
|
// Client and key must be set.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
// Returns:
|
|
//
|
|
// error of type *objectSDK.SplitInfoError if object raw flag is set and requested object is virtual;
|
|
// error of type *apistatus.ObjectAlreadyRemoved if the requested object is marked to be removed;
|
|
// error of type *apistatus.ObjectOutOfRange if the requested range is too big.
|
|
//
|
|
// PayloadRange ignores the provided session if it is not related to the requested objectSDK.
|
|
func PayloadRange(ctx context.Context, prm PayloadRangePrm) (*PayloadRangeRes, error) {
|
|
// see details in same statement of GetObject
|
|
if prm.tokenSession != nil && prm.tokenSession.AssertObject(prm.obj) {
|
|
prm.ClientParams.Session = prm.tokenSession
|
|
}
|
|
|
|
prm.ClientParams.XHeaders = prm.xHeaders
|
|
prm.ClientParams.BearerToken = prm.tokenBearer
|
|
prm.ClientParams.Local = prm.local
|
|
prm.ClientParams.Length = prm.ln
|
|
|
|
rdr, err := prm.cli.ObjectRangeInit(ctx, prm.ClientParams)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init payload reading: %w", err)
|
|
}
|
|
|
|
if int64(prm.ln) < 0 {
|
|
// `CopyN` expects `int64`, this check ensures that the result is positive.
|
|
// On practice this means that we can return incorrect results for objects
|
|
// with size > 8_388 Petabytes, this will be fixed later with support for streaming.
|
|
return nil, new(apistatus.ObjectOutOfRange)
|
|
}
|
|
|
|
ln := prm.ln
|
|
if ln > maxInitialBufferSize {
|
|
ln = maxInitialBufferSize
|
|
}
|
|
|
|
w := bytes.NewBuffer(make([]byte, ln))
|
|
_, err = io.CopyN(w, rdr, int64(prm.ln))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read payload: %w", err)
|
|
}
|
|
|
|
return &PayloadRangeRes{
|
|
data: w.Bytes(),
|
|
}, nil
|
|
}
|
|
|
|
// PutObjectPrm groups parameters of PutObject operation.
|
|
type PutObjectPrm struct {
|
|
commonPrm
|
|
|
|
obj *objectSDK.Object
|
|
}
|
|
|
|
// SetObject sets object to be stored.
|
|
//
|
|
// Required parameter.
|
|
func (x *PutObjectPrm) SetObject(obj *objectSDK.Object) {
|
|
x.obj = obj
|
|
}
|
|
|
|
// PutObjectRes groups the resulting values of PutObject operation.
|
|
type PutObjectRes struct {
|
|
id oid.ID
|
|
}
|
|
|
|
// ID returns identifier of the stored objectSDK.
|
|
func (x PutObjectRes) ID() oid.ID {
|
|
return x.id
|
|
}
|
|
|
|
// PutObject saves the object in local storage of the remote node.
|
|
//
|
|
// Client and key must be set.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func PutObject(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObject")
|
|
defer span.End()
|
|
|
|
var prmCli client.PrmObjectPutInit
|
|
|
|
prmCli.MarkLocal()
|
|
|
|
if prm.key != nil {
|
|
prmCli.UseKey(*prm.key)
|
|
}
|
|
|
|
if prm.tokenSession != nil {
|
|
prmCli.WithinSession(*prm.tokenSession)
|
|
}
|
|
|
|
if prm.tokenBearer != nil {
|
|
prmCli.WithBearerToken(*prm.tokenBearer)
|
|
}
|
|
|
|
prmCli.WithXHeaders(prm.xHeaders...)
|
|
|
|
w, err := prm.cli.ObjectPutInit(ctx, prmCli)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init object writing on client: %w", err)
|
|
}
|
|
|
|
if w.WriteHeader(ctx, *prm.obj) {
|
|
w.WritePayloadChunk(ctx, prm.obj.Payload())
|
|
}
|
|
|
|
cliRes, err := w.Close(ctx)
|
|
if err == nil {
|
|
err = apistatus.ErrFromStatus(cliRes.Status())
|
|
} else {
|
|
ReportError(prm.cli, err)
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("write object via client: %w", err)
|
|
}
|
|
|
|
return &PutObjectRes{
|
|
id: cliRes.StoredObjectID(),
|
|
}, nil
|
|
}
|
|
|
|
// PutObjectSingle saves the object in local storage of the remote node with PutSingle RPC.
|
|
//
|
|
// Client and key must be set.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func PutObjectSingle(ctx context.Context, prm PutObjectPrm) (*PutObjectRes, error) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "client.PutObjectSingle")
|
|
defer span.End()
|
|
|
|
objID, isSet := prm.obj.ID()
|
|
if !isSet {
|
|
return nil, errors.New("missing object id")
|
|
}
|
|
|
|
prmCli := client.PrmObjectPutSingle{
|
|
XHeaders: prm.xHeaders,
|
|
BearerToken: prm.tokenBearer,
|
|
Session: prm.tokenSession,
|
|
Local: true,
|
|
Key: prm.key,
|
|
Object: prm.obj,
|
|
}
|
|
|
|
res, err := prm.cli.ObjectPutSingle(ctx, prmCli)
|
|
if err != nil {
|
|
ReportError(prm.cli, err)
|
|
return nil, fmt.Errorf("put single object on client: %w", err)
|
|
}
|
|
|
|
if err = apistatus.ErrFromStatus(res.Status()); err != nil {
|
|
return nil, fmt.Errorf("put single object via client: %w", err)
|
|
}
|
|
|
|
return &PutObjectRes{
|
|
id: objID,
|
|
}, nil
|
|
}
|
|
|
|
// SearchObjectsPrm groups parameters of SearchObjects operation.
|
|
type SearchObjectsPrm struct {
|
|
readPrmCommon
|
|
|
|
cliPrm client.PrmObjectSearch
|
|
}
|
|
|
|
// SetContainerID sets identifier of the container to search the objects.
|
|
//
|
|
// Required parameter.
|
|
func (x *SearchObjectsPrm) SetContainerID(id cid.ID) {
|
|
x.cliPrm.InContainer(id)
|
|
}
|
|
|
|
// SetFilters sets search filters.
|
|
func (x *SearchObjectsPrm) SetFilters(fs objectSDK.SearchFilters) {
|
|
x.cliPrm.SetFilters(fs)
|
|
}
|
|
|
|
// SearchObjectsRes groups the resulting values of SearchObjects operation.
|
|
type SearchObjectsRes struct {
|
|
ids []oid.ID
|
|
}
|
|
|
|
// IDList returns identifiers of the matched objects.
|
|
func (x SearchObjectsRes) IDList() []oid.ID {
|
|
return x.ids
|
|
}
|
|
|
|
// SearchObjects selects objects from container which match the filters.
|
|
//
|
|
// Returns any error which prevented the operation from completing correctly in error return.
|
|
func SearchObjects(ctx context.Context, prm SearchObjectsPrm) (*SearchObjectsRes, error) {
|
|
if prm.local {
|
|
prm.cliPrm.MarkLocal()
|
|
}
|
|
|
|
if prm.tokenSession != nil {
|
|
prm.cliPrm.WithinSession(*prm.tokenSession)
|
|
}
|
|
|
|
if prm.tokenBearer != nil {
|
|
prm.cliPrm.WithBearerToken(*prm.tokenBearer)
|
|
}
|
|
|
|
prm.cliPrm.WithXHeaders(prm.xHeaders...)
|
|
|
|
if prm.key != nil {
|
|
prm.cliPrm.UseKey(*prm.key)
|
|
}
|
|
|
|
rdr, err := prm.cli.ObjectSearchInit(ctx, prm.cliPrm)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("init object searching in client: %w", err)
|
|
}
|
|
|
|
buf := make([]oid.ID, 10)
|
|
var ids []oid.ID
|
|
var n int
|
|
var ok bool
|
|
|
|
for {
|
|
n, ok = rdr.Read(buf)
|
|
if n > 0 {
|
|
for i := range buf[:n] {
|
|
v := buf[i]
|
|
ids = append(ids, v)
|
|
}
|
|
}
|
|
|
|
if !ok {
|
|
break
|
|
}
|
|
}
|
|
|
|
res, err := rdr.Close()
|
|
if err == nil {
|
|
// pull out an error from status
|
|
err = apistatus.ErrFromStatus(res.Status())
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read object list: %w", err)
|
|
}
|
|
|
|
return &SearchObjectsRes{
|
|
ids: ids,
|
|
}, nil
|
|
}
|