[#186] Fix versioning issues in UploadPart

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
Angira Kekteeva 2021-11-25 18:08:02 +03:00 committed by Alex Vanin
parent 873622d4d5
commit d6f0ab8ea4
5 changed files with 43 additions and 14 deletions

View file

@ -5,11 +5,13 @@ import (
"encoding/json" "encoding/json"
"encoding/xml" "encoding/xml"
"net/http" "net/http"
"net/url"
"strconv" "strconv"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/api/layer"
"go.uber.org/zap" "go.uber.org/zap"
@ -251,6 +253,8 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
} }
var ( var (
versionID string
queryValues = reqInfo.URL.Query() queryValues = reqInfo.URL.Query()
uploadID = queryValues.Get(uploadIDHeaderName) uploadID = queryValues.Get(uploadIDHeaderName)
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)} additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
@ -263,6 +267,10 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
} }
src := r.Header.Get(api.AmzCopySource) src := r.Header.Get(api.AmzCopySource)
if u, err := url.Parse(src); err == nil {
versionID = u.Query().Get(api.QueryVersionID)
src = u.Path
}
srcBucket, srcObject := path2BucketObject(src) srcBucket, srcObject := path2BucketObject(src)
srcRange, err := parseRange(r.Header.Get(api.AmzCopySourceRange)) srcRange, err := parseRange(r.Header.Get(api.AmzCopySourceRange))
@ -275,11 +283,27 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
srcInfo, err := h.obj.GetObjectInfo(r.Context(), &layer.HeadObjectParams{ srcInfo, err := h.obj.GetObjectInfo(r.Context(), &layer.HeadObjectParams{
Bucket: srcBucket, Bucket: srcBucket,
Object: srcObject, Object: srcObject,
VersionID: versionID,
}) })
if err != nil { if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
h.logAndSendError(w, "could not head source object version", reqInfo,
errors.GetAPIError(errors.ErrBadRequest), additional...)
return
}
h.logAndSendError(w, "could not head source object", reqInfo, err, additional...) h.logAndSendError(w, "could not head source object", reqInfo, err, additional...)
return return
} }
if isDeleted(srcInfo) {
if versionID != "" {
h.logAndSendError(w, "could not head source object version", reqInfo,
errors.GetAPIError(errors.ErrBadRequest), additional...)
return
}
h.logAndSendError(w, "could not head source object", reqInfo,
errors.GetAPIError(errors.ErrNoSuchKey), additional...)
return
}
args, err := parseCopyObjectArgs(r.Header) args, err := parseCopyObjectArgs(r.Header)
if err != nil { if err != nil {
@ -643,3 +667,7 @@ func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPart
Parts: info.Parts, Parts: info.Parts,
} }
} }
func isDeleted(objInfo *data.ObjectInfo) bool {
return objInfo.Headers[layer.VersionsDeleteMarkAttr] == layer.DelMarkFullObject
}

View file

