[#427] layer: Split FrostFS ReadObject to separate methods #427

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:feature/frostfs_get_simplify into master 2024-07-24 12:11:19 +00:00
13 changed files with 198 additions and 139 deletions

View file

@ -37,7 +37,7 @@ func TestSimpleGetEncrypted(t *testing.T) {
objInfo, err := tc.Layer().GetObjectInfo(tc.Context(), &layer.HeadObjectParams{BktInfo: bktInfo, Object: objName}) objInfo, err := tc.Layer().GetObjectInfo(tc.Context(), &layer.HeadObjectParams{BktInfo: bktInfo, Object: objName})
require.NoError(t, err) require.NoError(t, err)
obj, err := tc.MockedPool().ReadObject(tc.Context(), layer.PrmObjectRead{Container: bktInfo.CID, Object: objInfo.ID}) obj, err := tc.MockedPool().GetObject(tc.Context(), layer.PrmObjectGet{Container: bktInfo.CID, Object: objInfo.ID})
require.NoError(t, err) require.NoError(t, err)
encryptedContent, err := io.ReadAll(obj.Payload) encryptedContent, err := io.ReadAll(obj.Payload)
require.NoError(t, err) require.NoError(t, err)

View file

@ -78,8 +78,32 @@ type PrmAuth struct {
PrivateKey *ecdsa.PrivateKey PrivateKey *ecdsa.PrivateKey
} }
// PrmObjectRead groups parameters of FrostFS.ReadObject operation. // PrmObjectHead groups parameters of FrostFS.HeadObject operation.
type PrmObjectRead struct { type PrmObjectHead struct {
// Authentication parameters.
PrmAuth
// Container to read the object header from.
Container cid.ID
// ID of the object for which to read the header.
Object oid.ID
}
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
type PrmObjectGet struct {
// Authentication parameters.
PrmAuth
// Container to read the object header from.
Container cid.ID
// ID of the object for which to read the header.
Object oid.ID
}
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
type PrmObjectRange struct {
// Authentication parameters. // Authentication parameters.
PrmAuth PrmAuth
@ -89,20 +113,14 @@ type PrmObjectRead struct {
// ID of the object for which to read the header. // ID of the object for which to read the header.
Object oid.ID Object oid.ID
// Flag to read object header.
WithHeader bool
// Flag to read object payload. False overlaps payload range.
WithPayload bool
// Offset-length range of the object payload to be read. // Offset-length range of the object payload to be read.
PayloadRange [2]uint64 PayloadRange [2]uint64
} }
// ObjectPart represents partially read FrostFS object. // Object represents full read FrostFS object.
type ObjectPart struct { type Object struct {
// Object header with optional in-memory payload part. // Object header (doesn't contain payload).
Head *object.Object Header object.Object
// Object payload part encapsulated in io.Reader primitive. // Object payload part encapsulated in io.Reader primitive.
// Returns ErrAccessDenied on read access violation. // Returns ErrAccessDenied on read access violation.
@ -213,13 +231,15 @@ type FrostFS interface {
// It returns any error encountered which prevented the removal request from being sent. // It returns any error encountered which prevented the removal request from being sent.
DeleteContainer(context.Context, cid.ID, *session.Container) error DeleteContainer(context.Context, cid.ID, *session.Container) error
// ReadObject reads a part of the object from the FrostFS container by identifier. // HeadObject reads an info of the object from the FrostFS container by identifier.
// Exact part is returned according to the parameters:
// * with header only: empty payload (both in-mem and reader parts are nil);
// * with payload only: header is nil (zero range means full payload);
// * with header and payload: full in-mem object, payload reader is nil.
// //
// WithHeader or WithPayload is true. Range length is positive if offset is positive. // It returns ErrAccessDenied on read access violation.
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object header from being read.
HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error)
// GetObject reads an object from the FrostFS container by identifier.
// //
// Payload reader should be closed if it is no longer needed. // Payload reader should be closed if it is no longer needed.
// //
@ -227,7 +247,17 @@ type FrostFS interface {
// //
// It returns exactly one non-nil value. It returns any error encountered which // It returns exactly one non-nil value. It returns any error encountered which
// prevented the object header from being read. // prevented the object header from being read.
ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error) GetObject(ctx context.Context, prm PrmObjectGet) (*Object, error)
// RangeObject reads a part of object from the FrostFS container by identifier.
//
// Payload reader should be closed if it is no longer needed.
//
// It returns ErrAccessDenied on read access violation.
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object header from being read.
RangeObject(ctx context.Context, prm PrmObjectRange) (io.ReadCloser, error)
// CreateObject creates and saves a parameterized object in the FrostFS container. // CreateObject creates and saves a parameterized object in the FrostFS container.
// It sets 'Timestamp' attribute to the current time. // It sets 'Timestamp' attribute to the current time.

View file

@ -204,10 +204,10 @@ func (t *TestFrostFS) UserContainers(context.Context, PrmUserContainers) ([]cid.
return res, nil return res, nil
} }
func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*ObjectPart, error) { func (t *TestFrostFS) retrieveObject(ctx context.Context, cnrID cid.ID, objID oid.ID) (*object.Object, error) {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(cnrID)
addr.SetObject(prm.Object) addr.SetObject(objID)
sAddr := addr.EncodeToString() sAddr := addr.EncodeToString()
@ -217,26 +217,44 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
if obj, ok := t.objects[sAddr]; ok { if obj, ok := t.objects[sAddr]; ok {
owner := getBearerOwner(ctx) owner := getBearerOwner(ctx)
if !t.checkAccess(prm.Container, owner) { if !t.checkAccess(cnrID, owner) {
return nil, ErrAccessDenied return nil, ErrAccessDenied
} }
payload := obj.Payload() return obj, nil
if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
off := prm.PayloadRange[0]
payload = payload[off : off+prm.PayloadRange[1]]
}
return &ObjectPart{
Head: obj,
Payload: io.NopCloser(bytes.NewReader(payload)),
}, nil
} }
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr) return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
} }
func (t *TestFrostFS) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) {
return t.retrieveObject(ctx, prm.Container, prm.Object)
}
func (t *TestFrostFS) GetObject(ctx context.Context, prm PrmObjectGet) (*Object, error) {
obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
if err != nil {
return nil, err
}
return &Object{
Header: *obj,
Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
}, nil
}
func (t *TestFrostFS) RangeObject(ctx context.Context, prm PrmObjectRange) (io.ReadCloser, error) {
obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
if err != nil {
return nil, err
}
off := prm.PayloadRange[0]
payload := obj.Payload()[off : off+prm.PayloadRange[1]]
return io.NopCloser(bytes.NewReader(payload)), nil
}
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) { func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
b := make([]byte, 32) b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil { if _, err := io.ReadFull(rand.Reader, b); err != nil {

View file

@ -742,7 +742,7 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
} }
var parts []*data.PartInfo var parts []*data.PartInfo
if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil { if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return fmt.Errorf("unmarshal combined object parts: %w", err) return fmt.Errorf("unmarshal combined object parts: %w", err)
} }

