[#431] Delete all split version at once
All checks were successful
/ Builds (1.21) (pull_request) Successful in 2m8s
/ Builds (1.22) (pull_request) Successful in 2m6s
/ DCO (pull_request) Successful in 2m3s
/ Vulncheck (pull_request) Successful in 1m54s
/ Lint (pull_request) Successful in 4m15s
/ Tests (1.21) (pull_request) Successful in 2m28s
/ Tests (1.22) (pull_request) Successful in 2m33s
All checks were successful
/ Builds (1.21) (pull_request) Successful in 2m8s
/ Builds (1.22) (pull_request) Successful in 2m6s
/ DCO (pull_request) Successful in 2m3s
/ Vulncheck (pull_request) Successful in 1m54s
/ Lint (pull_request) Successful in 4m15s
/ Tests (1.21) (pull_request) Successful in 2m28s
/ Tests (1.22) (pull_request) Successful in 2m33s
Previously after split we can get two `null` versioned object with the same key and deleting such key removes only one node/object. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
313ee45657
commit
19262730c1
3 changed files with 112 additions and 38 deletions
|
@ -346,6 +346,23 @@ func TestDeleteObjectSuspended(t *testing.T) {
|
||||||
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't")
|
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeleteObjectSuspendedNull(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
||||||
|
bktInfo, objInfo := createBucketAndObject(tc, bktName, objName)
|
||||||
|
|
||||||
|
putBucketVersioning(t, tc, bktName, true)
|
||||||
|
putObject(tc, bktName, objName)
|
||||||
|
|
||||||
|
putBucketVersioning(t, tc, bktName, false)
|
||||||
|
|
||||||
|
deleteObject(t, tc, bktName, objName, emptyVersion)
|
||||||
|
checkNotFound(t, tc, bktName, objName, emptyVersion)
|
||||||
|
|
||||||
|
require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't")
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeleteMarkers(t *testing.T) {
|
func TestDeleteMarkers(t *testing.T) {
|
||||||
tc := prepareHandlerContext(t)
|
tc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
|
|
@ -6,9 +6,11 @@ import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
|
stderrors "errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -16,6 +18,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
|
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
|
@ -660,17 +663,26 @@ func getRandomOID() (oid.ID, error) {
|
||||||
|
|
||||||
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
||||||
if len(obj.VersionID) != 0 || settings.Unversioned() {
|
if len(obj.VersionID) != 0 || settings.Unversioned() {
|
||||||
var nodeVersion *data.NodeVersion
|
var nodeVersions []*data.NodeVersion
|
||||||
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||||
return n.handleNotFoundError(bkt, obj)
|
return n.handleNotFoundError(bkt, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
for _, nodeVersion := range nodeVersions {
|
||||||
return n.handleObjectDeleteErrors(ctx, bkt, obj, nodeVersion.ID)
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
||||||
|
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting,
|
||||||
|
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error))
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID); obj.Error != nil {
|
||||||
|
return obj
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID)
|
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
||||||
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
|
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -683,20 +695,30 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
if settings.VersioningSuspended() {
|
if settings.VersioningSuspended() {
|
||||||
obj.VersionID = data.UnversionedObjectVersionID
|
obj.VersionID = data.UnversionedObjectVersionID
|
||||||
|
|
||||||
var nullVersionToDelete *data.NodeVersion
|
var nodeVersions []*data.NodeVersion
|
||||||
if lastVersion.IsUnversioned {
|
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||||
if !lastVersion.IsDeleteMarker {
|
|
||||||
nullVersionToDelete = lastVersion
|
|
||||||
}
|
|
||||||
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
|
|
||||||
if !isNotFoundError(obj.Error) {
|
if !isNotFoundError(obj.Error) {
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if nullVersionToDelete != nil {
|
for _, nodeVersion := range nodeVersions {
|
||||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
|
if nodeVersion.ID == lastVersion.ID && nodeVersion.IsDeleteMarker {
|
||||||
return n.handleObjectDeleteErrors(ctx, bkt, obj, nullVersionToDelete.ID)
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nodeVersion.IsDeleteMarker {
|
||||||
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
||||||
|
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting,
|
||||||
|
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID); obj.Error != nil {
|
||||||
|
return obj
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -744,36 +766,70 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
|
|
||||||
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
|
|
||||||
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting,
|
|
||||||
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID), zap.Error(obj.Error))
|
|
||||||
|
|
||||||
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
|
|
||||||
if obj.Error == nil {
|
|
||||||
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
|
|
||||||
func isNotFoundError(err error) bool {
|
func isNotFoundError(err error) bool {
|
||||||
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
|
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
|
||||||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
|
errors.IsS3Error(err, errors.ErrNoSuchVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
func (n *layer) getNodeVersionsToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) ([]*data.NodeVersion, error) {
|
||||||
objVersion := &ObjectVersion{
|
var versionsToDelete []*data.NodeVersion
|
||||||
BktInfo: bkt,
|
versions, err := n.treeService.GetVersions(ctx, bkt, obj.Name)
|
||||||
ObjectName: obj.Name,
|
if err != nil {
|
||||||
VersionID: obj.VersionID,
|
if stderrors.Is(err, ErrNodeNotFound) {
|
||||||
NoErrorOnDeleteMarker: true,
|
return nil, fmt.Errorf("%w: %s", s3errors.GetAPIError(s3errors.ErrNoSuchKey), err.Error())
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.getNodeVersion(ctx, objVersion)
|
if len(versions) == 0 {
|
||||||
|
return nil, fmt.Errorf("%w: there isn't tree node with requested version id", s3errors.GetAPIError(s3errors.ErrNoSuchVersion))
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(versions, func(i, j int) bool {
|
||||||
|
return versions[i].Timestamp < versions[j].Timestamp
|
||||||
|
})
|
||||||
|
|
||||||
|
var matchFn func(nv *data.NodeVersion) bool
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case obj.VersionID == data.UnversionedObjectVersionID:
|
||||||
|
matchFn = func(nv *data.NodeVersion) bool {
|
||||||
|
return nv.IsUnversioned
|
||||||
|
}
|
||||||
|
case len(obj.VersionID) == 0:
|
||||||
|
latest := versions[len(versions)-1]
|
||||||
|
if latest.IsUnversioned {
|
||||||
|
matchFn = func(nv *data.NodeVersion) bool {
|
||||||
|
return nv.IsUnversioned
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
matchFn = func(nv *data.NodeVersion) bool {
|
||||||
|
return nv.ID == latest.ID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
matchFn = func(nv *data.NodeVersion) bool {
|
||||||
|
return nv.OID.EncodeToString() == obj.VersionID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var oids []string
|
||||||
|
for _, v := range versions {
|
||||||
|
if matchFn(v) {
|
||||||
|
versionsToDelete = append(versionsToDelete, v)
|
||||||
|
if !v.IsDeleteMarker {
|
||||||
|
oids = append(oids, v.OID.EncodeToString())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(versionsToDelete) == 0 {
|
||||||
|
return nil, fmt.Errorf("%w: there isn't tree node with requested version id", s3errors.GetAPIError(s3errors.ErrNoSuchVersion))
|
||||||
|
}
|
||||||
|
|
||||||
|
n.reqLogger(ctx).Debug(logs.GetTreeNodeToDelete, zap.Stringer("cid", bkt.CID), zap.Strings("oids", oids))
|
||||||
|
|
||||||
|
return versionsToDelete, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
|
||||||
|
|
|
@ -87,6 +87,7 @@ const (
|
||||||
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
|
FailedToSubmitTaskToPool = "failed to submit task to pool" // Warn in ../../api/layer/object.go
|
||||||
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go
|
CouldNotFetchObjectMeta = "could not fetch object meta" // Warn in ../../api/layer/object.go
|
||||||
GetTreeNode = "get tree node" // Debug in ../../api/layer/tagging.go
|
GetTreeNode = "get tree node" // Debug in ../../api/layer/tagging.go
|
||||||
|
GetTreeNodeToDelete = "get tree node to delete" // Debug in ../../api/layer/tagging.go
|
||||||
CouldntPutBucketInfoIntoCache = "couldn't put bucket info into cache" // Warn in ../../api/layer/cache.go
|
CouldntPutBucketInfoIntoCache = "couldn't put bucket info into cache" // Warn in ../../api/layer/cache.go
|
||||||
CouldntAddObjectToCache = "couldn't add object to cache" // Warn in ../../api/layer/cache.go
|
CouldntAddObjectToCache = "couldn't add object to cache" // Warn in ../../api/layer/cache.go
|
||||||
CouldntCacheAccessControlOperation = "couldn't cache access control operation" // Warn in ../../api/layer/cache.go
|
CouldntCacheAccessControlOperation = "couldn't cache access control operation" // Warn in ../../api/layer/cache.go
|
||||||
|
|
Loading…
Reference in a new issue