@ -571,7 +571,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
p := &PutObjectParams{ p := &PutObjectParams{
Object: obj.Name, Object: obj.Name,
Reader: bytes.NewReader(nil), Reader: bytes.NewReader(nil),
Header: map[string]string{versionsDeleteMarkAttr: obj.VersionID}, Header: map[string]string{VersionsDeleteMarkAttr: obj.VersionID},
} }
if len(obj.VersionID) != 0 { if len(obj.VersionID) != 0 {
version, err := n.checkVersionsExist(ctx, bkt, obj) version, err := n.checkVersionsExist(ctx, bkt, obj)
@ -580,13 +580,13 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
return obj return obj
} }
ids = []*object.ID{version.ID} ids = []*object.ID{version.ID}
if version.Headers[versionsDeleteMarkAttr] == delMarkFullObject { if version.Headers[VersionsDeleteMarkAttr] == DelMarkFullObject {
obj.DeleteMarkVersion = version.Version() obj.DeleteMarkVersion = version.Version()
} }
p.Header[versionsDelAttr] = obj.VersionID p.Header[versionsDelAttr] = obj.VersionID
} else { } else {
p.Header[versionsDeleteMarkAttr] = delMarkFullObject p.Header[VersionsDeleteMarkAttr] = DelMarkFullObject
} }
objInfo, err := n.objectPut(ctx, bkt, p) objInfo, err := n.objectPut(ctx, bkt, p)
if err != nil { if err != nil {

View file

@ -171,7 +171,7 @@ func (n *layer) objectPut(ctx context.Context, bkt *data.BucketInfo, p *PutObjec
return nil, err return nil, err
} }
if p.Header[versionsDeleteMarkAttr] == delMarkFullObject { if p.Header[VersionsDeleteMarkAttr] == DelMarkFullObject {
if last := versions.getLast(); last != nil { if last := versions.getLast(); last != nil {
n.objCache.Delete(last.Address()) n.objCache.Delete(last.Address())
} }

View file

@ -21,14 +21,15 @@ type objectVersions struct {
} }
const ( const (
VersionsDeleteMarkAttr = "S3-Versions-delete-mark"
DelMarkFullObject = "*"
unversionedObjectVersionID = "null" unversionedObjectVersionID = "null"
objectSystemAttributeName = "S3-System-name" objectSystemAttributeName = "S3-System-name"
attrVersionsIgnore = "S3-Versions-ignore" attrVersionsIgnore = "S3-Versions-ignore"
attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled" attrSettingsVersioningEnabled = "S3-Settings-Versioning-enabled"
versionsDelAttr = "S3-Versions-del" versionsDelAttr = "S3-Versions-del"
versionsAddAttr = "S3-Versions-add" versionsAddAttr = "S3-Versions-add"
versionsDeleteMarkAttr = "S3-Versions-delete-mark"
delMarkFullObject = "*"
) )
func newObjectVersions(name string) *objectVersions { func newObjectVersions(name string) *objectVersions {
@ -169,11 +170,11 @@ func (v *objectVersions) getLast() *data.ObjectInfo {
existedVersions := v.existedVersions() existedVersions := v.existedVersions()
for i := len(v.objects) - 1; i >= 0; i-- { for i := len(v.objects) - 1; i >= 0; i-- {
if contains(existedVersions, v.objects[i].Version()) { if contains(existedVersions, v.objects[i].Version()) {
delMarkHeader := v.objects[i].Headers[versionsDeleteMarkAttr] delMarkHeader := v.objects[i].Headers[VersionsDeleteMarkAttr]
if delMarkHeader == "" { if delMarkHeader == "" {
return v.objects[i] return v.objects[i]
} }
if delMarkHeader == delMarkFullObject { if delMarkHeader == DelMarkFullObject {
return nil return nil
} }
} }
@ -203,8 +204,8 @@ func (v *objectVersions) getFiltered(reverse bool) []*data.ObjectInfo {
res := make([]*data.ObjectInfo, 0, len(v.objects)) res := make([]*data.ObjectInfo, 0, len(v.objects))
for _, version := range v.objects { for _, version := range v.objects {
delMark := version.Headers[versionsDeleteMarkAttr] delMark := version.Headers[VersionsDeleteMarkAttr]
if contains(existedVersions, version.Version()) && (delMark == delMarkFullObject || delMark == "") { if contains(existedVersions, version.Version()) && (delMark == DelMarkFullObject || delMark == "") {
res = append(res, version) res = append(res, version)
} }
} }
@ -335,7 +336,7 @@ func triageVersions(objVersions []*ObjectVersionInfo) ([]*ObjectVersionInfo, []*
var resDelMarkVersions []*ObjectVersionInfo var resDelMarkVersions []*ObjectVersionInfo
for _, version := range objVersions { for _, version := range objVersions {
if version.Object.Headers[versionsDeleteMarkAttr] == delMarkFullObject { if version.Object.Headers[VersionsDeleteMarkAttr] == DelMarkFullObject {
resDelMarkVersions = append(resDelMarkVersions, version) resDelMarkVersions = append(resDelMarkVersions, version)
} else { } else {
resVersion = append(resVersion, version) resVersion = append(resVersion, version)

View file

@ -672,7 +672,7 @@ func getTestObjectInfo(id byte, addAttr, delAttr, delMarkAttr string) *data.Obje
headers[versionsDelAttr] = delAttr headers[versionsDelAttr] = delAttr
} }
if delMarkAttr != "" { if delMarkAttr != "" {
headers[versionsDeleteMarkAttr] = delMarkAttr headers[VersionsDeleteMarkAttr] = delMarkAttr
} }
return &data.ObjectInfo{ return &data.ObjectInfo{