[#430] Delete all split version at once

Previously after split we can get two `null` versioned object with the same key
and deleting such key removes only one node/object.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-17 17:40:23 +03:00
parent 2948d1f942
commit 977a20760b
2 changed files with 95 additions and 38 deletions

View file

@ -6,9 +6,11 @@ import (
"crypto/rand"
"encoding/json"
"encoding/xml"
stderrors "errors"
"fmt"
"io"
"net/url"
"sort"
"strconv"
"strings"
"time"
@ -16,6 +18,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
@ -530,17 +533,26 @@ func getRandomOID() (oid.ID, error) {
func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
if len(obj.VersionID) != 0 || settings.Unversioned() {
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
var nodeVersions []*data.NodeVersion
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil {
return n.handleNotFoundError(bkt, obj)
}
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return n.handleObjectDeleteErrors(ctx, bkt, obj, nodeVersion.ID)
for _, nodeVersion := range nodeVersions {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj
}
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error))
}
if obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID); obj.Error != nil {
return obj
}
}
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID)
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
return obj
}
@ -553,20 +565,30 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID
var nullVersionToDelete *data.NodeVersion
if lastVersion.IsUnversioned {
if !lastVersion.IsDeleteMarker {
nullVersionToDelete = lastVersion
}
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
var nodeVersions []*data.NodeVersion
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil {
if !isNotFoundError(obj.Error) {
return obj
}
}
if nullVersionToDelete != nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
return n.handleObjectDeleteErrors(ctx, bkt, obj, nullVersionToDelete.ID)
for _, nodeVersion := range nodeVersions {
if nodeVersion.ID == lastVersion.ID && nodeVersion.IsDeleteMarker {
continue
}
if !nodeVersion.IsDeleteMarker {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj
}
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error))
}
}
if obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID); obj.Error != nil {
return obj
}
}
}
@ -614,36 +636,70 @@ func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
return obj
}
func (n *Layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj
}
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting,
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error))
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
if obj.Error == nil {
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
}
return obj
}
func isNotFoundError(err error) bool {
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
}
func (n *Layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &data.ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: obj.VersionID,
NoErrorOnDeleteMarker: true,
func (n *Layer) getNodeVersionsToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) ([]*data.NodeVersion, error) {
var versionsToDelete []*data.NodeVersion
versions, err := n.treeService.GetVersions(ctx, bkt, obj.Name)
if err != nil {
if stderrors.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
}
return nil, err
}
return n.getNodeVersion(ctx, objVersion)
if len(versions) == 0 {
return nil, fmt.Errorf("%w: there isn't tree node with requested version id", s3errors.GetAPIError(s3errors.ErrNoSuchVersion))
}
sort.Slice(versions, func(i, j int) bool {
return versions[i].Timestamp < versions[j].Timestamp
})
var matchFn func(nv *data.NodeVersion) bool
switch {
case obj.VersionID == data.UnversionedObjectVersionID:
matchFn = func(nv *data.NodeVersion) bool {
return nv.IsUnversioned
}
case len(obj.VersionID) == 0:
latest := versions[len(versions)-1]
if latest.IsUnversioned {
matchFn = func(nv *data.NodeVersion) bool {
return nv.IsUnversioned
}
} else {
matchFn = func(nv *data.NodeVersion) bool {
return nv.ID == latest.ID
}
}
default:
matchFn = func(nv *data.NodeVersion) bool {
return nv.OID.EncodeToString() == obj.VersionID
}
}
var oids []string
for _, v := range versions {
if matchFn(v) {
versionsToDelete = append(versionsToDelete, v)
if !v.IsDeleteMarker {
oids = append(oids, v.OID.EncodeToString())
}
}
}
if len(versionsToDelete) == 0 {
return nil, fmt.Errorf("%w: there isn't tree node with requested version id", s3errors.GetAPIError(s3errors.ErrNoSuchVersion))
}
n.reqLogger(ctx).Debug(logs.GetTreeNodeToDelete, zap.Stringer("cid", bkt.CID), zap.Strings("oids", oids))
return versionsToDelete, nil
}
func (n *Layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {

View file

@ -83,6 +83,7 @@ const (
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go
GetTreeNode = "get tree node" // Debug in ../../api/layer/tagging.go
GetTreeNodeToDelete = "get tree node to delete" // Debug in ../../api/layer/tagging.go
CouldntPutBucketInfoIntoCache = "couldn't put bucket info into cache" // Warn in ../../api/layer/cache.go
CouldntAddObjectToCache = "couldn't add object to cache" // Warn in ../../api/layer/cache.go
CouldntCacheAccessControlOperation = "couldn't cache access control operation" // Warn in ../../api/layer/cache.go