[#127] Split FrostFS ReadObject to separate methods
All checks were successful
/ DCO (pull_request) Successful in 46s
/ Vulncheck (pull_request) Successful in 1m15s
/ Builds (1.21) (pull_request) Successful in 1m36s
/ Builds (1.22) (pull_request) Successful in 1m34s
/ Lint (pull_request) Successful in 2m40s
/ Tests (1.21) (pull_request) Successful in 1m24s
/ Tests (1.22) (pull_request) Successful in 1m24s

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-15 16:35:08 +03:00
parent f20ea67b46
commit fcf99d9a59
6 changed files with 109 additions and 68 deletions

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -76,8 +77,25 @@ func (x payloadReader) Read(p []byte) (int, error) {
return n, handleObjectError("read payload", err) return n, handleObjectError("read payload", err)
} }
// ReadObject implements frostfs.FrostFS interface method. // HeadObject implements frostfs.FrostFS interface method.
func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*handler.ObjectPart, error) { func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*object.Object, error) {
var prmHead pool.PrmObjectHead
prmHead.SetAddress(prm.Address)
if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
}
res, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
}
return &res, nil
}
// GetObject implements frostfs.FrostFS interface method.
func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*handler.Object, error) {
var prmGet pool.PrmObjectGet var prmGet pool.PrmObjectGet
prmGet.SetAddress(prm.Address) prmGet.SetAddress(prm.Address)
@ -85,27 +103,19 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*h
prmGet.UseBearer(*prm.BearerToken) prmGet.UseBearer(*prm.BearerToken)
} }
// The code below must be reworked. It was copied from frostfs-s3-gw res, err := x.pool.GetObject(ctx, prmGet)
// to create similar mocks for unit and fuzzing tests. if err != nil {
// return nil, handleObjectError("init full object reading via connection pool", err)
// However, this code was changed due to specific of expected responses
// from HTTP gateway. HTTP Gateway requires two types of responses:
// * payload as io.Reader + HEAD request
// * only payload as io.Reader
// Therefore all unused params were deleted and code was simplified.
if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full payload range reading via connection pool", err)
}
return &handler.ObjectPart{
Payload: res.Payload,
Head: &res.Header,
}, nil
} }
return &handler.Object{
Header: res.Header,
Payload: res.Payload,
}, nil
}
// RangeObject implements frostfs.FrostFS interface method.
func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (io.ReadCloser, error) {
var prmRange pool.PrmObjectRange var prmRange pool.PrmObjectRange
prmRange.SetAddress(prm.Address) prmRange.SetAddress(prm.Address)
prmRange.SetOffset(prm.PayloadRange[0]) prmRange.SetOffset(prm.PayloadRange[0])
@ -120,9 +130,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*h
return nil, handleObjectError("init payload range reading via connection pool", err) return nil, handleObjectError("init payload range reading via connection pool", err)
} }
return &handler.ObjectPart{ return payloadReader{&res}, nil
Payload: payloadReader{&res},
}, nil
} }
// SearchObjects implements frostfs.FrostFS interface method. // SearchObjects implements frostfs.FrostFS interface method.

View file

@ -153,19 +153,19 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
} }
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error { func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
prm := PrmObjectRead{ prm := PrmObjectGet{
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
BearerToken: btoken, BearerToken: btoken,
}, },
Address: addr, Address: addr,
} }
resGet, err := h.frostfs.ReadObject(ctx, prm) resGet, err := h.frostfs.GetObject(ctx, prm)
if err != nil { if err != nil {
return fmt.Errorf("get FrostFS object: %v", err) return fmt.Errorf("get FrostFS object: %v", err)
} }
objWriter, err := h.addObjectToZip(zipWriter, resGet.Head) objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
if err != nil { if err != nil {
return fmt.Errorf("zip create header: %v", err) return fmt.Errorf("zip create header: %v", err)
} }

View file