View file

@ -68,20 +68,14 @@ func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
// objectHead returns all object's headers. // objectHead returns all object's headers.
func (n *Layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) { func (n *Layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) {
prm := PrmObjectRead{ prm := PrmObjectHead{
Container: bktInfo.CID, Container: bktInfo.CID,
Object: idObj, Object: idObj,
WithHeader: true,
} }
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
res, err := n.frostFS.ReadObject(ctx, prm) return n.frostFS.HeadObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Head, nil
} }
func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) { func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) {
@ -100,7 +94,7 @@ func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
} }
var parts []*data.PartInfo var parts []*data.PartInfo
if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil { if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return nil, fmt.Errorf("unmarshal combined object parts: %w", err) return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
} }
@ -132,16 +126,27 @@ func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
// initializes payload reader of the FrostFS object. // initializes payload reader of the FrostFS object.
// Zero range corresponds to full payload (panics if only offset is set). // Zero range corresponds to full payload (panics if only offset is set).
func (n *Layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error) { func (n *Layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error) {
prm := PrmObjectRead{ var prmAuth PrmAuth
n.prepareAuthParameters(ctx, &prmAuth, p.bktInfo.Owner)
if p.off+p.ln != 0 {
prm := PrmObjectRange{
PrmAuth: prmAuth,
Container: p.bktInfo.CID, Container: p.bktInfo.CID,
Object: p.oid, Object: p.oid,
WithPayload: true,
PayloadRange: [2]uint64{p.off, p.ln}, PayloadRange: [2]uint64{p.off, p.ln},
} }
n.prepareAuthParameters(ctx, &prm.PrmAuth, p.bktInfo.Owner) return n.frostFS.RangeObject(ctx, prm)
}
res, err := n.frostFS.ReadObject(ctx, prm) prm := PrmObjectGet{
PrmAuth: prmAuth,
Container: p.bktInfo.CID,
Object: p.oid,
}
res, err := n.frostFS.GetObject(ctx, prm)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -150,32 +155,25 @@ func (n *Layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFS
} }
// objectGet returns an object with payload in the object. // objectGet returns an object with payload in the object.
func (n *Layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*object.Object, error) { func (n *Layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*Object, error) {
return n.objectGetBase(ctx, bktInfo, objID, PrmAuth{}) return n.objectGetBase(ctx, bktInfo, objID, PrmAuth{})
} }
// objectGetWithAuth returns an object with payload in the object. Uses provided PrmAuth. // objectGetWithAuth returns an object with payload in the object. Uses provided PrmAuth.
func (n *Layer) objectGetWithAuth(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*object.Object, error) { func (n *Layer) objectGetWithAuth(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*Object, error) {
return n.objectGetBase(ctx, bktInfo, objID, auth) return n.objectGetBase(ctx, bktInfo, objID, auth)
} }
func (n *Layer) objectGetBase(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*object.Object, error) { func (n *Layer) objectGetBase(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*Object, error) {
prm := PrmObjectRead{ prm := PrmObjectGet{
PrmAuth: auth, PrmAuth: auth,
Container: bktInfo.CID, Container: bktInfo.CID,
Object: objID, Object: objID,
WithHeader: true,
WithPayload: true,
} }
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
res, err := n.frostFS.ReadObject(ctx, prm) return n.frostFS.GetObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Head, nil
} }
// MimeByFilePath detect mime type by file path extension. // MimeByFilePath detect mime type by file path extension.

View file

@ -183,8 +183,7 @@ func (n *Layer) getCORS(ctx context.Context, bkt *data.BucketInfo) (*data.CORSCo
} }
cors := &data.CORSConfiguration{} cors := &data.CORSConfiguration{}
if err = xml.NewDecoder(obj.Payload).Decode(&cors); err != nil {
if err = xml.Unmarshal(obj.Payload(), &cors); err != nil {
return nil, fmt.Errorf("unmarshal cors: %w", err) return nil, fmt.Errorf("unmarshal cors: %w", err)
} }

