[#127] Split FrostFS ReadObject to separate methods
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
8ae81ba0f1
commit
f8ae6761ce
6 changed files with 109 additions and 68 deletions
|
@ -11,6 +11,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
|
@ -76,8 +77,25 @@ func (x payloadReader) Read(p []byte) (int, error) {
|
||||||
return n, handleObjectError("read payload", err)
|
return n, handleObjectError("read payload", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadObject implements frostfs.FrostFS interface method.
|
// HeadObject implements frostfs.FrostFS interface method.
|
||||||
func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*handler.ObjectPart, error) {
|
func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*object.Object, error) {
|
||||||
|
var prmHead pool.PrmObjectHead
|
||||||
|
prmHead.SetAddress(prm.Address)
|
||||||
|
|
||||||
|
if prm.BearerToken != nil {
|
||||||
|
prmHead.UseBearer(*prm.BearerToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := x.pool.HeadObject(ctx, prmHead)
|
||||||
|
if err != nil {
|
||||||
|
return nil, handleObjectError("read object header via connection pool", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObject implements frostfs.FrostFS interface method.
|
||||||
|
func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*handler.Object, error) {
|
||||||
var prmGet pool.PrmObjectGet
|
var prmGet pool.PrmObjectGet
|
||||||
prmGet.SetAddress(prm.Address)
|
prmGet.SetAddress(prm.Address)
|
||||||
|
|
||||||
|
@ -85,27 +103,19 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*h
|
||||||
prmGet.UseBearer(*prm.BearerToken)
|
prmGet.UseBearer(*prm.BearerToken)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The code below must be reworked. It was copied from frostfs-s3-gw
|
|
||||||
// to create similar mocks for unit and fuzzing tests.
|
|
||||||
//
|
|
||||||
// However, this code was changed due to specific of expected responses
|
|
||||||
// from HTTP gateway. HTTP Gateway requires two types of responses:
|
|
||||||
// * payload as io.Reader + HEAD request
|
|
||||||
// * only payload as io.Reader
|
|
||||||
// Therefore all unused params were deleted and code was simplified.
|
|
||||||
|
|
||||||
if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
|
|
||||||
res, err := x.pool.GetObject(ctx, prmGet)
|
res, err := x.pool.GetObject(ctx, prmGet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, handleObjectError("init full payload range reading via connection pool", err)
|
return nil, handleObjectError("init full object reading via connection pool", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &handler.ObjectPart{
|
return &handler.Object{
|
||||||
|
Header: res.Header,
|
||||||
Payload: res.Payload,
|
Payload: res.Payload,
|
||||||
Head: &res.Header,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RangeObject implements frostfs.FrostFS interface method.
|
||||||
|
func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (io.ReadCloser, error) {
|
||||||
var prmRange pool.PrmObjectRange
|
var prmRange pool.PrmObjectRange
|
||||||
prmRange.SetAddress(prm.Address)
|
prmRange.SetAddress(prm.Address)
|
||||||
prmRange.SetOffset(prm.PayloadRange[0])
|
prmRange.SetOffset(prm.PayloadRange[0])
|
||||||
|
@ -120,9 +130,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*h
|
||||||
return nil, handleObjectError("init payload range reading via connection pool", err)
|
return nil, handleObjectError("init payload range reading via connection pool", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &handler.ObjectPart{
|
return payloadReader{&res}, nil
|
||||||
Payload: payloadReader{&res},
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SearchObjects implements frostfs.FrostFS interface method.
|
// SearchObjects implements frostfs.FrostFS interface method.
|
||||||
|
|
|
@ -153,19 +153,19 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
||||||
prm := PrmObjectRead{
|
prm := PrmObjectGet{
|
||||||
PrmAuth: PrmAuth{
|
PrmAuth: PrmAuth{
|
||||||
BearerToken: btoken,
|
BearerToken: btoken,
|
||||||
},
|
},
|
||||||
Address: addr,
|
Address: addr,
|
||||||
}
|
}
|
||||||
|
|
||||||
resGet, err := h.frostfs.ReadObject(ctx, prm)
|
resGet, err := h.frostfs.GetObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get FrostFS object: %v", err)
|
return fmt.Errorf("get FrostFS object: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
objWriter, err := h.addObjectToZip(zipWriter, resGet.Head)
|
objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("zip create header: %v", err)
|
return fmt.Errorf("zip create header: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,32 +78,49 @@ func (t *TestFrostFS) requestOwner(btoken *bearer.Token) user.ID {
|
||||||
return owner
|
return owner
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) {
|
func (t *TestFrostFS) retrieveObject(addr oid.Address, btoken *bearer.Token) (*object.Object, error) {
|
||||||
sAddr := prm.Address.EncodeToString()
|
sAddr := addr.EncodeToString()
|
||||||
|
|
||||||
if obj, ok := t.objects[sAddr]; ok {
|
if obj, ok := t.objects[sAddr]; ok {
|
||||||
owner := t.requestOwner(prm.BearerToken)
|
owner := t.requestOwner(btoken)
|
||||||
|
|
||||||
if !t.isAllowed(prm.Address.Container(), owner, acl.OpObjectGet, prm.Address.Object()) {
|
if !t.isAllowed(addr.Container(), owner, acl.OpObjectGet, addr.Object()) {
|
||||||
return nil, ErrAccessDenied
|
return nil, ErrAccessDenied
|
||||||
}
|
}
|
||||||
|
|
||||||
payload := obj.Payload()
|
return obj, nil
|
||||||
|
|
||||||
if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
|
|
||||||
off := prm.PayloadRange[0]
|
|
||||||
payload = payload[off : off+prm.PayloadRange[1]]
|
|
||||||
obj = nil // GetRange does not return header values
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return &ObjectPart{
|
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
|
||||||
Head: obj,
|
|
||||||
Payload: io.NopCloser(bytes.NewReader(payload)),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, prm.Address)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) HeadObject(_ context.Context, prm PrmObjectHead) (*object.Object, error) {
|
||||||
|
return t.retrieveObject(prm.Address, prm.BearerToken)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) GetObject(_ context.Context, prm PrmObjectGet) (*Object, error) {
|
||||||
|
obj, err := t.retrieveObject(prm.Address, prm.BearerToken)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Object{
|
||||||
|
Header: *obj,
|
||||||
|
Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) RangeObject(_ context.Context, prm PrmObjectRange) (io.ReadCloser, error) {
|
||||||
|
obj, err := t.retrieveObject(prm.Address, prm.BearerToken)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
off := prm.PayloadRange[0]
|
||||||
|
payload := obj.Payload()[off : off+prm.PayloadRange[1]]
|
||||||
|
return io.NopCloser(bytes.NewReader(payload)), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
||||||
b := make([]byte, 32)
|
b := make([]byte, 32)
|
||||||
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
||||||
|
|
|
@ -46,8 +46,26 @@ type PrmAuth struct {
|
||||||
BearerToken *bearer.Token
|
BearerToken *bearer.Token
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrmObjectRead groups parameters of FrostFS.ReadObject operation.
|
// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
|
||||||
type PrmObjectRead struct {
|
type PrmObjectHead struct {
|
||||||
|
// Authentication parameters.
|
||||||
|
PrmAuth
|
||||||
|
|
||||||
|
// Address to read the object header from.
|
||||||
|
Address oid.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
|
||||||
|
type PrmObjectGet struct {
|
||||||
|
// Authentication parameters.
|
||||||
|
PrmAuth
|
||||||
|
|
||||||
|
// Address to read the object header from.
|
||||||
|
Address oid.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
|
||||||
|
type PrmObjectRange struct {
|
||||||
// Authentication parameters.
|
// Authentication parameters.
|
||||||
PrmAuth
|
PrmAuth
|
||||||
|
|
||||||
|
@ -58,10 +76,10 @@ type PrmObjectRead struct {
|
||||||
PayloadRange [2]uint64
|
PayloadRange [2]uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectPart represents partially read FrostFS object.
|
// Object represents FrostFS object.
|
||||||
type ObjectPart struct {
|
type Object struct {
|
||||||
// Object header with optional in-memory payload part.
|
// Object header (doesn't contain payload).
|
||||||
Head *object.Object
|
Header object.Object
|
||||||
|
|
||||||
// Object payload part encapsulated in io.Reader primitive.
|
// Object payload part encapsulated in io.Reader primitive.
|
||||||
// Returns ErrAccessDenied on read access violation.
|
// Returns ErrAccessDenied on read access violation.
|
||||||
|
@ -115,7 +133,9 @@ var (
|
||||||
// FrostFS represents virtual connection to FrostFS network.
|
// FrostFS represents virtual connection to FrostFS network.
|
||||||
type FrostFS interface {
|
type FrostFS interface {
|
||||||
Container(context.Context, PrmContainer) (*container.Container, error)
|
Container(context.Context, PrmContainer) (*container.Container, error)
|
||||||
ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error)
|
HeadObject(context.Context, PrmObjectHead) (*object.Object, error)
|
||||||
|
GetObject(context.Context, PrmObjectGet) (*Object, error)
|
||||||
|
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
||||||
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
||||||
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
||||||
utils.EpochInfoFetcher
|
utils.EpochInfoFetcher
|
||||||
|
|
|
@ -29,22 +29,22 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
||||||
|
|
||||||
btoken := bearerToken(ctx)
|
btoken := bearerToken(ctx)
|
||||||
|
|
||||||
prm := PrmObjectRead{
|
prm := PrmObjectHead{
|
||||||
PrmAuth: PrmAuth{
|
PrmAuth: PrmAuth{
|
||||||
BearerToken: btoken,
|
BearerToken: btoken,
|
||||||
},
|
},
|
||||||
Address: objectAddress,
|
Address: objectAddress,
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := h.frostfs.ReadObject(ctx, prm)
|
obj, err := h.frostfs.HeadObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
req.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.Head.PayloadSize(), 10))
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
|
||||||
var contentType string
|
var contentType string
|
||||||
for _, attr := range obj.Head.Attributes() {
|
for _, attr := range obj.Attributes() {
|
||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
val := attr.Value()
|
val := attr.Value()
|
||||||
if !isValidToken(key) || !isValidValue(val) {
|
if !isValidToken(key) || !isValidValue(val) {
|
||||||
|
@ -70,11 +70,11 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
idsToResponse(&req.Response, obj.Head)
|
idsToResponse(&req.Response, obj)
|
||||||
|
|
||||||
if len(contentType) == 0 {
|
if len(contentType) == 0 {
|
||||||
contentType, _, err = readContentType(obj.Head.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
||||||
prmRange := PrmObjectRead{
|
prmRange := PrmObjectRange{
|
||||||
PrmAuth: PrmAuth{
|
PrmAuth: PrmAuth{
|
||||||
BearerToken: btoken,
|
BearerToken: btoken,
|
||||||
},
|
},
|
||||||
|
@ -82,11 +82,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
||||||
PayloadRange: [2]uint64{0, sz},
|
PayloadRange: [2]uint64{0, sz},
|
||||||
}
|
}
|
||||||
|
|
||||||
resObj, err := h.frostfs.ReadObject(ctx, prmRange)
|
return h.frostfs.RangeObject(ctx, prmRange)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resObj.Payload, nil
|
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
req.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
|
|
|
@ -55,14 +55,14 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
||||||
filename string
|
filename string
|
||||||
)
|
)
|
||||||
|
|
||||||
prm := PrmObjectRead{
|
prm := PrmObjectGet{
|
||||||
PrmAuth: PrmAuth{
|
PrmAuth: PrmAuth{
|
||||||
BearerToken: bearerToken(ctx),
|
BearerToken: bearerToken(ctx),
|
||||||
},
|
},
|
||||||
Address: objectAddress,
|
Address: objectAddress,
|
||||||
}
|
}
|
||||||
|
|
||||||
rObj, err := h.frostfs.ReadObject(ctx, prm)
|
rObj, err := h.frostfs.GetObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
req.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
|
@ -74,11 +74,11 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
||||||
dis = "attachment"
|
dis = "attachment"
|
||||||
}
|
}
|
||||||
|
|
||||||
payloadSize := rObj.Head.PayloadSize()
|
payloadSize := rObj.Header.PayloadSize()
|
||||||
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||||
var contentType string
|
var contentType string
|
||||||
for _, attr := range rObj.Head.Attributes() {
|
for _, attr := range rObj.Header.Attributes() {
|
||||||
key := attr.Key()
|
key := attr.Key()
|
||||||
val := attr.Value()
|
val := attr.Value()
|
||||||
if !isValidToken(key) || !isValidValue(val) {
|
if !isValidToken(key) || !isValidValue(val) {
|
||||||
|
@ -107,7 +107,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
idsToResponse(&req.Response, rObj.Head)
|
idsToResponse(&req.Response, &rObj.Header)
|
||||||
|
|
||||||
if len(contentType) == 0 {
|
if len(contentType) == 0 {
|
||||||
// determine the Content-Type from the payload head
|
// determine the Content-Type from the payload head
|
||||||
|
|
Loading…
Reference in a new issue