[#417] Complete multipart upload using tree service

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-05-24 17:55:56 +03:00 committed by Alex Vanin
parent 6b2ddcadd0
commit 70957d75fd
5 changed files with 86 additions and 135 deletions

View file

@ -1,8 +1,6 @@
package handler package handler
import ( import (
"bytes"
"encoding/json"
"encoding/xml" "encoding/xml"
"net/http" "net/http"
"net/url" "net/url"
@ -87,11 +85,6 @@ type (
ETag string `xml:"ETag"` ETag string `xml:"ETag"`
LastModified string `xml:"LastModified"` LastModified string `xml:"LastModified"`
} }
UploadData struct {
TagSet map[string]string
ACL *AccessControlPolicy
}
) )
const ( const (
@ -120,6 +113,7 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
Bkt: bktInfo, Bkt: bktInfo,
Key: reqInfo.ObjectName, Key: reqInfo.ObjectName,
}, },
Data: &layer.UploadData{},
} }
if containsACLHeaders(r) { if containsACLHeaders(r) {
@ -132,11 +126,11 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
h.logAndSendError(w, "could not parse acl", reqInfo, err) h.logAndSendError(w, "could not parse acl", reqInfo, err)
return return
} }
p.ACLHeaders = formACLHeadersForMultipart(r.Header) p.Data.ACLHeaders = formACLHeadersForMultipart(r.Header)
} }
if len(r.Header.Get(api.AmzTagging)) > 0 { if len(r.Header.Get(api.AmzTagging)) > 0 {
p.TagSet, err = parseTaggingHeader(r.Header) p.Data.TagSet, err = parseTaggingHeader(r.Header)
if err != nil { if err != nil {
h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...) h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...)
return return
@ -352,7 +346,6 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
Key: reqInfo.ObjectName, Key: reqInfo.ObjectName,
} }
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)} additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
uploadData = &UploadData{}
) )
reqBody := new(CompleteMultipartUpload) reqBody := new(CompleteMultipartUpload)
@ -366,42 +359,11 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
return return
} }
initPart, err := h.obj.GetUploadInitInfo(r.Context(), uploadInfo)
if err != nil {
h.logAndSendError(w, "could not get multipart upload info", reqInfo, err, additional...)
return
}
if initPart.Size > 0 {
initPartPayload := bytes.NewBuffer(make([]byte, 0, initPart.Size))
p := &layer.GetObjectParams{
ObjectInfo: initPart,
Writer: initPartPayload,
BucketInfo: bktInfo,
}
if err = h.obj.GetObject(r.Context(), p); err != nil {
h.logAndSendError(w, "could not get multipart upload acl and/or tagging", reqInfo, err, additional...)
return
}
if err = json.Unmarshal(initPartPayload.Bytes(), uploadData); err != nil {
h.logAndSendError(w, "could not unmarshal multipart upload acl and/or tagging", reqInfo, err, additional...)
return
}
if uploadData.ACL != nil {
if sessionTokenSetEACL, err = getSessionTokenSetEACL(r.Context()); err != nil {
h.logAndSendError(w, "couldn't get eacl token", reqInfo, err)
return
}
}
}
c := &layer.CompleteMultipartParams{ c := &layer.CompleteMultipartParams{
Info: uploadInfo, Info: uploadInfo,
Parts: reqBody.Parts, Parts: reqBody.Parts,
} }
objInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c) uploadData, objInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c)
if err != nil { if err != nil {
h.logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...) h.logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...)
return return
@ -419,12 +381,23 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
} }
} }
if uploadData.ACL != nil { if len(uploadData.ACLHeaders) != 0 {
key, err := h.bearerTokenIssuerKey(r.Context())
if err != nil {
h.logAndSendError(w, "couldn't get gate key", reqInfo, err)
return
}
acl, err := parseACLHeaders(r.Header, key)
if err != nil {
h.logAndSendError(w, "could not parse acl", reqInfo, err)
return
}
resInfo := &resourceInfo{ resInfo := &resourceInfo{
Bucket: objInfo.Bucket, Bucket: objInfo.Bucket,
Object: objInfo.Name, Object: objInfo.Name,
} }
astObject, err := aclToAst(uploadData.ACL, resInfo) astObject, err := aclToAst(acl, resInfo)
if err != nil { if err != nil {
h.logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...) h.logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...)
return return

View file

@ -232,13 +232,12 @@ type (
DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error
CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ObjectInfo, error)
UploadPart(ctx context.Context, p *UploadPartParams) (string, error) UploadPart(ctx context.Context, p *UploadPartParams) (string, error)
UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error)
ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error)
AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error
ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error)
GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error)
PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)

