[#319] Delete null versions in different modes

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-01-19 12:02:08 +03:00 committed by Angira Kekteeva
parent f5d365af1d
commit 8985681493

View file

@ -570,19 +570,35 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
ids []*object.ID ids []*object.ID
) )
versioningEnabled := n.isVersioningEnabled(ctx, bkt)
if !versioningEnabled && obj.VersionID != unversionedObjectVersionID && obj.VersionID != "" {
obj.Error = errors.GetAPIError(errors.ErrInvalidVersion)
return obj
}
if versioningEnabled {
p := &PutObjectParams{ p := &PutObjectParams{
Object: obj.Name, Object: obj.Name,
Reader: bytes.NewReader(nil), Reader: bytes.NewReader(nil),
Header: map[string]string{VersionsDeleteMarkAttr: obj.VersionID}, Header: map[string]string{},
} }
if len(obj.VersionID) != 0 {
versioningEnabled := n.isVersioningEnabled(ctx, bkt)
// Current implementation doesn't consider "unversioned" mode (so any deletion creates "delete-mark" object).
// The reason is difficulties to determinate whether versioning mode is "unversioned" or "suspended".
if obj.VersionID == unversionedObjectVersionID || !versioningEnabled && len(obj.VersionID) == 0 {
p.Header[versionsUnversionedAttr] = "true"
versions, err := n.headVersions(ctx, bkt, obj.Name)
if err != nil {
obj.Error = err
return obj
}
last := versions.getLast(FromUnversioned())
if last == nil {
obj.Error = errors.GetAPIError(errors.ErrInvalidVersion)
return obj
}
p.Header[VersionsDeleteMarkAttr] = last.Version()
for _, unversioned := range versions.unversioned() {
ids = append(ids, unversioned.ID)
}
} else if len(obj.VersionID) != 0 {
version, err := n.checkVersionsExist(ctx, bkt, obj) version, err := n.checkVersionsExist(ctx, bkt, obj)
if err != nil { if err != nil {
obj.Error = err obj.Error = err
@ -594,9 +610,11 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
} }
p.Header[versionsDelAttr] = obj.VersionID p.Header[versionsDelAttr] = obj.VersionID
p.Header[VersionsDeleteMarkAttr] = version.Version()
} else { } else {
p.Header[VersionsDeleteMarkAttr] = DelMarkFullObject p.Header[VersionsDeleteMarkAttr] = DelMarkFullObject
} }
objInfo, err := n.objectPut(ctx, bkt, p) objInfo, err := n.objectPut(ctx, bkt, p)
if err != nil { if err != nil {
obj.Error = err obj.Error = err
@ -605,13 +623,6 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
if len(obj.VersionID) == 0 { if len(obj.VersionID) == 0 {
obj.DeleteMarkVersion = objInfo.Version() obj.DeleteMarkVersion = objInfo.Version()
} }
} else {
ids, err = n.objectSearchByName(ctx, bkt.CID, obj.Name)
if err != nil {
obj.Error = err
return obj
}
}
for _, id := range ids { for _, id := range ids {
if err = n.objectDelete(ctx, bkt.CID, id); err != nil { if err = n.objectDelete(ctx, bkt.CID, id); err != nil {