[#127] Split FrostFS ReadObject to separate methods
Some checks failed
/ Vulncheck (pull_request) Successful in 1m20s
/ DCO (pull_request) Successful in 1m53s
/ Builds (1.21) (pull_request) Failing after 36s
/ Builds (1.22) (pull_request) Failing after 31s
/ Lint (pull_request) Successful in 2m7s
/ Tests (1.21) (pull_request) Failing after 26s
/ Tests (1.22) (pull_request) Failing after 39s

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-15 16:35:08 +03:00
parent 16545bd3b0
commit 56505fa7a9
6 changed files with 109 additions and 68 deletions

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"google.golang.org/grpc/codes"
@ -76,8 +77,25 @@ func (x payloadReader) Read(p []byte) (int, error) {
return n, handleObjectError("read payload", err)
}
// ReadObject implements frostfs.FrostFS interface method.
func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*handler.ObjectPart, error) {
// HeadObject implements frostfs.FrostFS interface method.
func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*object.Object, error) {
var prmHead pool.PrmObjectHead
prmHead.SetAddress(prm.Address)
if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
}
res, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
}
return &res, nil
}
// GetObject implements frostfs.FrostFS interface method.
func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*handler.Object, error) {
var prmGet pool.PrmObjectGet
prmGet.SetAddress(prm.Address)
@ -85,27 +103,19 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*h
prmGet.UseBearer(*prm.BearerToken)
}
// The code below must be reworked. It was copied from frostfs-s3-gw
// to create similar mocks for unit and fuzzing tests.
//
// However, this code was changed due to specific of expected responses
// from HTTP gateway. HTTP Gateway requires two types of responses:
// * payload as io.Reader + HEAD request
// * only payload as io.Reader
// Therefore all unused params were deleted and code was simplified.
if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full payload range reading via connection pool", err)
return nil, handleObjectError("init full object reading via connection pool", err)
}
return &handler.ObjectPart{
return &handler.Object{
Header: res.Header,
Payload: res.Payload,
Head: &res.Header,
}, nil
}
}
// RangeObject implements frostfs.FrostFS interface method.
func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (io.ReadCloser, error) {
var prmRange pool.PrmObjectRange
prmRange.SetAddress(prm.Address)
prmRange.SetOffset(prm.PayloadRange[0])
@ -120,9 +130,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*h
return nil, handleObjectError("init payload range reading via connection pool", err)
}
return &handler.ObjectPart{
Payload: payloadReader{&res},
}, nil
return payloadReader{&res}, nil
}
// SearchObjects implements frostfs.FrostFS interface method.

View file