View file

@ -50,7 +50,7 @@ func createFrostFS(ctx context.Context, log *zap.Logger, cfg PoolConfig) (*frost
return nil, fmt.Errorf("dial pool: %w", err) return nil, fmt.Errorf("dial pool: %w", err)
} }
return frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(p, cfg.Key)), nil return frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(p, cfg.Key), log), nil
} }
func parsePolicies(val string) (authmate.ContainerPolicies, error) { func parsePolicies(val string) (authmate.ContainerPolicies, error) {

View file

@ -124,7 +124,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
objPool, treePool, key := getPools(ctx, log.logger, v) objPool, treePool, key := getPools(ctx, log.logger, v)
cfg := tokens.Config{ cfg := tokens.Config{
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(objPool, key)), FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(objPool, key), log.logger),
Key: key, Key: key,
CacheConfig: getAccessBoxCacheConfig(v, log.logger), CacheConfig: getAccessBoxCacheConfig(v, log.logger),
RemovingCheckAfterDurations: fetchRemovingCheckInterval(v, log.logger), RemovingCheckAfterDurations: fetchRemovingCheckInterval(v, log.logger),

View file

@ -92,6 +92,7 @@ type FrostFS interface {
// //
// It returns exactly one non-nil value. It returns any error encountered which // It returns exactly one non-nil value. It returns any error encountered which
// prevented the object payload from being read. // prevented the object payload from being read.
// Object must contain full payload.
GetCredsObject(context.Context, oid.Address) (*object.Object, error) GetCredsObject(context.Context, oid.Address) (*object.Object, error)
} }

View file

@ -4,18 +4,22 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"strconv" "strconv"
"time" "time"
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" objectv2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/authmate" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/authmate"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/crdt" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/crdt"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
) )
const ( const (
@ -25,11 +29,12 @@ const (
// AuthmateFrostFS is a mediator which implements authmate.FrostFS through pool.Pool. // AuthmateFrostFS is a mediator which implements authmate.FrostFS through pool.Pool.
type AuthmateFrostFS struct { type AuthmateFrostFS struct {
frostFS layer.FrostFS frostFS layer.FrostFS
log *zap.Logger
} }
// NewAuthmateFrostFS creates new AuthmateFrostFS using provided pool.Pool. // NewAuthmateFrostFS creates new AuthmateFrostFS using provided pool.Pool.
func NewAuthmateFrostFS(frostFS layer.FrostFS) *AuthmateFrostFS { func NewAuthmateFrostFS(frostFS layer.FrostFS, log *zap.Logger) *AuthmateFrostFS {
return &AuthmateFrostFS{frostFS: frostFS} return &AuthmateFrostFS{frostFS: frostFS, log: log}
} }
// ContainerExists implements authmate.FrostFS interface method. // ContainerExists implements authmate.FrostFS interface method.
@ -79,17 +84,27 @@ func (x *AuthmateFrostFS) GetCredsObject(ctx context.Context, addr oid.Address)
credObjID = last.ObjID credObjID = last.ObjID
} }
res, err := x.frostFS.ReadObject(ctx, layer.PrmObjectRead{ res, err := x.frostFS.GetObject(ctx, layer.PrmObjectGet{
Container: addr.Container(), Container: addr.Container(),
Object: credObjID, Object: credObjID,
WithPayload: true,
WithHeader: true,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
return res.Head, err defer func() {
if closeErr := res.Payload.Close(); closeErr != nil {
x.reqLogger(ctx).Warn(logs.CloseCredsObjectPayload, zap.Error(closeErr))
}
}()
data, err := io.ReadAll(res.Payload)
if err != nil {
return nil, err
}
res.Header.SetPayload(data)
return &res.Header, err
} }
// CreateObject implements authmate.FrostFS interface method. // CreateObject implements authmate.FrostFS interface method.
@ -143,21 +158,28 @@ func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address)
versions := crdt.NewObjectVersions(objCredSystemName) versions := crdt.NewObjectVersions(objCredSystemName)
for _, id := range credVersions { for _, id := range credVersions {
objVersion, err := x.frostFS.ReadObject(ctx, layer.PrmObjectRead{ objVersion, err := x.frostFS.HeadObject(ctx, layer.PrmObjectHead{
Container: addr.Container(), Container: addr.Container(),
Object: id, Object: id,
WithHeader: true,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("head crdt access box '%s': %w", id.EncodeToString(), err) return nil, fmt.Errorf("head crdt access box '%s': %w", id.EncodeToString(), err)
} }
versions.AppendVersion(crdt.NewObjectVersion(objVersion.Head)) versions.AppendVersion(crdt.NewObjectVersion(objVersion))
} }
return versions, nil return versions, nil
} }
func (x *AuthmateFrostFS) reqLogger(ctx context.Context) *zap.Logger {
reqLogger := middleware.GetReqLog(ctx)
if reqLogger != nil {
return reqLogger
}
return x.log
}
func credVersionSysName(cnrID cid.ID, objID oid.ID) string { func credVersionSysName(cnrID cid.ID, objID oid.ID) string {
return cnrID.EncodeToString() + "0" + objID.EncodeToString() return cnrID.EncodeToString() + "0" + objID.EncodeToString()
} }

View file

@ -14,6 +14,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
) )
func TestGetCredsObject(t *testing.T) { func TestGetCredsObject(t *testing.T) {
@ -35,7 +36,7 @@ func TestGetCredsObject(t *testing.T) {
}, },
}}) }})
frostfs := NewAuthmateFrostFS(layer.NewTestFrostFS(key)) frostfs := NewAuthmateFrostFS(layer.NewTestFrostFS(key), zaptest.NewLogger(t))
cid, err := frostfs.CreateContainer(ctx, authmate.PrmContainerCreate{ cid, err := frostfs.CreateContainer(ctx, authmate.PrmContainerCreate{
FriendlyName: bktName, FriendlyName: bktName,

View file

@ -255,8 +255,31 @@ func (x payloadReader) Read(p []byte) (int, error) {
return n, handleObjectError("read payload", err) return n, handleObjectError("read payload", err)
} }
// ReadObject implements frostfs.FrostFS interface method. // HeadObject implements frostfs.FrostFS interface method.
func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) { func (x *FrostFS) HeadObject(ctx context.Context, prm layer.PrmObjectHead) (*object.Object, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmHead pool.PrmObjectHead
prmHead.SetAddress(addr)
if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
} else {
prmHead.UseKey(prm.PrivateKey)
}
res, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
}
return &res, nil
}
// GetObject implements frostfs.FrostFS interface method.
func (x *FrostFS) GetObject(ctx context.Context, prm layer.PrmObjectGet) (*layer.Object, error) {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)
addr.SetObject(prm.Object) addr.SetObject(prm.Object)
@ -270,54 +293,22 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
prmGet.UseKey(prm.PrivateKey) prmGet.UseKey(prm.PrivateKey)
} }
if prm.WithHeader {
if prm.WithPayload {
res, err := x.pool.GetObject(ctx, prmGet) res, err := x.pool.GetObject(ctx, prmGet)
if err != nil { if err != nil {
return nil, handleObjectError("init full object reading via connection pool", err) return nil, handleObjectError("init full object reading via connection pool", err)
} }
defer res.Payload.Close() return &layer.Object{
Header: res.Header,
payload, err := io.ReadAll(res.Payload)
if err != nil {
return nil, handleObjectError("read full object payload", err)
}
res.Header.SetPayload(payload)
return &layer.ObjectPart{
Head: &res.Header,
}, nil
}
var prmHead pool.PrmObjectHead
prmHead.SetAddress(addr)
if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
} else {
prmHead.UseKey(prm.PrivateKey)
}
hdr, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
}
return &layer.ObjectPart{
Head: &hdr,
}, nil
} else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full payload range reading via connection pool", err)
}
return &layer.ObjectPart{
Payload: res.Payload, Payload: res.Payload,
}, nil }, nil
} }
// RangeObject implements frostfs.FrostFS interface method.
func (x *FrostFS) RangeObject(ctx context.Context, prm layer.PrmObjectRange) (io.ReadCloser, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmRange pool.PrmObjectRange var prmRange pool.PrmObjectRange
prmRange.SetAddress(addr) prmRange.SetAddress(addr)
@ -335,9 +326,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
return nil, handleObjectError("init payload range reading via connection pool", err) return nil, handleObjectError("init payload range reading via connection pool", err)
} }
return &layer.ObjectPart{ return payloadReader{&res}, nil
Payload: payloadReader{&res},
}, nil
} }
// DeleteObject implements frostfs.FrostFS interface method. // DeleteObject implements frostfs.FrostFS interface method.

View file

@ -150,4 +150,5 @@ const (
FoundSeveralSystemNodes = "found several system nodes, latest be used" FoundSeveralSystemNodes = "found several system nodes, latest be used"
FailedToParsePartInfo = "failed to parse part info" FailedToParsePartInfo = "failed to parse part info"
CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info" CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info"
CloseCredsObjectPayload = "close creds object payload"
) )