Compare commits
1 commit
0b03f693a2
...
16548f1a30
Author | SHA1 | Date | |
---|---|---|---|
16548f1a30 |
8 changed files with 123 additions and 192 deletions
|
@ -1,41 +0,0 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type (
|
||||
ObjectInfo struct {
|
||||
ID oid.ID
|
||||
CID cid.ID
|
||||
|
||||
Bucket string
|
||||
Name string
|
||||
Size uint64
|
||||
Headers map[string]string
|
||||
}
|
||||
|
||||
// PartInfo is upload information about part.
|
||||
PartInfo struct {
|
||||
Key string `json:"key"`
|
||||
UploadID string `json:"uploadId"`
|
||||
Number int `json:"number"`
|
||||
OID oid.ID `json:"oid"`
|
||||
Size uint64 `json:"size"`
|
||||
ETag string `json:"etag"`
|
||||
MD5 string `json:"md5"`
|
||||
Created time.Time `json:"created"`
|
||||
}
|
||||
)
|
||||
|
||||
// Address returns object address.
|
||||
func (o *ObjectInfo) Address() oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(o.CID)
|
||||
addr.SetObject(o.ID)
|
||||
|
||||
return addr
|
||||
}
|
|
@ -273,10 +273,3 @@ func (t *TestFrostFS) isAllowed(cnrID cid.ID, userID user.ID, op acl.Op, objID o
|
|||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(obj)
|
||||
return addr
|
||||
}
|
||||
|
|
|
@ -123,10 +123,8 @@ type PrmInitMultiObjectReader struct {
|
|||
// payload range
|
||||
Off, Ln uint64
|
||||
|
||||
ObjInfo *data.ObjectInfo
|
||||
BktInfo *data.BucketInfo
|
||||
Log *zap.Logger
|
||||
Bearer *bearer.Token
|
||||
Addr oid.Address
|
||||
Bearer *bearer.Token
|
||||
}
|
||||
|
||||
type ResObjectSearch interface {
|
||||
|
@ -213,9 +211,7 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
return
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(*objID)
|
||||
addr := newAddress(bktInfo.CID, *objID)
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addr, bktInfo)
|
||||
}
|
||||
|
@ -268,10 +264,7 @@ func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, r
|
|||
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(foundOid.OID)
|
||||
addr := newAddress(bktInfo.CID, foundOid.OID)
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addr, bktInfo)
|
||||
}
|
||||
|
|
|
@ -5,16 +5,12 @@ import (
|
|||
"io"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
frostFSSystemMetadataPrefix = "S3-"
|
||||
attributeMultipartObjectSize = frostFSSystemMetadataPrefix + "Multipart-Object-Size"
|
||||
)
|
||||
const attributeMultipartObjectSize = "S3-Multipart-Object-Size"
|
||||
|
||||
// MultipartFile provides standard ReadCloser interface and also allows one to
|
||||
// get file name, it's used for multipart uploads.
|
||||
|
@ -55,11 +51,7 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
|||
}
|
||||
|
||||
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
||||
func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) {
|
||||
sizeValue, ok := p.attrs[attributeMultipartObjectSize]
|
||||
if !ok {
|
||||
return p.obj.Payload, p.obj.Header.PayloadSize(), nil
|
||||
}
|
||||
func (h *Handler) getPayload(p getMultiobjectBodyParams) (io.ReadCloser, uint64, error) {
|
||||
cid, ok := p.obj.Header.ContainerID()
|
||||
if !ok {
|
||||
return nil, 0, errors.New("no container id set")
|
||||
|
@ -68,25 +60,16 @@ func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error)
|
|||
if !ok {
|
||||
return nil, 0, errors.New("no object id set")
|
||||
}
|
||||
size, err := strconv.ParseUint(sizeValue, 10, 64)
|
||||
size, err := strconv.ParseUint(p.strSize, 10, 64)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
ctx := p.req.RequestCtx
|
||||
params := PrmInitMultiObjectReader{
|
||||
Off: 0,
|
||||
Ln: 0,
|
||||
ObjInfo: &data.ObjectInfo{
|
||||
ID: oid,
|
||||
CID: cid,
|
||||
Bucket: p.bktinfo.Name,
|
||||
Name: p.attrs["FilePath"],
|
||||
Size: size,
|
||||
Headers: p.attrs,
|
||||
},
|
||||
BktInfo: p.bktinfo,
|
||||
Log: h.log,
|
||||
Bearer: bearerToken(ctx),
|
||||
Off: 0,
|
||||
Ln: 0,
|
||||
Addr: newAddress(cid, oid),
|
||||
Bearer: bearerToken(ctx),
|
||||
}
|
||||
payload, err := h.frostfs.InitMultiObjectReader(ctx, params)
|
||||
if err != nil {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -47,17 +48,22 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
|
|||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||
}
|
||||
|
||||
type getPayloadParams struct {
|
||||
type getMultiobjectBodyParams struct {
|
||||
obj *Object
|
||||
req request
|
||||
bktinfo *data.BucketInfo
|
||||
attrs map[string]string
|
||||
strSize string
|
||||
}
|
||||
|
||||
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address, bktInfo *data.BucketInfo) {
|
||||
var (
|
||||
shouldDownload = req.QueryArgs().GetBool("download")
|
||||
start = time.Now()
|
||||
filename string
|
||||
filepath string
|
||||
contentType string
|
||||
payload io.ReadCloser
|
||||
payloadSize uint64
|
||||
)
|
||||
|
||||
prm := PrmObjectGet{
|
||||
|
@ -74,35 +80,52 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
|||
}
|
||||
|
||||
// we can't close reader in this function, so how to do it?
|
||||
|
||||
attrs := makeAttributesMap(rObj.Header.Attributes())
|
||||
req.setAttributes(attrs)
|
||||
|
||||
req.setIDs(rObj.Header)
|
||||
req.setDisposition(shouldDownload, attrs)
|
||||
payload = rObj.Payload
|
||||
payloadSize = rObj.Header.PayloadSize()
|
||||
for _, attr := range rObj.Header.Attributes() {
|
||||
key := attr.Key()
|
||||
val := attr.Value()
|
||||
if !isValidToken(key) || !isValidValue(val) {
|
||||
continue
|
||||
}
|
||||
|
||||
if err = req.setTimestamp(attrs[object.AttributeTimestamp]); err != nil {
|
||||
req.log.Error(logs.CouldntParseCreationDate,
|
||||
zap.String("val", attrs[object.AttributeTimestamp]),
|
||||
zap.Error(err))
|
||||
response.Error(req.RequestCtx, "failed to convert timestamp: "+err.Error(), fasthttp.StatusInternalServerError)
|
||||
}
|
||||
key = utils.BackwardTransformIfSystem(key)
|
||||
|
||||
payloadParams := getPayloadParams{
|
||||
obj: rObj,
|
||||
req: req,
|
||||
bktinfo: bktInfo,
|
||||
attrs: attrs,
|
||||
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||
switch key {
|
||||
case object.AttributeFileName:
|
||||
filename = val
|
||||
case object.AttributeTimestamp:
|
||||
if err = req.setTimestamp(val); err != nil {
|
||||
req.log.Error(logs.CouldntParseCreationDate,
|
||||
zap.String("val", val),
|
||||
zap.Error(err))
|
||||
}
|
||||
case object.AttributeContentType:
|
||||
contentType = val
|
||||
case object.AttributeFilePath:
|
||||
filepath = val
|
||||
if filename == "" {
|
||||
filename = filepath
|
||||
}
|
||||
case attributeMultipartObjectSize:
|
||||
payload, payloadSize, err = h.getPayload(getMultiobjectBodyParams{
|
||||
obj: rObj,
|
||||
req: req,
|
||||
bktinfo: bktInfo,
|
||||
strSize: val,
|
||||
})
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
req.setDisposition(shouldDownload, filename)
|
||||
|
||||
payload, payloadSize, err := h.getPayload(payloadParams)
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
return
|
||||
}
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||
|
||||
contentType := attrs[object.AttributeContentType]
|
||||
if len(contentType) == 0 {
|
||||
// determine the Content-Type from the payload head
|
||||
var payloadHead []byte
|
||||
|
@ -131,15 +154,35 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
|||
req.Response.SetBodyStream(payload, int(payloadSize))
|
||||
}
|
||||
|
||||
func makeAttributesMap(attrs []object.Attribute) map[string]string {
|
||||
attributes := make(map[string]string)
|
||||
for _, attr := range attrs {
|
||||
if !isValidToken(attr.Key()) || !isValidValue(attr.Value()) {
|
||||
continue
|
||||
}
|
||||
key := utils.BackwardTransformIfSystem(attr.Key())
|
||||
|
||||
attributes[key] = attr.Value()
|
||||
}
|
||||
return attributes
|
||||
func (r *request) setIDs(obj object.Object) {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
r.Response.Header.Set(hdrObjectID, objID.String())
|
||||
r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String())
|
||||
r.Response.Header.Set(hdrContainerID, cnrID.String())
|
||||
}
|
||||
|
||||
func (r *request) setDisposition(shouldDownload bool, filename string) {
|
||||
const (
|
||||
inlineDisposition = "inline"
|
||||
attachmentDisposition = "attachment"
|
||||
)
|
||||
|
||||
dis := inlineDisposition
|
||||
if shouldDownload {
|
||||
dis = attachmentDisposition
|
||||
}
|
||||
|
||||
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
}
|
||||
|
||||
func (r *request) setTimestamp(timestamp string) error {
|
||||
value, err := strconv.ParseInt(timestamp, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,9 +3,6 @@ package handler
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -13,10 +10,10 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -38,51 +35,6 @@ func (r *request) handleFrostFSErr(err error, start time.Time) {
|
|||
response.Error(r.RequestCtx, msg, statusCode)
|
||||
}
|
||||
|
||||
func (r *request) setAttributes(attrs map[string]string) {
|
||||
for key, val := range attrs {
|
||||
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *request) setIDs(obj object.Object) {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
r.Response.Header.Set(hdrObjectID, objID.String())
|
||||
r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String())
|
||||
r.Response.Header.Set(hdrContainerID, cnrID.String())
|
||||
}
|
||||
|
||||
func (r *request) setDisposition(shouldDownload bool, attrs map[string]string) {
|
||||
const (
|
||||
inlineDisposition = "inline"
|
||||
attachmentDisposition = "attachment"
|
||||
)
|
||||
|
||||
dis := inlineDisposition
|
||||
if shouldDownload {
|
||||
dis = attachmentDisposition
|
||||
}
|
||||
|
||||
filename := attrs[object.AttributeFileName]
|
||||
filepath := attrs[object.AttributeFilePath]
|
||||
if filename == "" {
|
||||
filename = filepath
|
||||
}
|
||||
|
||||
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
}
|
||||
|
||||
func (r *request) setTimestamp(timestamp string) error {
|
||||
value, err := strconv.ParseInt(timestamp, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func bearerToken(ctx context.Context) *bearer.Token {
|
||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
||||
return tkn
|
||||
|
@ -140,3 +92,10 @@ func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
|
|||
}
|
||||
response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
}
|
||||
|
||||
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(obj)
|
||||
return addr
|
||||
}
|
||||
|
|
|
@ -6,19 +6,28 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// PartInfo is upload information about part.
|
||||
type PartInfo struct {
|
||||
Key string `json:"key"`
|
||||
UploadID string `json:"uploadId"`
|
||||
Number int `json:"number"`
|
||||
OID oid.ID `json:"oid"`
|
||||
Size uint64 `json:"size"`
|
||||
ETag string `json:"etag"`
|
||||
MD5 string `json:"md5"`
|
||||
Created time.Time `json:"created"`
|
||||
}
|
||||
|
||||
type GetFrostFSParams struct {
|
||||
// payload range
|
||||
Off, Ln uint64
|
||||
|
||||
Oid oid.ID
|
||||
BktInfo *data.BucketInfo
|
||||
Addr oid.Address
|
||||
}
|
||||
|
||||
type PartObj struct {
|
||||
|
@ -33,7 +42,6 @@ type readerInitiator interface {
|
|||
// MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network.
|
||||
type MultiObjectReader struct {
|
||||
ctx context.Context
|
||||
log *zap.Logger
|
||||
|
||||
layer readerInitiator
|
||||
|
||||
|
@ -50,13 +58,12 @@ type MultiObjectReader struct {
|
|||
|
||||
type MultiObjectReaderConfig struct {
|
||||
Initiator readerInitiator
|
||||
Log *zap.Logger
|
||||
|
||||
// the offset of complete object and total size to read
|
||||
Off, Ln uint64
|
||||
|
||||
BktInfo *data.BucketInfo
|
||||
Parts []PartObj
|
||||
Addr oid.Address
|
||||
Parts []PartObj
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -69,13 +76,13 @@ var (
|
|||
func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) {
|
||||
combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{
|
||||
PrmAuth: handler.PrmAuth{BearerToken: p.Bearer},
|
||||
Address: p.ObjInfo.Address(),
|
||||
Address: p.Addr,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get combined object '%s': %w", p.ObjInfo.ID.EncodeToString(), err)
|
||||
return nil, fmt.Errorf("get combined object '%s': %w", p.Addr.Object().EncodeToString(), err)
|
||||
}
|
||||
|
||||
var parts []*data.PartInfo
|
||||
var parts []*PartInfo
|
||||
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||
}
|
||||
|
@ -93,8 +100,7 @@ func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMu
|
|||
Off: p.Off,
|
||||
Ln: p.Ln,
|
||||
Parts: objParts,
|
||||
BktInfo: p.BktInfo,
|
||||
Log: p.Log,
|
||||
Addr: p.Addr,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -107,10 +113,9 @@ func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*Mu
|
|||
ctx: ctx,
|
||||
layer: cfg.Initiator,
|
||||
prm: GetFrostFSParams{
|
||||
BktInfo: cfg.BktInfo,
|
||||
Addr: cfg.Addr,
|
||||
},
|
||||
parts: cfg.Parts,
|
||||
log: cfg.Log,
|
||||
}
|
||||
|
||||
if cfg.Off+cfg.Ln == 0 {
|
||||
|
@ -177,7 +182,7 @@ func (x *MultiObjectReader) Read(p []byte) (n int, err error) {
|
|||
return n, io.EOF
|
||||
}
|
||||
|
||||
x.prm.Oid = x.parts[x.curIndex].OID
|
||||
x.prm.Addr.SetObject(x.parts[x.curIndex].OID)
|
||||
|
||||
if x.curIndex == 0 {
|
||||
x.prm.Off = x.startPartOffset
|
||||
|
@ -206,15 +211,11 @@ func (x *MultiObjectReader) Read(p []byte) (n int, err error) {
|
|||
func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error) {
|
||||
var prmAuth handler.PrmAuth
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(p.BktInfo.CID)
|
||||
addr.SetObject(p.Oid)
|
||||
|
||||
if p.Off+p.Ln != 0 {
|
||||
prm := handler.PrmObjectRange{
|
||||
PrmAuth: prmAuth,
|
||||
PayloadRange: [2]uint64{p.Off, p.Ln},
|
||||
Address: addr,
|
||||
Address: p.Addr,
|
||||
}
|
||||
|
||||
return x.RangeObject(ctx, prm)
|
||||
|
@ -222,7 +223,7 @@ func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrost
|
|||
|
||||
prm := handler.PrmObjectGet{
|
||||
PrmAuth: prmAuth,
|
||||
Address: addr,
|
||||
Address: p.Addr,
|
||||
}
|
||||
|
||||
res, err := x.GetObject(ctx, prm)
|
||||
|
|
|
@ -18,7 +18,7 @@ type readerInitiatorMock struct {
|
|||
}
|
||||
|
||||
func (r *readerInitiatorMock) InitFrostFSObjectPayloadReader(_ context.Context, p GetFrostFSParams) (io.Reader, error) {
|
||||
partPayload, ok := r.parts[p.Oid]
|
||||
partPayload, ok := r.parts[p.Addr.Object()]
|
||||
if !ok {
|
||||
return nil, errors.New("part not found")
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue