[#666] Support tagging directive for CopyObject

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-08-23 17:13:59 +03:00 committed by Kirillov Denis
parent 7b165d3f18
commit 1a580b6fa4
5 changed files with 196 additions and 23 deletions

View file

@ -17,9 +17,13 @@ import (
type copyObjectArgs struct {
Conditional *conditionalArgs
MetadataDirective string
TaggingDirective string
}
const replaceMetadataDirective = "REPLACE"
const (
replaceDirective = "REPLACE"
copyDirective = "COPY"
)
// path2BucketObject returns a bucket and an object.
func path2BucketObject(path string) (bucket, prefix string) {
@ -33,8 +37,10 @@ func path2BucketObject(path string) (bucket, prefix string) {
func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
var (
err error
versionID string
metadata map[string]string
tagSet map[string]string
sessionTokenEACL *session.Container
reqInfo = api.GetReqInfo(r.Context())
@ -42,7 +48,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
containsACL = containsACLHeaders(r)
)
src := r.Header.Get("X-Amz-Copy-Source")
src := r.Header.Get(api.AmzCopySource)
// Check https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html
// Regardless of whether you have enabled versioning, each object in your bucket
// has a version ID. If you have not enabled versioning, Amazon S3 sets the value
@ -55,23 +61,11 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
srcBucket, srcObject := path2BucketObject(src)
args, err := parseCopyObjectArgs(r.Header)
if err != nil {
h.logAndSendError(w, "could not parse request params", reqInfo, err)
return
}
p := &layer.HeadObjectParams{
Object: srcObject,
VersionID: versionID,
}
if args.MetadataDirective == replaceMetadataDirective {
metadata = parseMetadata(r)
} else if srcBucket == reqInfo.BucketName && srcObject == reqInfo.ObjectName {
h.logAndSendError(w, "could not copy to itself", reqInfo, errors.GetAPIError(errors.ErrInvalidRequest))
return
}
if p.BktInfo, err = h.getBucketAndCheckOwner(r, srcBucket, api.AmzSourceExpectedBucketOwner); err != nil {
h.logAndSendError(w, "couldn't get source bucket", reqInfo, err)
return
@ -96,6 +90,36 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
args, err := parseCopyObjectArgs(r.Header)
if err != nil {
h.logAndSendError(w, "could not parse request params", reqInfo, err)
return
}
if args.MetadataDirective == replaceDirective {
metadata = parseMetadata(r)
}
if args.TaggingDirective == replaceDirective {
tagSet, err = parseTaggingHeader(r.Header)
if err != nil {
h.logAndSendError(w, "could not parse tagging header", reqInfo, err)
return
}
} else {
objVersion := &layer.ObjectVersion{
BktInfo: p.BktInfo,
ObjectName: srcObject,
VersionID: objInfo.VersionID(),
}
_, tagSet, err = h.obj.GetObjectTagging(r.Context(), objVersion)
if err != nil {
h.logAndSendError(w, "could not get object tagging", reqInfo, err)
return
}
}
encryptionParams, err := h.formEncryptionParams(r.Header)
if err != nil {
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
@ -178,6 +202,18 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
}
}
if tagSet != nil {
t := &layer.ObjectVersion{
BktInfo: dstBktInfo,
ObjectName: reqInfo.ObjectName,
VersionID: objInfo.VersionID(),
}
if _, err = h.obj.PutObjectTagging(r.Context(), t, tagSet); err != nil {
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
return
}
}
h.log.Info("object is copied",
zap.String("bucket", objInfo.Bucket),
zap.String("object", objInfo.Name),
@ -213,7 +249,21 @@ func parseCopyObjectArgs(headers http.Header) (*copyObjectArgs, error) {
}
copyArgs := &copyObjectArgs{Conditional: args}
copyArgs.MetadataDirective = headers.Get(api.AmzMetadataDirective)
if !isValidDirective(copyArgs.MetadataDirective) {
return nil, errors.GetAPIError(errors.ErrInvalidMetadataDirective)
}
copyArgs.TaggingDirective = headers.Get(api.AmzTaggingDirective)
if !isValidDirective(copyArgs.TaggingDirective) {
return nil, errors.GetAPIError(errors.ErrInvalidTaggingDirective)
}
return copyArgs, nil
}
func isValidDirective(directive string) bool {
return len(directive) == 0 ||
directive == replaceDirective || directive == copyDirective
}