@ -78,32 +78,49 @@ func (t *TestFrostFS) requestOwner(btoken *bearer.Token) user.ID {
return owner return owner
} }
func (t *TestFrostFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) { func (t *TestFrostFS) retrieveObject(addr oid.Address, btoken *bearer.Token) (*object.Object, error) {
sAddr := prm.Address.EncodeToString() sAddr := addr.EncodeToString()
if obj, ok := t.objects[sAddr]; ok { if obj, ok := t.objects[sAddr]; ok {
owner := t.requestOwner(prm.BearerToken) owner := t.requestOwner(btoken)
if !t.isAllowed(prm.Address.Container(), owner, acl.OpObjectGet, prm.Address.Object()) { if !t.isAllowed(addr.Container(), owner, acl.OpObjectGet, addr.Object()) {
return nil, ErrAccessDenied return nil, ErrAccessDenied
} }
payload := obj.Payload() return obj, nil
if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
off := prm.PayloadRange[0]
payload = payload[off : off+prm.PayloadRange[1]]
obj = nil // GetRange does not return header values
}
return &ObjectPart{
Head: obj,
Payload: io.NopCloser(bytes.NewReader(payload)),
}, nil
} }
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, prm.Address) return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
} }
func (t *TestFrostFS) HeadObject(_ context.Context, prm PrmObjectHead) (*object.Object, error) {
return t.retrieveObject(prm.Address, prm.BearerToken)
}
func (t *TestFrostFS) GetObject(_ context.Context, prm PrmObjectGet) (*Object, error) {
obj, err := t.retrieveObject(prm.Address, prm.BearerToken)
if err != nil {
return nil, err
}
return &Object{
Header: *obj,
Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
}, nil
}
func (t *TestFrostFS) RangeObject(_ context.Context, prm PrmObjectRange) (io.ReadCloser, error) {
obj, err := t.retrieveObject(prm.Address, prm.BearerToken)
if err != nil {
return nil, err
}
off := prm.PayloadRange[0]
payload := obj.Payload()[off : off+prm.PayloadRange[1]]
return io.NopCloser(bytes.NewReader(payload)), nil
}
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) { func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
b := make([]byte, 32) b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil { if _, err := io.ReadFull(rand.Reader, b); err != nil {

View file

@ -46,8 +46,26 @@ type PrmAuth struct {
BearerToken *bearer.Token BearerToken *bearer.Token
} }
// PrmObjectRead groups parameters of FrostFS.ReadObject operation. // PrmObjectHead groups parameters of FrostFS.HeadObject operation.
type PrmObjectRead struct { type PrmObjectHead struct {
// Authentication parameters.
PrmAuth
// Address to read the object header from.
Address oid.Address
}
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
type PrmObjectGet struct {
// Authentication parameters.
PrmAuth
// Address to read the object header from.
Address oid.Address
}
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
type PrmObjectRange struct {
// Authentication parameters. // Authentication parameters.
PrmAuth PrmAuth
@ -58,10 +76,10 @@ type PrmObjectRead struct {
PayloadRange [2]uint64 PayloadRange [2]uint64
} }
// ObjectPart represents partially read FrostFS object. // Object represents FrostFS object.
type ObjectPart struct { type Object struct {
// Object header with optional in-memory payload part. // Object header (doesn't contain payload).
Head *object.Object Header object.Object
// Object payload part encapsulated in io.Reader primitive. // Object payload part encapsulated in io.Reader primitive.
// Returns ErrAccessDenied on read access violation. // Returns ErrAccessDenied on read access violation.
@ -115,7 +133,9 @@ var (
// FrostFS represents virtual connection to FrostFS network. // FrostFS represents virtual connection to FrostFS network.
type FrostFS interface { type FrostFS interface {
Container(context.Context, PrmContainer) (*container.Container, error) Container(context.Context, PrmContainer) (*container.Container, error)
ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error) HeadObject(context.Context, PrmObjectHead) (*object.Object, error)
GetObject(context.Context, PrmObjectGet) (*Object, error)
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error) SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
utils.EpochInfoFetcher utils.EpochInfoFetcher

View file

@ -29,22 +29,22 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
btoken := bearerToken(ctx) btoken := bearerToken(ctx)
prm := PrmObjectRead{ prm := PrmObjectHead{
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
BearerToken: btoken, BearerToken: btoken,
}, },
Address: objectAddress, Address: objectAddress,
} }
obj, err := h.frostfs.ReadObject(ctx, prm) obj, err := h.frostfs.HeadObject(ctx, prm)
if err != nil { if err != nil {
req.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)
return return
} }
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.Head.PayloadSize(), 10)) req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
var contentType string var contentType string
for _, attr := range obj.Head.Attributes() { for _, attr := range obj.Attributes() {
key := attr.Key() key := attr.Key()
val := attr.Value() val := attr.Value()
if !isValidToken(key) || !isValidValue(val) { if !isValidToken(key) || !isValidValue(val) {
@ -70,11 +70,11 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
} }
} }
idsToResponse(&req.Response, obj.Head) idsToResponse(&req.Response, obj)
if len(contentType) == 0 { if len(contentType) == 0 {
contentType, _, err = readContentType(obj.Head.PayloadSize(), func(sz uint64) (io.Reader, error) { contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
prmRange := PrmObjectRead{ prmRange := PrmObjectRange{
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
BearerToken: btoken, BearerToken: btoken,
}, },
@ -82,11 +82,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
PayloadRange: [2]uint64{0, sz}, PayloadRange: [2]uint64{0, sz},
} }
resObj, err := h.frostfs.ReadObject(ctx, prmRange) return h.frostfs.RangeObject(ctx, prmRange)
if err != nil {
return nil, err
}
return resObj.Payload, nil
}) })
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
req.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)

View file

@ -55,14 +55,14 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
filename string filename string
) )
prm := PrmObjectRead{ prm := PrmObjectGet{
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx), BearerToken: bearerToken(ctx),
}, },
Address: objectAddress, Address: objectAddress,
} }
rObj, err := h.frostfs.ReadObject(ctx, prm) rObj, err := h.frostfs.GetObject(ctx, prm)
if err != nil { if err != nil {
req.handleFrostFSErr(err, start) req.handleFrostFSErr(err, start)
return return
@ -74,11 +74,11 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
dis = "attachment" dis = "attachment"
} }
payloadSize := rObj.Head.PayloadSize() payloadSize := rObj.Header.PayloadSize()
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
var contentType string var contentType string
for _, attr := range rObj.Head.Attributes() { for _, attr := range rObj.Header.Attributes() {
key := attr.Key() key := attr.Key()
val := attr.Value() val := attr.Value()
if !isValidToken(key) || !isValidValue(val) { if !isValidToken(key) || !isValidValue(val) {
@ -107,7 +107,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
} }
} }
idsToResponse(&req.Response, rObj.Head) idsToResponse(&req.Response, &rObj.Header)
if len(contentType) == 0 { if len(contentType) == 0 {
// determine the Content-Type from the payload head // determine the Content-Type from the payload head