[#400] Make multipart-upload parts system objects

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
Angira Kekteeva 2022-04-18 16:16:01 +04:00 committed by Alex Vanin
parent 9f017b2bba
commit f274747e83
2 changed files with 56 additions and 46 deletions

View file

@ -441,12 +441,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
}
p := &layer.DeleteObjectParams{
BktInfo: bktInfo,
BktSettings: bktSettings,
Objects: []*layer.VersionedObject{{Name: initPart.Name}},
}
if _, err = h.obj.DeleteObjects(r.Context(), p); err != nil {
if err = h.obj.DeleteSystemObject(r.Context(), bktInfo, layer.FormUploadPartName(uploadID, uploadInfo.Key, 0)); err != nil {
h.logAndSendError(w, "could not delete init file of multipart upload", reqInfo, err, additional...)
return
}

View file

@ -127,15 +127,15 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.Obje
appendUploadHeaders(p.Header, p.Info.UploadID, p.Info.Key, p.PartNumber)
params := &PutObjectParams{
BktInfo: p.Info.Bkt,
Object: createUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
Size: p.Size,
Reader: p.Reader,
Header: p.Header,
params := &PutSystemObjectParams{
BktInfo: p.Info.Bkt,
ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
Metadata: p.Header,
Prefix: "",
Reader: p.Reader,
}
return n.PutObject(ctx, params)
return n.PutSystemObject(ctx, params)
}
func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
@ -159,16 +159,27 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
metadata := make(map[string]string)
appendUploadHeaders(metadata, p.Info.UploadID, p.Info.Key, p.PartNumber)
c := &CopyObjectParams{
SrcObject: p.SrcObjInfo,
DstBktInfo: p.Info.Bkt,
DstObject: createUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
SrcSize: p.SrcObjInfo.Size,
Header: metadata,
Range: p.Range,
}
pr, pw := io.Pipe()
return n.CopyObject(ctx, c)
go func() {
err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObjInfo,
Writer: pw,
Range: p.Range,
})
if err = pw.CloseWithError(err); err != nil {
n.log.Error("could not get object", zap.Error(err))
}
}()
return n.PutSystemObject(ctx, &PutSystemObjectParams{
BktInfo: p.Info.Bkt,
ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
Metadata: metadata,
Prefix: "",
Reader: pr,
})
}
// implements io.Reader of payloads of the object list stored in the NeoFS network.
@ -225,14 +236,14 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
}
if len(objects) == 1 {
obj, err := n.headLastVersionIfNotDeleted(ctx, p.Info.Bkt, p.Info.Key)
obj, err = n.headLastVersionIfNotDeleted(ctx, p.Info.Bkt, p.Info.Key)
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) {
return nil, errors.GetAPIError(errors.ErrInvalidPart)
}
return nil, err
}
if obj != nil && obj.Headers[UploadIDAttributeName] != "" {
if obj != nil && obj.Headers[UploadIDAttributeName] == p.Info.UploadID {
return obj, nil
}
return nil, errors.GetAPIError(errors.ErrInvalidPart)
@ -249,7 +260,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
return nil, errors.GetAPIError(errors.ErrInternalError)
}
if len(objects) < len(p.Parts) {
// keep in mind objects[0] is the init part
if len(objects) <= len(p.Parts) {
return nil, errors.GetAPIError(errors.ErrInvalidPart)
}
@ -278,6 +290,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
delete(initMetadata, UploadPartNumberAttributeName)
delete(initMetadata, UploadKeyAttributeName)
delete(initMetadata, attrVersionsIgnore)
delete(initMetadata, objectSystemAttributeName)
delete(initMetadata, versionsUnversionedAttr)
r := &multiObjectReader{
ctx: ctx,
@ -311,8 +325,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
zap.Stringer("object id", objInfo.ID),
zap.Stringer("bucket id", p.Info.Bkt.CID),
zap.Error(err))
return nil, errors.GetAPIError(errors.ErrInternalError)
}
n.systemCache.Delete(systemObjectKey(p.Info.Bkt, FormUploadPartName(p.Info.UploadID, p.Info.Key, partNum)))
}
return obj, nil
@ -455,38 +469,29 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
}
func (n *layer) GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error) {
ids, err := n.objectSearchByName(ctx, p.Bkt.CID, createUploadPartName(p.UploadID, p.Key, 0))
if err != nil {
return nil, err
}
if len(ids) == 0 {
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
}
if len(ids) > 1 {
return nil, errors.GetAPIError(errors.ErrInternalError)
}
meta, err := n.objectHead(ctx, p.Bkt.CID, &ids[0])
info, err := n.HeadSystemObject(ctx, p.Bkt, FormUploadPartName(p.UploadID, p.Key, 0))
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) {
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
}
return nil, err
}
return objInfoFromMeta(p.Bkt, meta), nil
return info, nil
}
func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (map[int]*data.ObjectInfo, error) {
// we search by UploadID attribute because parts are system objects which have system name not filename
// and search in attributes by prefix is not supported
f := &findParams{
cid: p.Bkt.CID,
prefix: UploadPartKeyPrefix + p.UploadID + "-" + p.Key,
attr: [2]string{UploadIDAttributeName, p.UploadID},
cid: p.Bkt.CID,
}
ids, err := n.objectSearch(ctx, f)
if err != nil {
return nil, err
}
if len(ids) == 0 {
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
}
res := make(map[int]*data.ObjectInfo)
@ -500,18 +505,29 @@ func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (map[in
continue
}
info := objInfoFromMeta(p.Bkt, meta)
// skip objects which are completed by "complete-multipart-upload" because they have "s3-Upload-Id" attribute
if !isSystem(info) {
continue
}
numStr := info.Headers[UploadPartNumberAttributeName]
num, err := strconv.Atoi(numStr)
if err != nil {
return nil, errors.GetAPIError(errors.ErrInternalError)
}
res[num] = info
if err = n.systemCache.PutObject(systemObjectKey(p.Bkt, FormUploadPartName(p.UploadID, p.Key, num)), info); err != nil {
n.log.Warn("couldn't cache upload part", zap.Error(err))
}
}
if len(res) == 0 {
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
}
return res, nil
}
func createUploadPartName(uploadID, key string, partNumber int) string {
func FormUploadPartName(uploadID, key string, partNumber int) string {
return UploadPartKeyPrefix + uploadID + "-" + key + "-" + strconv.Itoa(partNumber)
}
@ -585,5 +601,4 @@ func appendUploadHeaders(metadata map[string]string, uploadID, key string, partN
metadata[UploadIDAttributeName] = uploadID
metadata[UploadPartNumberAttributeName] = strconv.Itoa(partNumber)
metadata[UploadKeyAttributeName] = key
metadata[attrVersionsIgnore] = "true"
}