View file

@ -11,7 +11,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/internal/misc" "github.com/nspcc-dev/neofs-s3-gw/internal/misc"
@ -46,8 +45,12 @@ type (
} }
CreateMultipartParams struct { CreateMultipartParams struct {
Info *UploadInfoParams Info *UploadInfoParams
Header map[string]string Header map[string]string
Data *UploadData
}
UploadData struct {
TagSet map[string]string TagSet map[string]string
ACLHeaders map[string]string ACLHeaders map[string]string
} }
@ -124,24 +127,32 @@ type (
) )
func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error { func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
metaSize := len(p.Header)
if p.Data != nil {
metaSize += len(p.Data.ACLHeaders)
metaSize += len(p.Data.TagSet)
}
info := &data.MultipartInfo{ info := &data.MultipartInfo{
Key: p.Info.Key, Key: p.Info.Key,
UploadID: p.Info.UploadID, UploadID: p.Info.UploadID,
Owner: n.Owner(ctx), Owner: n.Owner(ctx),
Created: time.Now(), Created: time.Now(),
Meta: make(map[string]string, len(p.Header)+len(p.ACLHeaders)+len(p.TagSet)), Meta: make(map[string]string, metaSize),
} }
for key, val := range p.Header { for key, val := range p.Header {
info.Meta[metaPrefix+key] = val info.Meta[metaPrefix+key] = val
} }
for key, val := range p.ACLHeaders { if p.Data != nil {
info.Meta[aclPrefix+key] = val for key, val := range p.Data.ACLHeaders {
} info.Meta[aclPrefix+key] = val
}
for key, val := range p.TagSet { for key, val := range p.Data.TagSet {
info.Meta[tagPrefix+key] = val info.Meta[tagPrefix+key] = val
}
} }
return n.treeService.CreateMultipartUpload(ctx, &p.Info.Bkt.CID, info) return n.treeService.CreateMultipartUpload(ctx, &p.Info.Bkt.CID, info)
@ -310,51 +321,20 @@ func (x *multiObjectReader) Read(p []byte) (n int, err error) {
return n + next, err return n + next, err
} }
func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error) { func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ObjectInfo, error) {
var (
obj *data.ObjectInfo
partsAttrValue string
)
for i := 1; i < len(p.Parts); i++ { for i := 1; i < len(p.Parts); i++ {
if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber { if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber {
return nil, errors.GetAPIError(errors.ErrInvalidPartOrder) return nil, nil, errors.GetAPIError(errors.ErrInvalidPartOrder)
} }
} }
_, objects, err := n.getUploadParts(ctx, p.Info) multipartInfo, objects, err := n.getUploadParts(ctx, p.Info) // todo consider avoid heading objects
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
if len(objects) == 1 { if len(objects) < len(p.Parts) {
obj, err = n.headLastVersionIfNotDeleted(ctx, p.Info.Bkt, p.Info.Key) return nil, nil, errors.GetAPIError(errors.ErrInvalidPart)
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) {
return nil, errors.GetAPIError(errors.ErrInvalidPart)
}
return nil, err
}
if obj != nil && obj.Headers[UploadIDAttributeName] == p.Info.UploadID {
return obj, nil
}
return nil, errors.GetAPIError(errors.ErrInvalidPart)
}
if _, ok := objects[0]; !ok {
n.log.Error("could not get init multipart upload",
zap.Stringer("bucket id", p.Info.Bkt.CID),
zap.String("uploadID", misc.SanitizeString(p.Info.UploadID)),
zap.String("uploadKey", p.Info.Key),
)
// we return InternalError because if we are here it means we've checked InitPart in handler before and
// received successful result, it's strange we didn't get the InitPart again
return nil, errors.GetAPIError(errors.ErrInternalError)
}
// keep in mind objects[0] is the init part
if len(objects) <= len(p.Parts) {
return nil, errors.GetAPIError(errors.ErrInvalidPart)
} }
parts := make([]*data.ObjectInfo, 0, len(p.Parts)) parts := make([]*data.ObjectInfo, 0, len(p.Parts))
@ -362,31 +342,29 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
for i, part := range p.Parts { for i, part := range p.Parts {
info := objects[part.PartNumber] info := objects[part.PartNumber]
if info == nil || part.ETag != info.HashSum { if info == nil || part.ETag != info.HashSum {
return nil, errors.GetAPIError(errors.ErrInvalidPart) return nil, nil, errors.GetAPIError(errors.ErrInvalidPart)
} }
// for the last part we have no minimum size limit // for the last part we have no minimum size limit
if i != len(p.Parts)-1 && info.Size < uploadMinSize { if i != len(p.Parts)-1 && info.Size < uploadMinSize {
return nil, errors.GetAPIError(errors.ErrEntityTooSmall) return nil, nil, errors.GetAPIError(errors.ErrEntityTooSmall)
} }
parts = append(parts, info) parts = append(parts, info)
partsAttrValue += strconv.Itoa(part.PartNumber) + "=" + strconv.FormatInt(info.Size, 10) + ","
} }
initMetadata := objects[0].Headers initMetadata := make(map[string]string, len(multipartInfo.Meta))
if len(objects[0].ContentType) != 0 { uploadData := &UploadData{
initMetadata[api.ContentType] = objects[0].ContentType TagSet: make(map[string]string),
ACLHeaders: make(map[string]string),
}
for key, val := range multipartInfo.Meta {
if strings.HasPrefix(key, metaPrefix) {
initMetadata[strings.TrimPrefix(key, metaPrefix)] = val
} else if strings.HasPrefix(key, tagPrefix) {
uploadData.TagSet[strings.TrimPrefix(key, tagPrefix)] = val
} else if strings.HasPrefix(key, aclPrefix) {
uploadData.ACLHeaders[strings.TrimPrefix(key, aclPrefix)] = val
}
} }
/* We will keep "S3-Upload-Id" attribute in a completed object to determine if it is a "common" object or a completed object.
We will need to differ these objects if something goes wrong during completing multipart upload.
I.e. we had completed the object but didn't put tagging/acl for some reason */
delete(initMetadata, UploadPartNumberAttributeName)
delete(initMetadata, UploadKeyAttributeName)
delete(initMetadata, attrVersionsIgnore)
delete(initMetadata, objectSystemAttributeName)
delete(initMetadata, versionsUnversionedAttr)
initMetadata[UploadCompletedParts] = partsAttrValue[:len(partsAttrValue)-1]
r := &multiObjectReader{ r := &multiObjectReader{
ctx: ctx, ctx: ctx,
@ -396,7 +374,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
r.prm.bktInfo = p.Info.Bkt r.prm.bktInfo = p.Info.Bkt
obj, err = n.PutObject(ctx, &PutObjectParams{ obj, err := n.PutObject(ctx, &PutObjectParams{
BktInfo: p.Info.Bkt, BktInfo: p.Info.Bkt,
Object: p.Info.Key, Object: p.Info.Key,
Reader: r, Reader: r,
@ -408,9 +386,11 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
zap.String("uploadKey", p.Info.Key), zap.String("uploadKey", p.Info.Key),
zap.Error(err)) zap.Error(err))
return nil, errors.GetAPIError(errors.ErrInternalError) return nil, nil, errors.GetAPIError(errors.ErrInternalError)
} }
var addr oid.Address
addr.SetContainer(p.Info.Bkt.CID)
for partNum, objInfo := range objects { for partNum, objInfo := range objects {
if partNum == 0 { if partNum == 0 {
continue continue
@ -421,10 +401,11 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
zap.Stringer("bucket id", p.Info.Bkt.CID), zap.Stringer("bucket id", p.Info.Bkt.CID),
zap.Error(err)) zap.Error(err))
} }
n.systemCache.Delete(systemObjectKey(p.Info.Bkt, FormUploadPartName(p.Info.UploadID, p.Info.Key, partNum))) addr.SetObject(objInfo.ID)
n.objCache.Delete(addr)
} }
return obj, nil return uploadData, obj, n.treeService.DeleteMultipartUpload(ctx, &p.Info.Bkt.CID, multipartInfo.ID)
} }
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) { func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
@ -549,18 +530,6 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
return &res, nil return &res, nil
} }
func (n *layer) GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error) {
info, err := n.HeadSystemObject(ctx, p.Bkt, FormUploadPartName(p.UploadID, p.Key, 0))
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) {
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
}
return nil, err
}
return info, nil
}
func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.ObjectInfo, error) { func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.ObjectInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Bkt.CID, p.Key, p.UploadID) multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Bkt.CID, p.Key, p.UploadID)
if err != nil { if err != nil {

View file

@ -237,7 +237,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID) n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID)
return &data.ObjectInfo{ objInfo := &data.ObjectInfo{
ID: *id, ID: *id,
CID: p.BktInfo.CID, CID: p.BktInfo.CID,
@ -250,7 +250,20 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
Headers: p.Header, Headers: p.Header,
ContentType: p.Header[api.ContentType], ContentType: p.Header[api.ContentType],
HashSum: hex.EncodeToString(hash), HashSum: hex.EncodeToString(hash),
}, nil }
if err = n.objCache.PutObject(objInfo); err != nil {
n.log.Warn("couldn't add object to cache", zap.Error(err),
zap.String("object_name", p.Object), zap.String("bucket_name", p.BktInfo.Name),
zap.String("cid", objInfo.CID.EncodeToString()), zap.String("oid", objInfo.ID.EncodeToString()))
}
if err = n.namesCache.Put(objInfo.NiceName(), objInfo.Address()); err != nil {
n.log.Warn("couldn't put obj address to name cache",
zap.String("obj nice name", objInfo.NiceName()),
zap.Error(err))
}
return objInfo, nil
} }
func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objName string, lock *data.ObjectLock) error { func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objName string, lock *data.ObjectLock) error {

View file

@ -689,9 +689,6 @@ func (c *TreeClient) getLatestVersion(ctx context.Context, cnrID *cid.ID, treeID
} }
nodes, err := c.getNodes(ctx, p) nodes, err := c.getNodes(ctx, p)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, layer.ErrNodeNotFound
}
return nil, fmt.Errorf("couldn't get nodes: %w", err) return nil, fmt.Errorf("couldn't get nodes: %w", err)
} }
@ -984,7 +981,7 @@ func (c *TreeClient) getSubTree(ctx context.Context, cnrID *cid.ID, treeID strin
cli, err := c.service.GetSubTree(ctx, request) cli, err := c.service.GetSubTree(ctx, request)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "not found") { if strings.Contains(err.Error(), "not found") {
return nil, nil return nil, layer.ErrNodeNotFound
} }
return nil, fmt.Errorf("failed to get sub tree client: %w", err) return nil, fmt.Errorf("failed to get sub tree client: %w", err)
} }
@ -996,7 +993,7 @@ func (c *TreeClient) getSubTree(ctx context.Context, cnrID *cid.ID, treeID strin
break break
} else if err != nil { } else if err != nil {
if strings.Contains(err.Error(), "not found") { if strings.Contains(err.Error(), "not found") {
return nil, nil return nil, layer.ErrNodeNotFound
} }
return nil, fmt.Errorf("failed to get sub tree: %w", err) return nil, fmt.Errorf("failed to get sub tree: %w", err)
} }
@ -1044,9 +1041,6 @@ func (c *TreeClient) getNode(ctx context.Context, cnrID *cid.ID, treeID string,
} }
nodes, err := c.getNodes(ctx, p) nodes, err := c.getNodes(ctx, p)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, layer.ErrNodeNotFound
}
return nil, fmt.Errorf("couldn't get nodes: %w", err) return nil, fmt.Errorf("couldn't get nodes: %w", err)
} }
if len(nodes) == 0 { if len(nodes) == 0 {
@ -1084,6 +1078,9 @@ func (c *TreeClient) getNodes(ctx context.Context, p *getNodesParams) ([]*tree.G
resp, err := c.service.GetNodeByPath(ctx, request) resp, err := c.service.GetNodeByPath(ctx, request)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, layer.ErrNodeNotFound
}
return nil, fmt.Errorf("failed to get node path: %w", err) return nil, fmt.Errorf("failed to get node path: %w", err)
} }