[#142] Fix multipart-objects download
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
77eb474581
commit
2eefa7c08b
13 changed files with 628 additions and 85 deletions
|
@ -61,7 +61,9 @@ type PrmObjectGet struct {
|
|||
PrmAuth
|
||||
|
||||
// Address to read the object header from.
|
||||
Address oid.Address
|
||||
Address oid.Address
|
||||
Container cid.ID
|
||||
Object oid.ID
|
||||
}
|
||||
|
||||
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
|
||||
|
@ -74,6 +76,10 @@ type PrmObjectRange struct {
|
|||
|
||||
// Offset-length range of the object payload to be read.
|
||||
PayloadRange [2]uint64
|
||||
|
||||
Container cid.ID
|
||||
|
||||
Object oid.ID
|
||||
}
|
||||
|
||||
// Object represents FrostFS object.
|
||||
|
@ -177,7 +183,7 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
|
|||
|
||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||
var (
|
||||
idCnr, _ = c.UserValue("cid").(string)
|
||||
idObj, _ = c.UserValue("oid").(string)
|
||||
|
@ -203,12 +209,12 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(*objID)
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addr)
|
||||
f(ctx, *h.newRequest(c, log), addr, bktInfo)
|
||||
}
|
||||
|
||||
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||
var (
|
||||
bucketname = req.UserValue("cid").(string)
|
||||
key = req.UserValue("oid").(string)
|
||||
|
@ -250,11 +256,11 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context,
|
|||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(foundOid.OID)
|
||||
|
||||
f(ctx, *h.newRequest(req, log), addr)
|
||||
f(ctx, *h.newRequest(req, log), addr, bktInfo)
|
||||
}
|
||||
|
||||
// byAttribute is a wrapper similar to byAddress.
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
key, _ := c.UserValue("attr_key").(string)
|
||||
val, _ := c.UserValue("attr_val").(string)
|
||||
|
@ -311,7 +317,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
|||
addrObj.SetContainer(bktInfo.CID)
|
||||
addrObj.SetObject(buf[0])
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addrObj)
|
||||
f(ctx, *h.newRequest(c, log), addrObj, bktInfo)
|
||||
}
|
||||
|
||||
// resolveContainer decode container id, if it's not a valid container id
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -24,7 +25,7 @@ const (
|
|||
hdrContainerID = "X-Container-Id"
|
||||
)
|
||||
|
||||
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {
|
||||
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address, _ *data.BucketInfo) {
|
||||
var start = time.Now()
|
||||
|
||||
btoken := bearerToken(ctx)
|
||||
|
|
|
@ -1,13 +1,26 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
frostFSSystemMetadataPrefix = "S3-"
|
||||
attributeMultipartObjectSize = frostFSSystemMetadataPrefix + "Multipart-Object-Size"
|
||||
)
|
||||
|
||||
// MultipartFile provides standard ReadCloser interface and also allows one to
|
||||
// get file name, it's used for multipart uploads.
|
||||
type MultipartFile interface {
|
||||
|
@ -45,3 +58,119 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
|||
return part, nil
|
||||
}
|
||||
}
|
||||
|
||||
type getParams struct {
|
||||
// payload range
|
||||
off, ln uint64
|
||||
|
||||
objInfo *data.ObjectInfo
|
||||
bktInfo *data.BucketInfo
|
||||
}
|
||||
|
||||
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
||||
func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) {
|
||||
sizeValue, ok := p.attrs[attributeMultipartObjectSize]
|
||||
if !ok {
|
||||
return p.obj.Payload, p.obj.Header.PayloadSize(), nil
|
||||
}
|
||||
cid, ok := p.obj.Header.ContainerID()
|
||||
if !ok {
|
||||
return nil, 0, errors.New("no container id set")
|
||||
}
|
||||
oid, ok := p.obj.Header.ID()
|
||||
if !ok {
|
||||
return nil, 0, errors.New("no object id set")
|
||||
}
|
||||
size, err := strconv.ParseUint(sizeValue, 10, 64)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
ctx := p.req.RequestCtx
|
||||
params := getParams{
|
||||
off: 0,
|
||||
ln: 0,
|
||||
objInfo: &data.ObjectInfo{
|
||||
ID: oid,
|
||||
CID: cid,
|
||||
Bucket: p.bktinfo.Name,
|
||||
Name: p.attrs["FilePath"],
|
||||
Size: size,
|
||||
Headers: p.attrs,
|
||||
},
|
||||
bktInfo: p.bktinfo,
|
||||
}
|
||||
payload, err := h.initMultipartReader(ctx, params)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return io.NopCloser(payload), size, nil
|
||||
}
|
||||
|
||||
func (h *Handler) initMultipartReader(ctx context.Context, p getParams) (io.Reader, error) {
|
||||
combinedObj, err := h.frostfs.GetObject(ctx, PrmObjectGet{
|
||||
PrmAuth: PrmAuth{BearerToken: bearerToken(ctx)},
|
||||
Address: p.objInfo.Address(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get combined object '%s': %w", p.objInfo.ID.EncodeToString(), err)
|
||||
}
|
||||
|
||||
var parts []*data.PartInfo
|
||||
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||
}
|
||||
|
||||
objParts := make([]frostfs.PartObj, len(parts))
|
||||
for i, part := range parts {
|
||||
objParts[i] = frostfs.PartObj{
|
||||
OID: part.OID,
|
||||
Size: part.Size,
|
||||
}
|
||||
}
|
||||
|
||||
return frostfs.NewMultiObjectReader(ctx, frostfs.MultiObjectReaderConfig{
|
||||
Handler: h,
|
||||
Off: p.off,
|
||||
Ln: p.ln,
|
||||
Parts: objParts,
|
||||
BktInfo: p.bktInfo,
|
||||
Log: h.log,
|
||||
})
|
||||
}
|
||||
|
||||
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
|
||||
// Zero range corresponds to full payload (panics if only offset is set).
|
||||
func (h *Handler) InitFrostFSObjectPayloadReader(ctx context.Context, p frostfs.GetFrostFSParams) (io.Reader, error) {
|
||||
var prmAuth PrmAuth
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(p.BktInfo.CID)
|
||||
addr.SetObject(p.Oid)
|
||||
|
||||
if p.Off+p.Ln != 0 {
|
||||
prm := PrmObjectRange{
|
||||
PrmAuth: prmAuth,
|
||||
Container: p.BktInfo.CID,
|
||||
Object: p.Oid,
|
||||
PayloadRange: [2]uint64{p.Off, p.Ln},
|
||||
Address: addr,
|
||||
}
|
||||
|
||||
return h.frostfs.RangeObject(ctx, prm)
|
||||
}
|
||||
|
||||
prm := PrmObjectGet{
|
||||
PrmAuth: prmAuth,
|
||||
Container: p.BktInfo.CID,
|
||||
Object: p.Oid,
|
||||
Address: addr,
|
||||
}
|
||||
|
||||
res, err := h.frostfs.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Payload, nil
|
||||
}
|
||||
|
|
|
@ -5,10 +5,10 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
|
@ -47,19 +47,24 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
|
|||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||
}
|
||||
|
||||
func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) {
|
||||
type getPayloadParams struct {
|
||||
obj *Object
|
||||
req request
|
||||
bktinfo *data.BucketInfo
|
||||
attrs map[string]string
|
||||
}
|
||||
|
||||
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address, bktInfo *data.BucketInfo) {
|
||||
var (
|
||||
err error
|
||||
dis = "inline"
|
||||
start = time.Now()
|
||||
filename string
|
||||
shouldDownload = req.QueryArgs().GetBool("download")
|
||||
start = time.Now()
|
||||
)
|
||||
|
||||
prm := PrmObjectGet{
|
||||
PrmAuth: PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
},
|
||||
Address: objectAddress,
|
||||
Address: objAddress,
|
||||
}
|
||||
|
||||
rObj, err := h.frostfs.GetObject(ctx, prm)
|
||||
|
@ -70,51 +75,40 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
|
||||
// we can't close reader in this function, so how to do it?
|
||||
|
||||
if req.Request.URI().QueryArgs().GetBool("download") {
|
||||
dis = "attachment"
|
||||
attrs := makeAttributesMap(rObj.Header.Attributes())
|
||||
req.setAttributes(attrs)
|
||||
|
||||
req.setIDs(rObj.Header)
|
||||
req.setDisposition(shouldDownload, attrs[object.AttributeFileName])
|
||||
|
||||
if err = req.setTimestamp(attrs[object.AttributeTimestamp]); err != nil {
|
||||
req.log.Error(logs.CouldntParseCreationDate,
|
||||
zap.String("val", attrs[object.AttributeTimestamp]),
|
||||
zap.Error(err))
|
||||
response.Error(req.RequestCtx, "failed to convert timestamp: "+err.Error(), fasthttp.StatusInternalServerError)
|
||||
}
|
||||
|
||||
payloadSize := rObj.Header.PayloadSize()
|
||||
payloadParams := getPayloadParams{
|
||||
obj: rObj,
|
||||
req: req,
|
||||
bktinfo: bktInfo,
|
||||
attrs: attrs,
|
||||
}
|
||||
|
||||
payload, payloadSize, err := h.getPayload(payloadParams)
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
return
|
||||
}
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||
var contentType string
|
||||
for _, attr := range rObj.Header.Attributes() {
|
||||
key := attr.Key()
|
||||
val := attr.Value()
|
||||
if !isValidToken(key) || !isValidValue(val) {
|
||||
continue
|
||||
}
|
||||
|
||||
key = utils.BackwardTransformIfSystem(key)
|
||||
|
||||
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||
switch key {
|
||||
case object.AttributeFileName:
|
||||
filename = val
|
||||
case object.AttributeTimestamp:
|
||||
value, err := strconv.ParseInt(val, 10, 64)
|
||||
if err != nil {
|
||||
req.log.Info(logs.CouldntParseCreationDate,
|
||||
zap.String("key", key),
|
||||
zap.String("val", val),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
case object.AttributeContentType:
|
||||
contentType = val
|
||||
}
|
||||
}
|
||||
|
||||
idsToResponse(&req.Response, &rObj.Header)
|
||||
|
||||
contentType := attrs[object.AttributeContentType]
|
||||
if len(contentType) == 0 {
|
||||
// determine the Content-Type from the payload head
|
||||
var payloadHead []byte
|
||||
|
||||
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
||||
return rObj.Payload, nil
|
||||
return payload, nil
|
||||
})
|
||||
if err != nil && err != io.EOF {
|
||||
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
||||
|
@ -126,16 +120,27 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
var headReader io.Reader = bytes.NewReader(payloadHead)
|
||||
|
||||
if err != io.EOF { // otherwise, we've already read full payload
|
||||
headReader = io.MultiReader(headReader, rObj.Payload)
|
||||
headReader = io.MultiReader(headReader, payload)
|
||||
}
|
||||
|
||||
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
||||
// if it implements io.Closer and that's useful for us.
|
||||
rObj.Payload = readCloser{headReader, rObj.Payload}
|
||||
payload = readCloser{headReader, payload}
|
||||
}
|
||||
req.SetContentType(contentType)
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
|
||||
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
||||
req.Response.SetBodyStream(payload, int(payloadSize))
|
||||
}
|
||||
|
||||
func makeAttributesMap(attrs []object.Attribute) map[string]string {
|
||||
attributes := make(map[string]string)
|
||||
for _, attr := range attrs {
|
||||
if !isValidToken(attr.Key()) || !isValidValue(attr.Value()) {
|
||||
continue
|
||||
}
|
||||
key := utils.BackwardTransformIfSystem(attr.Key())
|
||||
|
||||
attributes[key] = attr.Value()
|
||||
}
|
||||
return attributes
|
||||
}
|
||||
|
|
|
@ -2,14 +2,19 @@ package handler
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -31,6 +36,43 @@ func (r *request) handleFrostFSErr(err error, start time.Time) {
|
|||
response.Error(r.RequestCtx, msg, statusCode)
|
||||
}
|
||||
|
||||
func (r *request) setAttributes(attrs map[string]string) {
|
||||
for key, val := range attrs {
|
||||
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *request) setIDs(obj object.Object) {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
r.Response.Header.Set(hdrObjectID, objID.String())
|
||||
r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String())
|
||||
r.Response.Header.Set(hdrContainerID, cnrID.String())
|
||||
}
|
||||
|
||||
func (r *request) setDisposition(shouldDownload bool, filename string) {
|
||||
const (
|
||||
inlineDisposition = "inline"
|
||||
attachmentDisposition = "attachment"
|
||||
)
|
||||
dis := inlineDisposition
|
||||
if shouldDownload {
|
||||
dis = attachmentDisposition
|
||||
}
|
||||
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
}
|
||||
|
||||
func (r *request) setTimestamp(timestamp string) error {
|
||||
value, err := strconv.ParseInt(timestamp, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func bearerToken(ctx context.Context) *bearer.Token {
|
||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
||||
return tkn
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue