[#142] Fix multipart-objects download
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
2eefa7c08b
commit
263af037c1
5 changed files with 101 additions and 94 deletions
|
@ -229,6 +229,10 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (Res
|
||||||
return &resObjectSearchMock{res: res}, nil
|
return &resObjectSearchMock{res: res}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) InitMultiObjectReader(_ context.Context, prm PrmInitMultiObjectReader) (io.Reader, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
||||||
for _, attr := range attributes {
|
for _, attr := range attributes {
|
||||||
if attr.Key() == filter.Header() {
|
if attr.Key() == filter.Header() {
|
||||||
|
|
|
@ -123,6 +123,16 @@ type PrmObjectSearch struct {
|
||||||
Filters object.SearchFilters
|
Filters object.SearchFilters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PrmInitMultiObjectReader struct {
|
||||||
|
// payload range
|
||||||
|
Off, Ln uint64
|
||||||
|
|
||||||
|
ObjInfo *data.ObjectInfo
|
||||||
|
BktInfo *data.BucketInfo
|
||||||
|
Log *zap.Logger
|
||||||
|
Bearer *bearer.Token
|
||||||
|
}
|
||||||
|
|
||||||
type ResObjectSearch interface {
|
type ResObjectSearch interface {
|
||||||
Read(buf []oid.ID) (int, error)
|
Read(buf []oid.ID) (int, error)
|
||||||
Iterate(f func(oid.ID) bool) error
|
Iterate(f func(oid.ID) bool) error
|
||||||
|
@ -144,6 +154,8 @@ type FrostFS interface {
|
||||||
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
||||||
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
||||||
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
||||||
|
InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error)
|
||||||
|
|
||||||
utils.EpochInfoFetcher
|
utils.EpochInfoFetcher
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,18 +1,13 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -59,14 +54,6 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type getParams struct {
|
|
||||||
// payload range
|
|
||||||
off, ln uint64
|
|
||||||
|
|
||||||
objInfo *data.ObjectInfo
|
|
||||||
bktInfo *data.BucketInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
||||||
func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) {
|
func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) {
|
||||||
sizeValue, ok := p.attrs[attributeMultipartObjectSize]
|
sizeValue, ok := p.attrs[attributeMultipartObjectSize]
|
||||||
|
@ -86,10 +73,10 @@ func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error)
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
ctx := p.req.RequestCtx
|
ctx := p.req.RequestCtx
|
||||||
params := getParams{
|
params := PrmInitMultiObjectReader{
|
||||||
off: 0,
|
Off: 0,
|
||||||
ln: 0,
|
Ln: 0,
|
||||||
objInfo: &data.ObjectInfo{
|
ObjInfo: &data.ObjectInfo{
|
||||||
ID: oid,
|
ID: oid,
|
||||||
CID: cid,
|
CID: cid,
|
||||||
Bucket: p.bktinfo.Name,
|
Bucket: p.bktinfo.Name,
|
||||||
|
@ -97,80 +84,14 @@ func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error)
|
||||||
Size: size,
|
Size: size,
|
||||||
Headers: p.attrs,
|
Headers: p.attrs,
|
||||||
},
|
},
|
||||||
bktInfo: p.bktinfo,
|
BktInfo: p.bktinfo,
|
||||||
|
Log: h.log,
|
||||||
|
Bearer: bearerToken(ctx),
|
||||||
}
|
}
|
||||||
payload, err := h.initMultipartReader(ctx, params)
|
payload, err := h.frostfs.InitMultiObjectReader(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return io.NopCloser(payload), size, nil
|
return io.NopCloser(payload), size, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) initMultipartReader(ctx context.Context, p getParams) (io.Reader, error) {
|
|
||||||
combinedObj, err := h.frostfs.GetObject(ctx, PrmObjectGet{
|
|
||||||
PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)},
|
|
||||||
Address: p.objInfo.Address(),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("get combined object '%s': %w", p.objInfo.ID.EncodeToString(), err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var parts []*data.PartInfo
|
|
||||||
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
|
||||||
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
objParts := make([]frostfs.PartObj, len(parts))
|
|
||||||
for i, part := range parts {
|
|
||||||
objParts[i] = frostfs.PartObj{
|
|
||||||
OID: part.OID,
|
|
||||||
Size: part.Size,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return frostfs.NewMultiObjectReader(ctx, frostfs.MultiObjectReaderConfig{
|
|
||||||
Handler: h,
|
|
||||||
Off: p.off,
|
|
||||||
Ln: p.ln,
|
|
||||||
Parts: objParts,
|
|
||||||
BktInfo: p.bktInfo,
|
|
||||||
Log: h.log,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
|
|
||||||
// Zero range corresponds to full payload (panics if only offset is set).
|
|
||||||
func (h *Handler) InitFrostFSObjectPayloadReader(ctx context.Context, p frostfs.GetFrostFSParams) (io.Reader, error) {
|
|
||||||
var prmAuth PrmAuth
|
|
||||||
|
|
||||||
var addr oid.Address
|
|
||||||
addr.SetContainer(p.BktInfo.CID)
|
|
||||||
addr.SetObject(p.Oid)
|
|
||||||
|
|
||||||
if p.Off+p.Ln != 0 {
|
|
||||||
prm := PrmObjectRange{
|
|
||||||
PrmAuth: prmAuth,
|
|
||||||
Container: p.BktInfo.CID,
|
|
||||||
Object: p.Oid,
|
|
||||||
PayloadRange: [2]uint64{p.Off, p.Ln},
|
|
||||||
Address: addr,
|
|
||||||
}
|
|
||||||
|
|
||||||
return h.frostfs.RangeObject(ctx, prm)
|
|
||||||
}
|
|
||||||
|
|
||||||
prm := PrmObjectGet{
|
|
||||||
PrmAuth: prmAuth,
|
|
||||||
Container: p.BktInfo.CID,
|
|
||||||
Object: p.Oid,
|
|
||||||
Address: addr,
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := h.frostfs.GetObject(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return res.Payload, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,11 +2,13 @@ package frostfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -47,7 +49,7 @@ type MultiObjectReader struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type MultiObjectReaderConfig struct {
|
type MultiObjectReaderConfig struct {
|
||||||
Handler readerInitiator
|
Initiator readerInitiator
|
||||||
Log *zap.Logger
|
Log *zap.Logger
|
||||||
|
|
||||||
// the offset of complete object and total size to read
|
// the offset of complete object and total size to read
|
||||||
|
@ -64,6 +66,38 @@ var (
|
||||||
errorZeroRangeLength = errors.New("zero range length")
|
errorZeroRangeLength = errors.New("zero range length")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) {
|
||||||
|
combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{
|
||||||
|
PrmAuth: handler.PrmAuth{BearerToken: p.Bearer},
|
||||||
|
Address: p.ObjInfo.Address(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get combined object '%s': %w", p.ObjInfo.ID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parts []*data.PartInfo
|
||||||
|
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objParts := make([]PartObj, len(parts))
|
||||||
|
for i, part := range parts {
|
||||||
|
objParts[i] = PartObj{
|
||||||
|
OID: part.OID,
|
||||||
|
Size: part.Size,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
||||||
|
Initiator: x,
|
||||||
|
Off: p.Off,
|
||||||
|
Ln: p.Ln,
|
||||||
|
Parts: objParts,
|
||||||
|
BktInfo: p.BktInfo,
|
||||||
|
Log: p.Log,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) {
|
func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) {
|
||||||
if len(cfg.Parts) == 0 {
|
if len(cfg.Parts) == 0 {
|
||||||
return nil, errEmptyPartsList
|
return nil, errEmptyPartsList
|
||||||
|
@ -71,7 +105,7 @@ func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*Mu
|
||||||
|
|
||||||
r := &MultiObjectReader{
|
r := &MultiObjectReader{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
layer: cfg.Handler,
|
layer: cfg.Initiator,
|
||||||
prm: GetFrostFSParams{
|
prm: GetFrostFSParams{
|
||||||
BktInfo: cfg.BktInfo,
|
BktInfo: cfg.BktInfo,
|
||||||
},
|
},
|
||||||
|
@ -166,3 +200,39 @@ func (x *MultiObjectReader) Read(p []byte) (n int, err error) {
|
||||||
|
|
||||||
return n + next, err
|
return n + next, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
|
||||||
|
// Zero range corresponds to full payload (panics if only offset is set).
|
||||||
|
func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error) {
|
||||||
|
var prmAuth handler.PrmAuth
|
||||||
|
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(p.BktInfo.CID)
|
||||||
|
addr.SetObject(p.Oid)
|
||||||
|
|
||||||
|
if p.Off+p.Ln != 0 {
|
||||||
|
prm := handler.PrmObjectRange{
|
||||||
|
PrmAuth: prmAuth,
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
Object: p.Oid,
|
||||||
|
PayloadRange: [2]uint64{p.Off, p.Ln},
|
||||||
|
Address: addr,
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.RangeObject(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := handler.PrmObjectGet{
|
||||||
|
PrmAuth: prmAuth,
|
||||||
|
Container: p.BktInfo.CID,
|
||||||
|
Object: p.Oid,
|
||||||
|
Address: addr,
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := x.GetObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Payload, nil
|
||||||
|
}
|
||||||
|
|
|
@ -115,7 +115,7 @@ func TestMultiReader(t *testing.T) {
|
||||||
} {
|
} {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
||||||
Handler: mockInitReader,
|
Initiator: mockInitReader,
|
||||||
Parts: parts,
|
Parts: parts,
|
||||||
Off: tc.off,
|
Off: tc.off,
|
||||||
Ln: tc.ln,
|
Ln: tc.ln,
|
||||||
|
|
Loading…
Add table
Reference in a new issue