From f8ae6761ce8a9d239cab100dd8203f60487f6c7d Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 15 Jul 2024 16:35:08 +0300 Subject: [PATCH] [#127] Split FrostFS ReadObject to separate methods Signed-off-by: Denis Kirillov --- internal/frostfs/frostfs.go | 56 ++++++++++++++++++-------------- internal/handler/download.go | 6 ++-- internal/handler/frostfs_mock.go | 51 +++++++++++++++++++---------- internal/handler/handler.go | 34 +++++++++++++++---- internal/handler/head.go | 20 +++++------- internal/handler/reader.go | 10 +++--- 6 files changed, 109 insertions(+), 68 deletions(-) diff --git a/internal/frostfs/frostfs.go b/internal/frostfs/frostfs.go index f3dfd7d..e844c95 100644 --- a/internal/frostfs/frostfs.go +++ b/internal/frostfs/frostfs.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "google.golang.org/grpc/codes" @@ -76,8 +77,25 @@ func (x payloadReader) Read(p []byte) (int, error) { return n, handleObjectError("read payload", err) } -// ReadObject implements frostfs.FrostFS interface method. -func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*handler.ObjectPart, error) { +// HeadObject implements frostfs.FrostFS interface method. +func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*object.Object, error) { + var prmHead pool.PrmObjectHead + prmHead.SetAddress(prm.Address) + + if prm.BearerToken != nil { + prmHead.UseBearer(*prm.BearerToken) + } + + res, err := x.pool.HeadObject(ctx, prmHead) + if err != nil { + return nil, handleObjectError("read object header via connection pool", err) + } + + return &res, nil +} + +// GetObject implements frostfs.FrostFS interface method. +func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*handler.Object, error) { var prmGet pool.PrmObjectGet prmGet.SetAddress(prm.Address) @@ -85,27 +103,19 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*h prmGet.UseBearer(*prm.BearerToken) } - // The code below must be reworked. It was copied from frostfs-s3-gw - // to create similar mocks for unit and fuzzing tests. - // - // However, this code was changed due to specific of expected responses - // from HTTP gateway. HTTP Gateway requires two types of responses: - // * payload as io.Reader + HEAD request - // * only payload as io.Reader - // Therefore all unused params were deleted and code was simplified. - - if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 { - res, err := x.pool.GetObject(ctx, prmGet) - if err != nil { - return nil, handleObjectError("init full payload range reading via connection pool", err) - } - - return &handler.ObjectPart{ - Payload: res.Payload, - Head: &res.Header, - }, nil + res, err := x.pool.GetObject(ctx, prmGet) + if err != nil { + return nil, handleObjectError("init full object reading via connection pool", err) } + return &handler.Object{ + Header: res.Header, + Payload: res.Payload, + }, nil +} + +// RangeObject implements frostfs.FrostFS interface method. +func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (io.ReadCloser, error) { var prmRange pool.PrmObjectRange prmRange.SetAddress(prm.Address) prmRange.SetOffset(prm.PayloadRange[0]) @@ -120,9 +130,7 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*h return nil, handleObjectError("init payload range reading via connection pool", err) } - return &handler.ObjectPart{ - Payload: payloadReader{&res}, - }, nil + return payloadReader{&res}, nil } // SearchObjects implements frostfs.FrostFS interface method. diff --git a/internal/handler/download.go b/internal/handler/download.go index 480254c..88109a6 100644 --- a/internal/handler/download.go +++ b/internal/handler/download.go @@ -153,19 +153,19 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) { } func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error { - prm := PrmObjectRead{ + prm := PrmObjectGet{ PrmAuth: PrmAuth{ BearerToken: btoken, }, Address: addr, } - resGet, err := h.frostfs.ReadObject(ctx, prm) + resGet, err := h.frostfs.GetObject(ctx, prm) if err != nil { return fmt.Errorf("get FrostFS object: %v", err) } - objWriter, err := h.addObjectToZip(zipWriter, resGet.Head) + objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header) if err != nil { return fmt.Errorf("zip create header: %v", err) } diff --git a/internal/handler/frostfs_mock.go b/internal/handler/frostfs_mock.go index eb59fb6..9f4378a 100644 --- a/internal/handler/frostfs_mock.go +++ b/internal/handler/frostfs_mock.go @@ -78,32 +78,49 @@ func (t *TestFrostFS) requestOwner(btoken *bearer.Token) user.ID { return owner } -func (t *TestFrostFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) { - sAddr := prm.Address.EncodeToString() +func (t *TestFrostFS) retrieveObject(addr oid.Address, btoken *bearer.Token) (*object.Object, error) { + sAddr := addr.EncodeToString() if obj, ok := t.objects[sAddr]; ok { - owner := t.requestOwner(prm.BearerToken) + owner := t.requestOwner(btoken) - if !t.isAllowed(prm.Address.Container(), owner, acl.OpObjectGet, prm.Address.Object()) { + if !t.isAllowed(addr.Container(), owner, acl.OpObjectGet, addr.Object()) { return nil, ErrAccessDenied } - payload := obj.Payload() - - if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 { - off := prm.PayloadRange[0] - payload = payload[off : off+prm.PayloadRange[1]] - obj = nil // GetRange does not return header values - } - - return &ObjectPart{ - Head: obj, - Payload: io.NopCloser(bytes.NewReader(payload)), - }, nil + return obj, nil } - return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, prm.Address) + return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr) } + +func (t *TestFrostFS) HeadObject(_ context.Context, prm PrmObjectHead) (*object.Object, error) { + return t.retrieveObject(prm.Address, prm.BearerToken) +} + +func (t *TestFrostFS) GetObject(_ context.Context, prm PrmObjectGet) (*Object, error) { + obj, err := t.retrieveObject(prm.Address, prm.BearerToken) + if err != nil { + return nil, err + } + + return &Object{ + Header: *obj, + Payload: io.NopCloser(bytes.NewReader(obj.Payload())), + }, nil +} + +func (t *TestFrostFS) RangeObject(_ context.Context, prm PrmObjectRange) (io.ReadCloser, error) { + obj, err := t.retrieveObject(prm.Address, prm.BearerToken) + if err != nil { + return nil, err + } + + off := prm.PayloadRange[0] + payload := obj.Payload()[off : off+prm.PayloadRange[1]] + return io.NopCloser(bytes.NewReader(payload)), nil +} + func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) { b := make([]byte, 32) if _, err := io.ReadFull(rand.Reader, b); err != nil { diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 62b4e1c..4de9d9a 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -46,8 +46,26 @@ type PrmAuth struct { BearerToken *bearer.Token } -// PrmObjectRead groups parameters of FrostFS.ReadObject operation. -type PrmObjectRead struct { +// PrmObjectHead groups parameters of FrostFS.HeadObject operation. +type PrmObjectHead struct { + // Authentication parameters. + PrmAuth + + // Address to read the object header from. + Address oid.Address +} + +// PrmObjectGet groups parameters of FrostFS.GetObject operation. +type PrmObjectGet struct { + // Authentication parameters. + PrmAuth + + // Address to read the object header from. + Address oid.Address +} + +// PrmObjectRange groups parameters of FrostFS.RangeObject operation. +type PrmObjectRange struct { // Authentication parameters. PrmAuth @@ -58,10 +76,10 @@ type PrmObjectRead struct { PayloadRange [2]uint64 } -// ObjectPart represents partially read FrostFS object. -type ObjectPart struct { - // Object header with optional in-memory payload part. - Head *object.Object +// Object represents FrostFS object. +type Object struct { + // Object header (doesn't contain payload). + Header object.Object // Object payload part encapsulated in io.Reader primitive. // Returns ErrAccessDenied on read access violation. @@ -115,7 +133,9 @@ var ( // FrostFS represents virtual connection to FrostFS network. type FrostFS interface { Container(context.Context, PrmContainer) (*container.Container, error) - ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error) + HeadObject(context.Context, PrmObjectHead) (*object.Object, error) + GetObject(context.Context, PrmObjectGet) (*Object, error) + RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error) CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error) utils.EpochInfoFetcher diff --git a/internal/handler/head.go b/internal/handler/head.go index 96d1f49..f0a1e94 100644 --- a/internal/handler/head.go +++ b/internal/handler/head.go @@ -29,22 +29,22 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid btoken := bearerToken(ctx) - prm := PrmObjectRead{ + prm := PrmObjectHead{ PrmAuth: PrmAuth{ BearerToken: btoken, }, Address: objectAddress, } - obj, err := h.frostfs.ReadObject(ctx, prm) + obj, err := h.frostfs.HeadObject(ctx, prm) if err != nil { req.handleFrostFSErr(err, start) return } - req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.Head.PayloadSize(), 10)) + req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10)) var contentType string - for _, attr := range obj.Head.Attributes() { + for _, attr := range obj.Attributes() { key := attr.Key() val := attr.Value() if !isValidToken(key) || !isValidValue(val) { @@ -70,11 +70,11 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid } } - idsToResponse(&req.Response, obj.Head) + idsToResponse(&req.Response, obj) if len(contentType) == 0 { - contentType, _, err = readContentType(obj.Head.PayloadSize(), func(sz uint64) (io.Reader, error) { - prmRange := PrmObjectRead{ + contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) { + prmRange := PrmObjectRange{ PrmAuth: PrmAuth{ BearerToken: btoken, }, @@ -82,11 +82,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid PayloadRange: [2]uint64{0, sz}, } - resObj, err := h.frostfs.ReadObject(ctx, prmRange) - if err != nil { - return nil, err - } - return resObj.Payload, nil + return h.frostfs.RangeObject(ctx, prmRange) }) if err != nil && err != io.EOF { req.handleFrostFSErr(err, start) diff --git a/internal/handler/reader.go b/internal/handler/reader.go index 81694bc..65d258b 100644 --- a/internal/handler/reader.go +++ b/internal/handler/reader.go @@ -55,14 +55,14 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi filename string ) - prm := PrmObjectRead{ + prm := PrmObjectGet{ PrmAuth: PrmAuth{ BearerToken: bearerToken(ctx), }, Address: objectAddress, } - rObj, err := h.frostfs.ReadObject(ctx, prm) + rObj, err := h.frostfs.GetObject(ctx, prm) if err != nil { req.handleFrostFSErr(err, start) return @@ -74,11 +74,11 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi dis = "attachment" } - payloadSize := rObj.Head.PayloadSize() + payloadSize := rObj.Header.PayloadSize() req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) var contentType string - for _, attr := range rObj.Head.Attributes() { + for _, attr := range rObj.Header.Attributes() { key := attr.Key() val := attr.Value() if !isValidToken(key) || !isValidValue(val) { @@ -107,7 +107,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi } } - idsToResponse(&req.Response, rObj.Head) + idsToResponse(&req.Response, &rObj.Header) if len(contentType) == 0 { // determine the Content-Type from the payload head