forked from TrueCloudLab/frostfs-http-gw
[#142] Fix multipart-objects download
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
8fe8f2dcc2
commit
495f745535
11 changed files with 517 additions and 62 deletions
|
@ -229,6 +229,10 @@ func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (Res
|
|||
return &resObjectSearchMock{res: res}, nil
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) InitMultiObjectReader(context.Context, PrmInitMultiObjectReader) (io.Reader, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
||||
for _, attr := range attributes {
|
||||
if attr.Key() == filter.Header() {
|
||||
|
@ -269,10 +273,3 @@ func (t *TestFrostFS) isAllowed(cnrID cid.ID, userID user.ID, op acl.Op, objID o
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(obj)
|
||||
return addr
|
||||
}
|
||||
|
|
|
@ -119,6 +119,14 @@ type PrmObjectSearch struct {
|
|||
Filters object.SearchFilters
|
||||
}
|
||||
|
||||
type PrmInitMultiObjectReader struct {
|
||||
// payload range
|
||||
Off, Ln uint64
|
||||
|
||||
Addr oid.Address
|
||||
Bearer *bearer.Token
|
||||
}
|
||||
|
||||
type ResObjectSearch interface {
|
||||
Read(buf []oid.ID) (int, error)
|
||||
Iterate(f func(oid.ID) bool) error
|
||||
|
@ -140,6 +148,8 @@ type FrostFS interface {
|
|||
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
||||
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
||||
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
||||
InitMultiObjectReader(ctx context.Context, p PrmInitMultiObjectReader) (io.Reader, error)
|
||||
|
||||
utils.EpochInfoFetcher
|
||||
}
|
||||
|
||||
|
@ -201,9 +211,7 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
return
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(*objID)
|
||||
addr := newAddress(bktInfo.CID, *objID)
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addr)
|
||||
}
|
||||
|
@ -256,10 +264,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
|
|||
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(foundOid.OID)
|
||||
addr := newAddress(bktInfo.CID, foundOid.OID)
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addr)
|
||||
}
|
||||
|
|
|
@ -1,13 +1,17 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const attributeMultipartObjectSize = "S3-Multipart-Object-Size"
|
||||
|
||||
// MultipartFile provides standard ReadCloser interface and also allows one to
|
||||
// get file name, it's used for multipart uploads.
|
||||
type MultipartFile interface {
|
||||
|
@ -45,3 +49,30 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
|||
return part, nil
|
||||
}
|
||||
}
|
||||
|
||||
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
||||
func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
|
||||
cid, ok := p.obj.Header.ContainerID()
|
||||
if !ok {
|
||||
return nil, 0, errors.New("no container id set")
|
||||
}
|
||||
oid, ok := p.obj.Header.ID()
|
||||
if !ok {
|
||||
return nil, 0, errors.New("no object id set")
|
||||
}
|
||||
size, err := strconv.ParseUint(p.strSize, 10, 64)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
ctx := p.req.RequestCtx
|
||||
params := PrmInitMultiObjectReader{
|
||||
Addr: newAddress(cid, oid),
|
||||
Bearer: bearerToken(ctx),
|
||||
}
|
||||
payload, err := h.frostfs.InitMultiObjectReader(ctx, params)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return io.NopCloser(payload), size, nil
|
||||
}
|
||||
|
|
|
@ -47,20 +47,26 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
|
|||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||
}
|
||||
|
||||
func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) {
|
||||
type getMultiobjectBodyParams struct {
|
||||
obj *Object
|
||||
req request
|
||||
strSize string
|
||||
}
|
||||
|
||||
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address) {
|
||||
var (
|
||||
err error
|
||||
dis = "inline"
|
||||
start = time.Now()
|
||||
filename string
|
||||
filepath string
|
||||
shouldDownload = req.QueryArgs().GetBool("download")
|
||||
start = time.Now()
|
||||
filename string
|
||||
filepath string
|
||||
contentType string
|
||||
)
|
||||
|
||||
prm := PrmObjectGet{
|
||||
PrmAuth: PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
},
|
||||
Address: objectAddress,
|
||||
Address: objAddress,
|
||||
}
|
||||
|
||||
rObj, err := h.frostfs.GetObject(ctx, prm)
|
||||
|
@ -70,15 +76,9 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
}
|
||||
|
||||
// we can't close reader in this function, so how to do it?
|
||||
|
||||
if req.Request.URI().QueryArgs().GetBool("download") {
|
||||
dis = "attachment"
|
||||
}
|
||||
|
||||
req.setIDs(rObj.Header)
|
||||
payload := rObj.Payload
|
||||
payloadSize := rObj.Header.PayloadSize()
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||
var contentType string
|
||||
for _, attr := range rObj.Header.Attributes() {
|
||||
key := attr.Key()
|
||||
val := attr.Value()
|
||||
|
@ -93,31 +93,41 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
case object.AttributeFileName:
|
||||
filename = val
|
||||
case object.AttributeTimestamp:
|
||||
value, err := strconv.ParseInt(val, 10, 64)
|
||||
if err != nil {
|
||||
req.log.Info(logs.CouldntParseCreationDate,
|
||||
zap.String("key", key),
|
||||
if err = req.setTimestamp(val); err != nil {
|
||||
req.log.Error(logs.CouldntParseCreationDate,
|
||||
zap.String("val", val),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
case object.AttributeContentType:
|
||||
contentType = val
|
||||
case object.AttributeFilePath:
|
||||
filepath = val
|
||||
case attributeMultipartObjectSize:
|
||||
payload, payloadSize, err = h.getPayload(getMultiobjectBodyParams{
|
||||
obj: rObj,
|
||||
req: req,
|
||||
strSize: val,
|
||||
})
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if filename == "" {
|
||||
filename = filepath
|
||||
}
|
||||
|
||||
idsToResponse(&req.Response, &rObj.Header)
|
||||
req.setDisposition(shouldDownload, filename)
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||
|
||||
if len(contentType) == 0 {
|
||||
// determine the Content-Type from the payload head
|
||||
var payloadHead []byte
|
||||
|
||||
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
||||
return rObj.Payload, nil
|
||||
return payload, nil
|
||||
})
|
||||
if err != nil && err != io.EOF {
|
||||
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
||||
|
@ -129,20 +139,46 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
var headReader io.Reader = bytes.NewReader(payloadHead)
|
||||
|
||||
if err != io.EOF { // otherwise, we've already read full payload
|
||||
headReader = io.MultiReader(headReader, rObj.Payload)
|
||||
headReader = io.MultiReader(headReader, payload)
|
||||
}
|
||||
|
||||
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
||||
// if it implements io.Closer and that's useful for us.
|
||||
rObj.Payload = readCloser{headReader, rObj.Payload}
|
||||
payload = readCloser{headReader, payload}
|
||||
}
|
||||
req.SetContentType(contentType)
|
||||
req.Response.SetBodyStream(payload, int(payloadSize))
|
||||
}
|
||||
|
||||
if filename == "" {
|
||||
filename = filepath
|
||||
func (r *request) setIDs(obj object.Object) {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
r.Response.Header.Set(hdrObjectID, objID.String())
|
||||
r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String())
|
||||
r.Response.Header.Set(hdrContainerID, cnrID.String())
|
||||
}
|
||||
|
||||
func (r *request) setDisposition(shouldDownload bool, filename string) {
|
||||
const (
|
||||
inlineDisposition = "inline"
|
||||
attachmentDisposition = "attachment"
|
||||
)
|
||||
|
||||
dis := inlineDisposition
|
||||
if shouldDownload {
|
||||
dis = attachmentDisposition
|
||||
}
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
|
||||
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
||||
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
}
|
||||
|
||||
func (r *request) setTimestamp(timestamp string) error {
|
||||
value, err := strconv.ParseInt(timestamp, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -12,6 +12,8 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -90,3 +92,10 @@ func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
|
|||
}
|
||||
response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
}
|
||||
|
||||
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(obj)
|
||||
return addr
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue