diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 3f36fc93..089f4f04 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -441,12 +441,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. h.logAndSendError(w, "could not get bucket settings", reqInfo, err) } - p := &layer.DeleteObjectParams{ - BktInfo: bktInfo, - BktSettings: bktSettings, - Objects: []*layer.VersionedObject{{Name: initPart.Name}}, - } - if _, err = h.obj.DeleteObjects(r.Context(), p); err != nil { + if err = h.obj.DeleteSystemObject(r.Context(), bktInfo, layer.FormUploadPartName(uploadID, uploadInfo.Key, 0)); err != nil { h.logAndSendError(w, "could not delete init file of multipart upload", reqInfo, err, additional...) return } diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 52e57c28..1123db14 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -127,15 +127,15 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.Obje appendUploadHeaders(p.Header, p.Info.UploadID, p.Info.Key, p.PartNumber) - params := &PutObjectParams{ - BktInfo: p.Info.Bkt, - Object: createUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber), - Size: p.Size, - Reader: p.Reader, - Header: p.Header, + params := &PutSystemObjectParams{ + BktInfo: p.Info.Bkt, + ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber), + Metadata: p.Header, + Prefix: "", + Reader: p.Reader, } - return n.PutObject(ctx, params) + return n.PutSystemObject(ctx, params) } func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) { @@ -159,16 +159,27 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. metadata := make(map[string]string) appendUploadHeaders(metadata, p.Info.UploadID, p.Info.Key, p.PartNumber) - c := &CopyObjectParams{ - SrcObject: p.SrcObjInfo, - DstBktInfo: p.Info.Bkt, - DstObject: createUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber), - SrcSize: p.SrcObjInfo.Size, - Header: metadata, - Range: p.Range, - } + pr, pw := io.Pipe() - return n.CopyObject(ctx, c) + go func() { + err := n.GetObject(ctx, &GetObjectParams{ + ObjectInfo: p.SrcObjInfo, + Writer: pw, + Range: p.Range, + }) + + if err = pw.CloseWithError(err); err != nil { + n.log.Error("could not get object", zap.Error(err)) + } + }() + + return n.PutSystemObject(ctx, &PutSystemObjectParams{ + BktInfo: p.Info.Bkt, + ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber), + Metadata: metadata, + Prefix: "", + Reader: pr, + }) } // implements io.Reader of payloads of the object list stored in the NeoFS network. @@ -225,14 +236,14 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar } if len(objects) == 1 { - obj, err := n.headLastVersionIfNotDeleted(ctx, p.Info.Bkt, p.Info.Key) + obj, err = n.headLastVersionIfNotDeleted(ctx, p.Info.Bkt, p.Info.Key) if err != nil { if errors.IsS3Error(err, errors.ErrNoSuchKey) { return nil, errors.GetAPIError(errors.ErrInvalidPart) } return nil, err } - if obj != nil && obj.Headers[UploadIDAttributeName] != "" { + if obj != nil && obj.Headers[UploadIDAttributeName] == p.Info.UploadID { return obj, nil } return nil, errors.GetAPIError(errors.ErrInvalidPart) @@ -249,7 +260,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar return nil, errors.GetAPIError(errors.ErrInternalError) } - if len(objects) < len(p.Parts) { + // keep in mind objects[0] is the init part + if len(objects) <= len(p.Parts) { return nil, errors.GetAPIError(errors.ErrInvalidPart) } @@ -278,6 +290,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar delete(initMetadata, UploadPartNumberAttributeName) delete(initMetadata, UploadKeyAttributeName) delete(initMetadata, attrVersionsIgnore) + delete(initMetadata, objectSystemAttributeName) + delete(initMetadata, versionsUnversionedAttr) r := &multiObjectReader{ ctx: ctx, @@ -311,8 +325,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar zap.Stringer("object id", objInfo.ID), zap.Stringer("bucket id", p.Info.Bkt.CID), zap.Error(err)) - return nil, errors.GetAPIError(errors.ErrInternalError) } + n.systemCache.Delete(systemObjectKey(p.Info.Bkt, FormUploadPartName(p.Info.UploadID, p.Info.Key, partNum))) } return obj, nil @@ -455,38 +469,29 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn } func (n *layer) GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error) { - ids, err := n.objectSearchByName(ctx, p.Bkt.CID, createUploadPartName(p.UploadID, p.Key, 0)) - if err != nil { - return nil, err - } - if len(ids) == 0 { - return nil, errors.GetAPIError(errors.ErrNoSuchUpload) - } - if len(ids) > 1 { - return nil, errors.GetAPIError(errors.ErrInternalError) - } - - meta, err := n.objectHead(ctx, p.Bkt.CID, &ids[0]) + info, err := n.HeadSystemObject(ctx, p.Bkt, FormUploadPartName(p.UploadID, p.Key, 0)) if err != nil { + if errors.IsS3Error(err, errors.ErrNoSuchKey) { + return nil, errors.GetAPIError(errors.ErrNoSuchUpload) + } return nil, err } - return objInfoFromMeta(p.Bkt, meta), nil + return info, nil } func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (map[int]*data.ObjectInfo, error) { + // we search by UploadID attribute because parts are system objects which have system name not filename + // and search in attributes by prefix is not supported f := &findParams{ - cid: p.Bkt.CID, - prefix: UploadPartKeyPrefix + p.UploadID + "-" + p.Key, + attr: [2]string{UploadIDAttributeName, p.UploadID}, + cid: p.Bkt.CID, } ids, err := n.objectSearch(ctx, f) if err != nil { return nil, err } - if len(ids) == 0 { - return nil, errors.GetAPIError(errors.ErrNoSuchUpload) - } res := make(map[int]*data.ObjectInfo) @@ -500,18 +505,29 @@ func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (map[in continue } info := objInfoFromMeta(p.Bkt, meta) + // skip objects which are completed by "complete-multipart-upload" because they have "s3-Upload-Id" attribute + if !isSystem(info) { + continue + } numStr := info.Headers[UploadPartNumberAttributeName] num, err := strconv.Atoi(numStr) if err != nil { return nil, errors.GetAPIError(errors.ErrInternalError) } res[num] = info + if err = n.systemCache.PutObject(systemObjectKey(p.Bkt, FormUploadPartName(p.UploadID, p.Key, num)), info); err != nil { + n.log.Warn("couldn't cache upload part", zap.Error(err)) + } + } + + if len(res) == 0 { + return nil, errors.GetAPIError(errors.ErrNoSuchUpload) } return res, nil } -func createUploadPartName(uploadID, key string, partNumber int) string { +func FormUploadPartName(uploadID, key string, partNumber int) string { return UploadPartKeyPrefix + uploadID + "-" + key + "-" + strconv.Itoa(partNumber) } @@ -585,5 +601,4 @@ func appendUploadHeaders(metadata map[string]string, uploadID, key string, partN metadata[UploadIDAttributeName] = uploadID metadata[UploadPartNumberAttributeName] = strconv.Itoa(partNumber) metadata[UploadKeyAttributeName] = key - metadata[attrVersionsIgnore] = "true" }