forked from TrueCloudLab/frostfs-s3-gw
[#669] Optimize getNodeVersion for tags operation
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
e42fbbbe9f
commit
821df3d648
11 changed files with 174 additions and 116 deletions
|
@ -68,12 +68,12 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
p := &layer.HeadObjectParams{
|
||||
srcObjPrm := &layer.HeadObjectParams{
|
||||
Object: srcObject,
|
||||
VersionID: versionID,
|
||||
}
|
||||
|
||||
if p.BktInfo, err = h.getBucketAndCheckOwner(r, srcBucket, api.AmzSourceExpectedBucketOwner); err != nil {
|
||||
if srcObjPrm.BktInfo, err = h.getBucketAndCheckOwner(r, srcBucket, api.AmzSourceExpectedBucketOwner); err != nil {
|
||||
h.logAndSendError(w, "couldn't get source bucket", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -97,11 +97,12 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
objInfo, err := h.obj.GetObjectInfo(r.Context(), p)
|
||||
extendedSrcObjInfo, err := h.obj.GetExtendedObjectInfo(r.Context(), srcObjPrm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not find object", reqInfo, err)
|
||||
return
|
||||
}
|
||||
srcObjInfo := extendedSrcObjInfo.ObjectInfo
|
||||
|
||||
args, err := parseCopyObjectArgs(r.Header)
|
||||
if err != nil {
|
||||
|
@ -125,13 +126,16 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
} else {
|
||||
objVersion := &layer.ObjectVersion{
|
||||
BktInfo: p.BktInfo,
|
||||
tagPrm := &layer.GetObjectTaggingParams{
|
||||
ObjectVersion: &layer.ObjectVersion{
|
||||
BktInfo: srcObjPrm.BktInfo,
|
||||
ObjectName: srcObject,
|
||||
VersionID: objInfo.VersionID(),
|
||||
VersionID: srcObjInfo.VersionID(),
|
||||
},
|
||||
NodeVersion: extendedSrcObjInfo.NodeVersion,
|
||||
}
|
||||
|
||||
_, tagSet, err = h.obj.GetObjectTagging(r.Context(), objVersion)
|
||||
_, tagSet, err = h.obj.GetObjectTagging(r.Context(), tagPrm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get object tagging", reqInfo, err)
|
||||
return
|
||||
|
@ -144,21 +148,21 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if err = encryptionParams.MatchObjectEncryption(layer.FormEncryptionInfo(objInfo.Headers)); err != nil {
|
||||
if err = encryptionParams.MatchObjectEncryption(layer.FormEncryptionInfo(srcObjInfo.Headers)); err != nil {
|
||||
h.logAndSendError(w, "encryption doesn't match object", reqInfo, errors.GetAPIError(errors.ErrBadRequest), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if err = checkPreconditions(objInfo, args.Conditional); err != nil {
|
||||
if err = checkPreconditions(srcObjInfo, args.Conditional); err != nil {
|
||||
h.logAndSendError(w, "precondition failed", reqInfo, errors.GetAPIError(errors.ErrPreconditionFailed))
|
||||
return
|
||||
}
|
||||
|
||||
if metadata == nil {
|
||||
if len(objInfo.ContentType) > 0 {
|
||||
objInfo.Headers[api.ContentType] = objInfo.ContentType
|
||||
if len(srcObjInfo.ContentType) > 0 {
|
||||
srcObjInfo.Headers[api.ContentType] = srcObjInfo.ContentType
|
||||
}
|
||||
metadata = objInfo.Headers
|
||||
metadata = srcObjInfo.Headers
|
||||
} else if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 {
|
||||
metadata[api.ContentType] = contentType
|
||||
}
|
||||
|
@ -170,11 +174,11 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
params := &layer.CopyObjectParams{
|
||||
SrcObject: objInfo,
|
||||
ScrBktInfo: p.BktInfo,
|
||||
SrcObject: srcObjInfo,
|
||||
ScrBktInfo: srcObjPrm.BktInfo,
|
||||
DstBktInfo: dstBktInfo,
|
||||
DstObject: reqInfo.ObjectName,
|
||||
SrcSize: objInfo.Size,
|
||||
SrcSize: srcObjInfo.Size,
|
||||
Header: metadata,
|
||||
Encryption: encryptionParams,
|
||||
CopiesNuber: copiesNumber,
|
||||
|
@ -187,16 +191,20 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
additional := []zap.Field{zap.String("src_bucket_name", srcBucket), zap.String("src_object_name", srcObject)}
|
||||
if objInfo, err = h.obj.CopyObject(r.Context(), params); err != nil {
|
||||
extendedDstObjInfo, err := h.obj.CopyObject(r.Context(), params)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't copy object", reqInfo, err, additional...)
|
||||
return
|
||||
} else if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: objInfo.Created.UTC().Format(time.RFC3339), ETag: objInfo.HashSum}); err != nil {
|
||||
}
|
||||
dstObjInfo := extendedDstObjInfo.ObjectInfo
|
||||
|
||||
if err = api.EncodeToResponse(w, &CopyObjectResponse{LastModified: dstObjInfo.Created.UTC().Format(time.RFC3339), ETag: dstObjInfo.HashSum}); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
||||
if containsACL {
|
||||
newEaclTable, err := h.getNewEAclTable(r, dstBktInfo, objInfo)
|
||||
newEaclTable, err := h.getNewEAclTable(r, dstBktInfo, dstObjInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get new eacl table", reqInfo, err)
|
||||
return
|
||||
|
@ -215,25 +223,29 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
if tagSet != nil {
|
||||
t := &layer.ObjectVersion{
|
||||
tagPrm := &layer.PutObjectTaggingParams{
|
||||
ObjectVersion: &layer.ObjectVersion{
|
||||
BktInfo: dstBktInfo,
|
||||
ObjectName: reqInfo.ObjectName,
|
||||
VersionID: objInfo.VersionID(),
|
||||
VersionID: dstObjInfo.VersionID(),
|
||||
},
|
||||
TagSet: tagSet,
|
||||
NodeVersion: extendedDstObjInfo.NodeVersion,
|
||||
}
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
h.log.Info("object is copied",
|
||||
zap.String("bucket", objInfo.Bucket),
|
||||
zap.String("object", objInfo.Name),
|
||||
zap.Stringer("object_id", objInfo.ID))
|
||||
zap.String("bucket", dstObjInfo.Bucket),
|
||||
zap.String("object", dstObjInfo.Name),
|
||||
zap.Stringer("object_id", dstObjInfo.ID))
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectCreatedCopy,
|
||||
NotificationInfo: data.NotificationInfoFromObject(objInfo),
|
||||
NotificationInfo: data.NotificationInfoFromObject(dstObjInfo),
|
||||
BktInfo: dstBktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
|
|
|
@ -141,7 +141,7 @@ func createTestObject(hc *handlerContext, bktInfo *data.BucketInfo, objName stri
|
|||
object.AttributeTimestamp: strconv.FormatInt(time.Now().UTC().Unix(), 10),
|
||||
}
|
||||
|
||||
objInfo, err := hc.Layer().PutObject(hc.Context(), &layer.PutObjectParams{
|
||||
extObjInfo, err := hc.Layer().PutObject(hc.Context(), &layer.PutObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
Object: objName,
|
||||
Size: int64(len(content)),
|
||||
|
@ -150,7 +150,7 @@ func createTestObject(hc *handlerContext, bktInfo *data.BucketInfo, objName stri
|
|||
})
|
||||
require.NoError(hc.t, err)
|
||||
|
||||
return objInfo
|
||||
return extObjInfo.ObjectInfo
|
||||
}
|
||||
|
||||
func prepareTestRequest(hc *handlerContext, bktName, objName string, body interface{}) (*httptest.ResponseRecorder, *http.Request) {
|
||||
|
|
|
@ -399,19 +399,24 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
|||
Parts: reqBody.Parts,
|
||||
}
|
||||
|
||||
uploadData, objInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c)
|
||||
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
objInfo := extendedObjInfo.ObjectInfo
|
||||
|
||||
if len(uploadData.TagSet) != 0 {
|
||||
t := &layer.ObjectVersion{
|
||||
tagPrm := &layer.PutObjectTaggingParams{
|
||||
ObjectVersion: &layer.ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
ObjectName: objInfo.Name,
|
||||
VersionID: objInfo.VersionID(),
|
||||
},
|
||||
TagSet: uploadData.TagSet,
|
||||
NodeVersion: extendedObjInfo.NodeVersion,
|
||||
}
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), t, uploadData.TagSet); err != nil {
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not put tagging file of completed multipart upload", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -197,7 +197,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
h.logAndSendError(w, "could not get bucket objInfo", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -246,17 +246,18 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
info, err := h.obj.PutObject(r.Context(), params)
|
||||
extendedObjInfo, err := h.obj.PutObject(r.Context(), params)
|
||||
if err != nil {
|
||||
_, err2 := io.Copy(io.Discard, r.Body)
|
||||
err3 := r.Body.Close()
|
||||
h.logAndSendError(w, "could not upload object", reqInfo, err, zap.Errors("body close errors", []error{err2, err3}))
|
||||
return
|
||||
}
|
||||
objInfo := extendedObjInfo.ObjectInfo
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectCreatedPut,
|
||||
NotificationInfo: data.NotificationInfoFromObject(info),
|
||||
NotificationInfo: data.NotificationInfoFromObject(objInfo),
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
|
@ -265,19 +266,23 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
if containsACL {
|
||||
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, info); err != nil {
|
||||
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, objInfo); err != nil {
|
||||
h.logAndSendError(w, "could not get new eacl table", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
t := &layer.ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
ObjectName: info.Name,
|
||||
VersionID: info.VersionID(),
|
||||
}
|
||||
if tagSet != nil {
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
|
||||
tagPrm := &layer.PutObjectTaggingParams{
|
||||
ObjectVersion: &layer.ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
ObjectName: objInfo.Name,
|
||||
VersionID: objInfo.VersionID(),
|
||||
},
|
||||
TagSet: tagSet,
|
||||
NodeVersion: extendedObjInfo.NodeVersion,
|
||||
}
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -297,13 +302,13 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
if settings.VersioningEnabled() {
|
||||
w.Header().Set(api.AmzVersionID, info.VersionID())
|
||||
w.Header().Set(api.AmzVersionID, objInfo.VersionID())
|
||||
}
|
||||
if encryption.Enabled() {
|
||||
addSSECHeaders(w.Header(), r.Header)
|
||||
}
|
||||
|
||||
w.Header().Set(api.ETag, info.HashSum)
|
||||
w.Header().Set(api.ETag, objInfo.HashSum)
|
||||
api.WriteSuccessResponseHeadersOnly(w)
|
||||
}
|
||||
|
||||
|
@ -431,15 +436,16 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
Header: metadata,
|
||||
}
|
||||
|
||||
info, err := h.obj.PutObject(r.Context(), params)
|
||||
extendedObjInfo, err := h.obj.PutObject(r.Context(), params)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not upload object", reqInfo, err)
|
||||
return
|
||||
}
|
||||
objInfo := extendedObjInfo.ObjectInfo
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectCreatedPost,
|
||||
NotificationInfo: data.NotificationInfoFromObject(info),
|
||||
NotificationInfo: data.NotificationInfoFromObject(objInfo),
|
||||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
|
@ -453,20 +459,23 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
r.Header.Set(api.AmzGrantWrite, "")
|
||||
r.Header.Set(api.AmzGrantRead, "")
|
||||
|
||||
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, info); err != nil {
|
||||
if newEaclTable, err = h.getNewEAclTable(r, bktInfo, objInfo); err != nil {
|
||||
h.logAndSendError(w, "could not get new eacl table", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
t := &layer.ObjectVersion{
|
||||
if tagSet != nil {
|
||||
tagPrm := &layer.PutObjectTaggingParams{
|
||||
ObjectVersion: &layer.ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
ObjectName: info.Name,
|
||||
VersionID: info.VersionID(),
|
||||
ObjectName: objInfo.Name,
|
||||
VersionID: objInfo.VersionID(),
|
||||
},
|
||||
NodeVersion: extendedObjInfo.NodeVersion,
|
||||
}
|
||||
|
||||
if tagSet != nil {
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -488,7 +497,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
|
||||
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
||||
} else if settings.VersioningEnabled() {
|
||||
w.Header().Set(api.AmzVersionID, info.VersionID())
|
||||
w.Header().Set(api.AmzVersionID, objInfo.VersionID())
|
||||
}
|
||||
|
||||
if redirectURL := auth.MultipartFormValue(r, "success_action_redirect"); redirectURL != "" {
|
||||
|
@ -503,9 +512,9 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
case "201":
|
||||
status = http.StatusCreated
|
||||
resp := &PostResponse{
|
||||
Bucket: info.Bucket,
|
||||
Key: info.Name,
|
||||
ETag: info.HashSum,
|
||||
Bucket: objInfo.Bucket,
|
||||
Key: objInfo.Name,
|
||||
ETag: objInfo.HashSum,
|
||||
}
|
||||
w.WriteHeader(status)
|
||||
if _, err = w.Write(api.EncodeResponse(resp)); err != nil {
|
||||
|
@ -515,7 +524,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
w.Header().Set(api.ETag, info.HashSum)
|
||||
w.Header().Set(api.ETag, objInfo.HashSum)
|
||||
w.WriteHeader(status)
|
||||
}
|
||||
|
||||
|
|
|
@ -38,13 +38,15 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
|||
return
|
||||
}
|
||||
|
||||
p := &layer.ObjectVersion{
|
||||
tagPrm := &layer.PutObjectTaggingParams{
|
||||
ObjectVersion: &layer.ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
ObjectName: reqInfo.ObjectName,
|
||||
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
|
||||
},
|
||||
TagSet: tagSet,
|
||||
}
|
||||
|
||||
nodeVersion, err := h.obj.PutObjectTagging(r.Context(), p, tagSet)
|
||||
nodeVersion, err := h.obj.PutObjectTagging(r.Context(), tagPrm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not put object tagging", reqInfo, err)
|
||||
return
|
||||
|
@ -83,13 +85,15 @@ func (h *handler) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
|||
return
|
||||
}
|
||||
|
||||
p := &layer.ObjectVersion{
|
||||
tagPrm := &layer.GetObjectTaggingParams{
|
||||
ObjectVersion: &layer.ObjectVersion{
|
||||
BktInfo: bktInfo,
|
||||
ObjectName: reqInfo.ObjectName,
|
||||
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
|
||||
},
|
||||
}
|
||||
|
||||
versionID, tagSet, err := h.obj.GetObjectTagging(r.Context(), p)
|
||||
versionID, tagSet, err := h.obj.GetObjectTagging(r.Context(), tagPrm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get object tagging", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -213,13 +213,13 @@ type (
|
|||
PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error
|
||||
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
|
||||
|
||||
GetObjectTagging(ctx context.Context, p *ObjectVersion) (string, map[string]string, error)
|
||||
PutObjectTagging(ctx context.Context, p *ObjectVersion, tagSet map[string]string) (*data.NodeVersion, error)
|
||||
GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams) (string, map[string]string, error)
|
||||
PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams) (*data.NodeVersion, error)
|
||||
DeleteObjectTagging(ctx context.Context, p *ObjectVersion) (*data.NodeVersion, error)
|
||||
|
||||
PutObject(ctx context.Context, p *PutObjectParams) (*data.ObjectInfo, error)
|
||||
PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ObjectInfo, error)
|
||||
CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error)
|
||||
ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error)
|
||||
|
@ -228,7 +228,7 @@ type (
|
|||
DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject
|
||||
|
||||
CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error
|
||||
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ObjectInfo, error)
|
||||
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error)
|
||||
UploadPart(ctx context.Context, p *UploadPartParams) (string, error)
|
||||
UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error)
|
||||
ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error)
|
||||
|
@ -481,7 +481,7 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
|
|||
}
|
||||
|
||||
// CopyObject from one bucket into another bucket.
|
||||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ObjectInfo, error) {
|
||||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
|
|
|
@ -349,7 +349,7 @@ func (x *multiObjectReader) Read(p []byte) (n int, err error) {
|
|||
return n + next, err
|
||||
}
|
||||
|
||||
func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ObjectInfo, error) {
|
||||
func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
|
||||
for i := 1; i < len(p.Parts); i++ {
|
||||
if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber {
|
||||
return nil, nil, errors.GetAPIError(errors.ErrInvalidPartOrder)
|
||||
|
@ -433,7 +433,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
|
||||
r.prm.bktInfo = p.Info.Bkt
|
||||
|
||||
obj, err := n.PutObject(ctx, &PutObjectParams{
|
||||
extObjInfo, err := n.PutObject(ctx, &PutObjectParams{
|
||||
BktInfo: p.Info.Bkt,
|
||||
Object: p.Info.Key,
|
||||
Reader: r,
|
||||
|
@ -464,7 +464,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
n.cache.DeleteObject(addr)
|
||||
}
|
||||
|
||||
return uploadData, obj, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID)
|
||||
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID)
|
||||
}
|
||||
|
||||
func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) {
|
||||
|
|
|
@ -184,7 +184,7 @@ func ParseCompletedPartHeader(hdr string) (*Part, error) {
|
|||
}
|
||||
|
||||
// PutObject stores object into NeoFS, took payload from io.Reader.
|
||||
func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.ObjectInfo, error) {
|
||||
func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
owner := n.Owner(ctx)
|
||||
|
||||
bktSettings, err := n.GetBucketSettings(ctx, p.BktInfo)
|
||||
|
@ -294,7 +294,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
|||
|
||||
n.cache.PutObjectWithName(owner, extendedObjInfo)
|
||||
|
||||
return objInfo, nil
|
||||
return extendedObjInfo, nil
|
||||
}
|
||||
|
||||
func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) {
|
||||
|
|
|
@ -32,16 +32,11 @@ func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err erro
|
|||
// sometimes node version can be provided from executing context
|
||||
// if not, then receive node version from tree service
|
||||
if versionNode == nil {
|
||||
// check cache if node version is stored inside extendedObjectVersion
|
||||
versionNode = n.getNodeVersionFromCache(n.Owner(ctx), p.ObjVersion)
|
||||
if versionNode == nil {
|
||||
// else get node version from tree service
|
||||
versionNode, err = n.getNodeVersion(ctx, p.ObjVersion)
|
||||
versionNode, err = n.getNodeVersionFromCacheOrNeofs(ctx, p.ObjVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lockInfo, err := n.treeService.GetLock(ctx, p.ObjVersion.BktInfo, versionNode.ID)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
|
@ -105,6 +100,17 @@ func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err erro
|
|||
return nil
|
||||
}
|
||||
|
||||
func (n *layer) getNodeVersionFromCacheOrNeofs(ctx context.Context, objVersion *ObjectVersion) (nodeVersion *data.NodeVersion, err error) {
|
||||
// check cache if node version is stored inside extendedObjectVersion
|
||||
nodeVersion = n.getNodeVersionFromCache(n.Owner(ctx), objVersion)
|
||||
if nodeVersion == nil {
|
||||
// else get node version from tree service
|
||||
return n.getNodeVersion(ctx, objVersion)
|
||||
}
|
||||
|
||||
return nodeVersion, nil
|
||||
}
|
||||
|
||||
func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock, copiesNumber uint32) (oid.ID, error) {
|
||||
prm := PrmObjectCreate{
|
||||
Container: bktInfo.CID,
|
||||
|
|
|
@ -11,26 +11,45 @@ import (
|
|||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||
)
|
||||
|
||||
func (n *layer) GetObjectTagging(ctx context.Context, p *ObjectVersion) (string, map[string]string, error) {
|
||||
type GetObjectTaggingParams struct {
|
||||
ObjectVersion *ObjectVersion
|
||||
|
||||
// NodeVersion can be nil. If not nil we save one request to tree service.
|
||||
NodeVersion *data.NodeVersion // optional
|
||||
}
|
||||
|
||||
type PutObjectTaggingParams struct {
|
||||
ObjectVersion *ObjectVersion
|
||||
TagSet map[string]string
|
||||
|
||||
// NodeVersion can be nil. If not nil we save one request to tree service.
|
||||
NodeVersion *data.NodeVersion // optional
|
||||
}
|
||||
|
||||
func (n *layer) GetObjectTagging(ctx context.Context, p *GetObjectTaggingParams) (string, map[string]string, error) {
|
||||
var err error
|
||||
owner := n.Owner(ctx)
|
||||
|
||||
if len(p.VersionID) != 0 && p.VersionID != data.UnversionedObjectVersionID {
|
||||
if tags := n.cache.GetTagging(owner, objectTaggingCacheKey(p)); tags != nil {
|
||||
return p.VersionID, tags, nil
|
||||
if len(p.ObjectVersion.VersionID) != 0 && p.ObjectVersion.VersionID != data.UnversionedObjectVersionID {
|
||||
if tags := n.cache.GetTagging(owner, objectTaggingCacheKey(p.ObjectVersion)); tags != nil {
|
||||
return p.ObjectVersion.VersionID, tags, nil
|
||||
}
|
||||
}
|
||||
|
||||
version, err := n.getNodeVersion(ctx, p)
|
||||
nodeVersion := p.NodeVersion
|
||||
if nodeVersion == nil {
|
||||
nodeVersion, err = n.getNodeVersionFromCacheOrNeofs(ctx, p.ObjectVersion)
|
||||
if err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
p.VersionID = version.OID.EncodeToString()
|
||||
}
|
||||
p.ObjectVersion.VersionID = nodeVersion.OID.EncodeToString()
|
||||
|
||||
if tags := n.cache.GetTagging(owner, objectTaggingCacheKey(p)); tags != nil {
|
||||
return p.VersionID, tags, nil
|
||||
if tags := n.cache.GetTagging(owner, objectTaggingCacheKey(p.ObjectVersion)); tags != nil {
|
||||
return p.ObjectVersion.VersionID, tags, nil
|
||||
}
|
||||
|
||||
tags, err := n.treeService.GetObjectTagging(ctx, p.BktInfo, version)
|
||||
tags, err := n.treeService.GetObjectTagging(ctx, p.ObjectVersion.BktInfo, nodeVersion)
|
||||
if err != nil {
|
||||
if errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return "", nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||
|
@ -38,19 +57,22 @@ func (n *layer) GetObjectTagging(ctx context.Context, p *ObjectVersion) (string,
|
|||
return "", nil, err
|
||||
}
|
||||
|
||||
n.cache.PutTagging(owner, objectTaggingCacheKey(p), tags)
|
||||
n.cache.PutTagging(owner, objectTaggingCacheKey(p.ObjectVersion), tags)
|
||||
|
||||
return p.VersionID, tags, nil
|
||||
return p.ObjectVersion.VersionID, tags, nil
|
||||
}
|
||||
|
||||
func (n *layer) PutObjectTagging(ctx context.Context, p *ObjectVersion, tagSet map[string]string) (*data.NodeVersion, error) {
|
||||
version, err := n.getNodeVersion(ctx, p)
|
||||
func (n *layer) PutObjectTagging(ctx context.Context, p *PutObjectTaggingParams) (nodeVersion *data.NodeVersion, err error) {
|
||||
nodeVersion = p.NodeVersion
|
||||
if nodeVersion == nil {
|
||||
nodeVersion, err = n.getNodeVersionFromCacheOrNeofs(ctx, p.ObjectVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.VersionID = version.OID.EncodeToString()
|
||||
}
|
||||
p.ObjectVersion.VersionID = nodeVersion.OID.EncodeToString()
|
||||
|
||||
err = n.treeService.PutObjectTagging(ctx, p.BktInfo, version, tagSet)
|
||||
err = n.treeService.PutObjectTagging(ctx, p.ObjectVersion.BktInfo, nodeVersion, p.TagSet)
|
||||
if err != nil {
|
||||
if errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
|
||||
|
@ -58,9 +80,9 @@ func (n *layer) PutObjectTagging(ctx context.Context, p *ObjectVersion, tagSet m
|
|||
return nil, err
|
||||
}
|
||||
|
||||
n.cache.PutTagging(n.Owner(ctx), objectTaggingCacheKey(p), tagSet)
|
||||
n.cache.PutTagging(n.Owner(ctx), objectTaggingCacheKey(p.ObjectVersion), p.TagSet)
|
||||
|
||||
return version, nil
|
||||
return nodeVersion, nil
|
||||
}
|
||||
|
||||
func (n *layer) DeleteObjectTagging(ctx context.Context, p *ObjectVersion) (*data.NodeVersion, error) {
|
||||
|
|
|
@ -18,7 +18,7 @@ import (
|
|||
)
|
||||
|
||||
func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
|
||||
objInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{
|
||||
extObjInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Object: tc.obj,
|
||||
Size: int64(len(content)),
|
||||
|
@ -27,7 +27,7 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
|
|||
})
|
||||
require.NoError(tc.t, err)
|
||||
|
||||
return objInfo
|
||||
return extObjInfo.ObjectInfo
|
||||
}
|
||||
|
||||
func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) {
|
||||
|
|
Loading…
Reference in a new issue