From 263af037c1e031e44e58caf67f33689e5d42cf11 Mon Sep 17 00:00:00 2001 From: Nikita Zinkevich Date: Thu, 3 Oct 2024 12:28:29 +0300 Subject: [PATCH] [#142] Fix multipart-objects download Signed-off-by: Nikita Zinkevich --- internal/handler/frostfs_mock.go | 4 + internal/handler/handler.go | 12 +++ internal/handler/multipart.go | 95 ++----------------- .../service/frostfs/multi_object_reader.go | 76 ++++++++++++++- .../frostfs/multi_object_reader_test.go | 8 +- 5 files changed, 101 insertions(+), 94 deletions(-) diff --git a/internal/handler/frostfs_mock.go b/internal/handler/frostfs_mock.go index 9f4378a..47a5d86 100644 --- a/internal/handler/frostfs_mock.go +++ b/internal/handler/frostfs_mock.go @@ -229,6 +229,10 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (Res return &resObjectSearchMock{res: res}, nil } +func (t *TestFrostFS) InitMultiObjectReader(_ context.Context, prm PrmInitMultiObjectReader) (io.Reader, error) { + return nil, nil +} + func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool { for _, attr := range attributes { if attr.Key() == filter.Header() { diff --git a/internal/handler/handler.go b/internal/handler/handler.go index a5d7c95..e9d7545 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -123,6 +123,16 @@ type PrmObjectSearch struct { Filters object.SearchFilters } +type PrmInitMultiObjectReader struct { + // payload range + Off, Ln uint64 + + ObjInfo *data.ObjectInfo + BktInfo *data.BucketInfo + Log *zap.Logger + Bearer *bearer.Token +} + type ResObjectSearch interface { Read(buf []oid.ID) (int, error) Iterate(f func(oid.ID) bool) error @@ -144,6 +154,8 @@ type FrostFS interface { RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error) CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error) + InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error) + utils.EpochInfoFetcher } diff --git a/internal/handler/multipart.go b/internal/handler/multipart.go index e1f651e..d470e14 100644 --- a/internal/handler/multipart.go +++ b/internal/handler/multipart.go @@ -1,18 +1,13 @@ package handler import ( - "context" - "encoding/json" "errors" - "fmt" "io" "strconv" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -59,14 +54,6 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF } } -type getParams struct { - // payload range - off, ln uint64 - - objInfo *data.ObjectInfo - bktInfo *data.BucketInfo -} - // getPayload returns initial payload if object is not multipart else composes new reader with parts data. func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) { sizeValue, ok := p.attrs[attributeMultipartObjectSize] @@ -86,10 +73,10 @@ func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) return nil, 0, err } ctx := p.req.RequestCtx - params := getParams{ - off: 0, - ln: 0, - objInfo: &data.ObjectInfo{ + params := PrmInitMultiObjectReader{ + Off: 0, + Ln: 0, + ObjInfo: &data.ObjectInfo{ ID: oid, CID: cid, Bucket: p.bktinfo.Name, @@ -97,80 +84,14 @@ func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) Size: size, Headers: p.attrs, }, - bktInfo: p.bktinfo, + BktInfo: p.bktinfo, + Log: h.log, + Bearer: bearerToken(ctx), } - payload, err := h.initMultipartReader(ctx, params) + payload, err := h.frostfs.InitMultiObjectReader(ctx, params) if err != nil { return nil, 0, err } return io.NopCloser(payload), size, nil } - -func (h *Handler) initMultipartReader(ctx context.Context, p getParams) (io.Reader, error) { - combinedObj, err := h.frostfs.GetObject(ctx, PrmObjectGet{ - PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)}, - Address: p.objInfo.Address(), - }) - if err != nil { - return nil, fmt.Errorf("get combined object '%s': %w", p.objInfo.ID.EncodeToString(), err) - } - - var parts []*data.PartInfo - if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil { - return nil, fmt.Errorf("unmarshal combined object parts: %w", err) - } - - objParts := make([]frostfs.PartObj, len(parts)) - for i, part := range parts { - objParts[i] = frostfs.PartObj{ - OID: part.OID, - Size: part.Size, - } - } - - return frostfs.NewMultiObjectReader(ctx, frostfs.MultiObjectReaderConfig{ - Handler: h, - Off: p.off, - Ln: p.ln, - Parts: objParts, - BktInfo: p.bktInfo, - Log: h.log, - }) -} - -// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object. -// Zero range corresponds to full payload (panics if only offset is set). -func (h *Handler) InitFrostFSObjectPayloadReader(ctx context.Context, p frostfs.GetFrostFSParams) (io.Reader, error) { - var prmAuth PrmAuth - - var addr oid.Address - addr.SetContainer(p.BktInfo.CID) - addr.SetObject(p.Oid) - - if p.Off+p.Ln != 0 { - prm := PrmObjectRange{ - PrmAuth: prmAuth, - Container: p.BktInfo.CID, - Object: p.Oid, - PayloadRange: [2]uint64{p.Off, p.Ln}, - Address: addr, - } - - return h.frostfs.RangeObject(ctx, prm) - } - - prm := PrmObjectGet{ - PrmAuth: prmAuth, - Container: p.BktInfo.CID, - Object: p.Oid, - Address: addr, - } - - res, err := h.frostfs.GetObject(ctx, prm) - if err != nil { - return nil, err - } - - return res.Payload, nil -} diff --git a/internal/service/frostfs/multi_object_reader.go b/internal/service/frostfs/multi_object_reader.go index a3cda9d..9aa064e 100644 --- a/internal/service/frostfs/multi_object_reader.go +++ b/internal/service/frostfs/multi_object_reader.go @@ -2,11 +2,13 @@ package frostfs import ( "context" + "encoding/json" "errors" "fmt" "io" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -47,8 +49,8 @@ type MultiObjectReader struct { } type MultiObjectReaderConfig struct { - Handler readerInitiator - Log *zap.Logger + Initiator readerInitiator + Log *zap.Logger // the offset of complete object and total size to read Off, Ln uint64 @@ -64,6 +66,38 @@ var ( errorZeroRangeLength = errors.New("zero range length") ) +func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) { + combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{ + PrmAuth: handler.PrmAuth{BearerToken: p.Bearer}, + Address: p.ObjInfo.Address(), + }) + if err != nil { + return nil, fmt.Errorf("get combined object '%s': %w", p.ObjInfo.ID.EncodeToString(), err) + } + + var parts []*data.PartInfo + if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil { + return nil, fmt.Errorf("unmarshal combined object parts: %w", err) + } + + objParts := make([]PartObj, len(parts)) + for i, part := range parts { + objParts[i] = PartObj{ + OID: part.OID, + Size: part.Size, + } + } + + return NewMultiObjectReader(ctx, MultiObjectReaderConfig{ + Initiator: x, + Off: p.Off, + Ln: p.Ln, + Parts: objParts, + BktInfo: p.BktInfo, + Log: p.Log, + }) +} + func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) { if len(cfg.Parts) == 0 { return nil, errEmptyPartsList @@ -71,7 +105,7 @@ func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*Mu r := &MultiObjectReader{ ctx: ctx, - layer: cfg.Handler, + layer: cfg.Initiator, prm: GetFrostFSParams{ BktInfo: cfg.BktInfo, }, @@ -166,3 +200,39 @@ func (x *MultiObjectReader) Read(p []byte) (n int, err error) { return n + next, err } + +// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object. +// Zero range corresponds to full payload (panics if only offset is set). +func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error) { + var prmAuth handler.PrmAuth + + var addr oid.Address + addr.SetContainer(p.BktInfo.CID) + addr.SetObject(p.Oid) + + if p.Off+p.Ln != 0 { + prm := handler.PrmObjectRange{ + PrmAuth: prmAuth, + Container: p.BktInfo.CID, + Object: p.Oid, + PayloadRange: [2]uint64{p.Off, p.Ln}, + Address: addr, + } + + return x.RangeObject(ctx, prm) + } + + prm := handler.PrmObjectGet{ + PrmAuth: prmAuth, + Container: p.BktInfo.CID, + Object: p.Oid, + Address: addr, + } + + res, err := x.GetObject(ctx, prm) + if err != nil { + return nil, err + } + + return res.Payload, nil +} diff --git a/internal/service/frostfs/multi_object_reader_test.go b/internal/service/frostfs/multi_object_reader_test.go index f1bf2bf..6b333c7 100644 --- a/internal/service/frostfs/multi_object_reader_test.go +++ b/internal/service/frostfs/multi_object_reader_test.go @@ -115,10 +115,10 @@ func TestMultiReader(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{ - Handler: mockInitReader, - Parts: parts, - Off: tc.off, - Ln: tc.ln, + Initiator: mockInitReader, + Parts: parts, + Off: tc.off, + Ln: tc.ln, }) require.ErrorIs(t, err, tc.err)