frostfs-http-gw/internal/handler/multipart.go

200 lines
5.1 KiB
Go
Raw Normal View History

package handler
import (
"context"
"encoding/json"
"fmt"
"io"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/minio/sio"
"go.uber.org/zap"
)
const (
FrostFSSystemMetadataPrefix = "S3-"
AttributeEncryptionAlgorithm = FrostFSSystemMetadataPrefix + "Algorithm"
AttributeHMACSalt = FrostFSSystemMetadataPrefix + "HMAC-Salt"
AttributeHMACKey = FrostFSSystemMetadataPrefix + "HMAC-Key"
AttributeMultipartObjectSize = FrostFSSystemMetadataPrefix + "Multipart-Object-Size"
)
// MultipartFile provides standard ReadCloser interface and also allows one to
// get file name, it's used for multipart uploads.
type MultipartFile interface {
io.ReadCloser
FileName() string
}
func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartFile, error) {
// To have a custom buffer (3mb) the custom multipart reader is used.
// Default reader uses 4KiB chunks, which slow down upload speed up to 400%
// https://github.com/golang/go/blob/91b9915d3f6f8cd2e9e9fda63f67772803adfa03/src/mime/multipart/multipart.go#L32
reader := multipart.NewReader(r, boundary)
for {
part, err := reader.NextPart()
if err != nil {
return nil, err
}
name := part.FormName()
if name == "" {
l.Debug(logs.IgnorePartEmptyFormName)
continue
}
filename := part.FileName()
// ignore multipart/form-data values
if filename == "" {
l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name))
continue
}
return part, nil
}
}
type getParams struct {
// payload range
off, ln uint64
objInfo *data.ObjectInfo
bktInfo *data.BucketInfo
}
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) {
sizeValue, ok := p.attrs[AttributeMultipartObjectSize]
if !ok {
return p.obj.Payload, p.obj.Header.PayloadSize(), nil
}
cid, _ := p.obj.Header.ContainerID()
oid, _ := p.obj.Header.ID()
size, err := strconv.ParseUint(sizeValue, 10, 64)
if err != nil {
return nil, 0, err
}
ctx := p.req.RequestCtx
params := getParams{
off: 0,
ln: 0,
objInfo: &data.ObjectInfo{
ID: oid,
CID: cid,
Bucket: p.bktinfo.Name,
Name: p.attrs["FilePath"],
Size: size,
Headers: p.attrs,
},
bktInfo: p.bktinfo,
}
payload, err := h.initMultipartReader(ctx, params)
if err != nil {
return nil, 0, err
}
return io.NopCloser(payload), size, nil
}
func (h *Handler) initMultipartReader(ctx context.Context, p getParams) (io.Reader, error) {
combinedObj, err := h.frostfs.GetObject(ctx, frostfs.PrmObjectGet{
PrmAuth: frostfs.PrmAuth{BearerToken: bearerToken(ctx)},
Address: p.objInfo.Address(),
})
if err != nil {
return nil, fmt.Errorf("get combined object '%s': %w", p.objInfo.ID.EncodeToString(), err)
}
var parts []*data.PartInfo
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
}
isEncrypted := FormEncryptionInfo(p.objInfo.Headers).Enabled
objParts := make([]frostfs.PartObj, len(parts))
for i, part := range parts {
size := part.Size
if isEncrypted {
if size, err = sio.EncryptedSize(part.Size); err != nil {
return nil, fmt.Errorf("compute encrypted size: %w", err)
}
}
objParts[i] = frostfs.PartObj{
OID: part.OID,
Size: size,
}
}
return frostfs.NewMultiObjectReader(ctx, frostfs.MultiObjectReaderConfig{
Handler: h,
Off: p.off,
Ln: p.ln,
Parts: objParts,
BktInfo: p.bktInfo,
Log: h.log,
})
}
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
// Zero range corresponds to full payload (panics if only offset is set).
func (h *Handler) InitFrostFSObjectPayloadReader(ctx context.Context, p frostfs.GetFrostFSParams) (io.Reader, error) {
var prmAuth frostfs.PrmAuth
var addr oid.Address
addr.SetContainer(p.BktInfo.CID)
addr.SetObject(p.Oid)
if p.Off+p.Ln != 0 {
prm := frostfs.PrmObjectRange{
PrmAuth: prmAuth,
Container: p.BktInfo.CID,
Object: p.Oid,
PayloadRange: [2]uint64{p.Off, p.Ln},
Address: addr,
}
return h.frostfs.RangeObject(ctx, prm)
}
prm := frostfs.PrmObjectGet{
PrmAuth: prmAuth,
Container: p.BktInfo.CID,
Object: p.Oid,
Address: addr,
}
res, err := h.frostfs.GetObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Payload, nil
}
// ObjectEncryption stores parsed object encryption headers.
type ObjectEncryption struct {
Enabled bool
Algorithm string
HMACKey string
HMACSalt string
}
func FormEncryptionInfo(headers map[string]string) ObjectEncryption {
algorithm := headers[AttributeEncryptionAlgorithm]
return ObjectEncryption{
Enabled: len(algorithm) > 0,
Algorithm: algorithm,
HMACKey: headers[AttributeHMACKey],
HMACSalt: headers[AttributeHMACSalt],
}
}