@ -153,19 +153,19 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
}
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
prm := PrmObjectRead{
prm := PrmObjectGet{
PrmAuth: PrmAuth{
BearerToken: btoken,
},
Address: addr,
}
resGet, err := h.frostfs.ReadObject(ctx, prm)
resGet, err := h.frostfs.GetObject(ctx, prm)
if err != nil {
return fmt.Errorf("get FrostFS object: %v", err)
}
objWriter, err := h.addObjectToZip(zipWriter, resGet.Head)
objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
if err != nil {
return fmt.Errorf("zip create header: %v", err)
}

View file

@ -78,32 +78,49 @@ func (t *TestFrostFS) requestOwner(btoken *bearer.Token) user.ID {
return owner
}
func (t *TestFrostFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) {
sAddr := prm.Address.EncodeToString()
func (t *TestFrostFS) retrieveObject(addr oid.Address, btoken *bearer.Token) (*object.Object, error) {
sAddr := addr.EncodeToString()
if obj, ok := t.objects[sAddr]; ok {
owner := t.requestOwner(prm.BearerToken)
owner := t.requestOwner(btoken)
if !t.isAllowed(prm.Address.Container(), owner, acl.OpObjectGet, prm.Address.Object()) {
if !t.isAllowed(addr.Container(), owner, acl.OpObjectGet, addr.Object()) {
return nil, ErrAccessDenied
}
payload := obj.Payload()
if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
off := prm.PayloadRange[0]
payload = payload[off : off+prm.PayloadRange[1]]
obj = nil // GetRange does not return header values
return obj, nil
}
return &ObjectPart{
Head: obj,
Payload: io.NopCloser(bytes.NewReader(payload)),
}, nil
}
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, prm.Address)
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
}
func (t *TestFrostFS) HeadObject(_ context.Context, prm PrmObjectHead) (*object.Object, error) {
return t.retrieveObject(prm.Address, prm.BearerToken)
}
func (t *TestFrostFS) GetObject(_ context.Context, prm PrmObjectGet) (*Object, error) {
obj, err := t.retrieveObject(prm.Address, prm.BearerToken)
if err != nil {
return nil, err
}
return &Object{
Header: *obj,
Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
}, nil
}
func (t *TestFrostFS) RangeObject(_ context.Context, prm PrmObjectRange) (io.ReadCloser, error) {
obj, err := t.retrieveObject(prm.Address, prm.BearerToken)
if err != nil {
return nil, err
}
off := prm.PayloadRange[0]
payload := obj.Payload()[off : off+prm.PayloadRange[1]]
return io.NopCloser(bytes.NewReader(payload)), nil
}
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil {

View file

@ -46,8 +46,26 @@ type PrmAuth struct {
BearerToken *bearer.Token
}
// PrmObjectRead groups parameters of FrostFS.ReadObject operation.
type PrmObjectRead struct {
// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
type PrmObjectHead struct {
// Authentication parameters.
PrmAuth
// Address to read the object header from.
Address oid.Address
}
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
type PrmObjectGet struct {
// Authentication parameters.
PrmAuth
// Address to read the object header from.
Address oid.Address
}
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
type PrmObjectRange struct {
// Authentication parameters.
PrmAuth
@ -58,10 +76,10 @@ type PrmObjectRead struct {
PayloadRange [2]uint64
}
// ObjectPart represents partially read FrostFS object.
type ObjectPart struct {
// Object header with optional in-memory payload part.
Head *object.Object
// Object represents FrostFS object.
type Object struct {
// Object header (doesn't contain payload).
Header object.Object
// Object payload part encapsulated in io.Reader primitive.
// Returns ErrAccessDenied on read access violation.
@ -115,7 +133,9 @@ var (
// FrostFS represents virtual connection to FrostFS network.
type FrostFS interface {
Container(context.Context, PrmContainer) (*container.Container, error)
ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error)
HeadObject(context.Context, PrmObjectHead) (*object.Object, error)
GetObject(context.Context, PrmObjectGet) (*Object, error)
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
utils.EpochInfoFetcher

View file

@ -29,22 +29,22 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
btoken := bearerToken(ctx)
prm := PrmObjectRead{
prm := PrmObjectHead{
PrmAuth: PrmAuth{
BearerToken: btoken,
},
Address: objectAddress,
}
obj, err := h.frostfs.ReadObject(ctx, prm)
obj, err := h.frostfs.HeadObject(ctx, prm)
if err != nil {
req.handleFrostFSErr(err, start)
return
}
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.Head.PayloadSize(), 10))
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
var contentType string
for _, attr := range obj.Head.Attributes() {
for _, attr := range obj.Attributes() {
key := attr.Key()
val := attr.Value()
if !isValidToken(key) || !isValidValue(val) {
@ -70,11 +70,11 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
}
}
idsToResponse(&req.Response, obj.Head)
idsToResponse(&req.Response, obj)
if len(contentType) == 0 {
contentType, _, err = readContentType(obj.Head.PayloadSize(), func(sz uint64) (io.Reader, error) {
prmRange := PrmObjectRead{
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
prmRange := PrmObjectRange{
PrmAuth: PrmAuth{
BearerToken: btoken,
},
@ -82,11 +82,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
PayloadRange: [2]uint64{0, sz},
}
resObj, err := h.frostfs.ReadObject(ctx, prmRange)
if err != nil {
return nil, err
}
return resObj.Payload, nil
return h.frostfs.RangeObject(ctx, prmRange)
})
if err != nil && err != io.EOF {
req.handleFrostFSErr(err, start)

View file

@ -55,14 +55,14 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
filename string
)
prm := PrmObjectRead{
prm := PrmObjectGet{
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
Address: objectAddress,
}
rObj, err := h.frostfs.ReadObject(ctx, prm)
rObj, err := h.frostfs.GetObject(ctx, prm)
if err != nil {
req.handleFrostFSErr(err, start)
return
@ -74,11 +74,11 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
dis = "attachment"
}
payloadSize := rObj.Head.PayloadSize()
payloadSize := rObj.Header.PayloadSize()
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
var contentType string
for _, attr := range rObj.Head.Attributes() {
for _, attr := range rObj.Header.Attributes() {
key := attr.Key()
val := attr.Value()
if !isValidToken(key) || !isValidValue(val) {
@ -107,7 +107,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
}
}
idsToResponse(&req.Response, rObj.Head)
idsToResponse(&req.Response, &rObj.Header)
if len(contentType) == 0 {
// determine the Content-Type from the payload head