[#83] Don't create extra delete marker

We shouldn't create delete marker if:
1. object doesn't exist at all
2. last version is already a delete marker

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-04-14 15:36:22 +03:00
parent 4a8c382491
commit 70ec5a0a5b
5 changed files with 145 additions and 12 deletions

View file

@ -8,6 +8,7 @@ This document outlines major changes between releases.
- Clean up List and Name caches when object is missing in Tree service (#57) - Clean up List and Name caches when object is missing in Tree service (#57)
- Get empty bucket CORS from frostfs (TrueCloudLab#36) - Get empty bucket CORS from frostfs (TrueCloudLab#36)
- Don't count pool error on client abort (#35) - Don't count pool error on client abort (#35)
- Don't create unnecessary delete-markers (#83)
### Added ### Added
- Return `X-Owner-Id` in `head-bucket` response (#79) - Return `X-Owner-Id` in `head-bucket` response (#79)

View file

@ -147,6 +147,81 @@ func TestRemoveDeleteMarker(t *testing.T) {
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
} }
func TestDeleteMarkerVersioned(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
createVersionedBucketAndObject(t, tc, bktName, objName)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions := listVersions(t, tc, bktName)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions = listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
}
func TestDeleteMarkerSuspended(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName)
putBucketVersioning(t, tc, bktName, false)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
t.Run("remove last unversioned non delete marker", func(t *testing.T) {
objName := "obj3"
putObject(t, tc, bktName, objName)
nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName)
require.NoError(t, err)
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
objVersions := getVersion(listVersions(t, tc, bktName), objName)
require.Len(t, objVersions, 0)
require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID))
})
}
func TestDeleteObjectCombined(t *testing.T) { func TestDeleteObjectCombined(t *testing.T) {
tc := prepareHandlerContext(t) tc := prepareHandlerContext(t)
@ -197,7 +272,7 @@ func TestDeleteMarkers(t *testing.T) {
deleteObject(t, tc, bktName, objName, emptyVersion) deleteObject(t, tc, bktName, objName, emptyVersion)
versions := listVersions(t, tc, bktName) versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length") require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length")
require.Len(t, versions.Version, 0, "versions must be empty") require.Len(t, versions.Version, 0, "versions must be empty")
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs") require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
@ -316,6 +391,16 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects
return res return res
} }
func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse {
var res []*ObjectVersionResponse
for i, version := range resp.Version {
if version.Key == objName {
res = append(res, &resp.Version[i])
}
}
return res
}
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) { func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
body := bytes.NewReader([]byte("content")) body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(tc, bktName, objName, body) w, r := prepareTestPayloadRequest(tc, bktName, objName, body)

View file

@ -33,6 +33,7 @@ type handlerContext struct {
t *testing.T t *testing.T
h *handler h *handler
tp *layer.TestFrostFS tp *layer.TestFrostFS
tree *tree.Tree
context context.Context context context.Context
} }
@ -85,11 +86,13 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
var owner user.ID var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey) user.IDFromKey(&owner, key.PrivateKey.PublicKey)
treeMock := NewTreeServiceMock(t)
layerCfg := &layer.Config{ layerCfg := &layer.Config{
Caches: layer.DefaultCachesConfigs(zap.NewExample()), Caches: layer.DefaultCachesConfigs(zap.NewExample()),
AnonKey: layer.AnonymousKey{Key: key}, AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver, Resolver: testResolver,
TreeService: NewTreeServiceMock(t), TreeService: treeMock,
} }
var pp netmap.PlacementPolicy var pp netmap.PlacementPolicy
@ -110,6 +113,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
t: t, t: t,
h: h, h: h,
tp: tp, tp: tp,
tree: treeMock,
context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)), context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)),
} }
} }

View file

@ -561,21 +561,38 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj return obj
} }
var newVersion *data.NodeVersion lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
if err != nil {
obj.Error = err
return n.handleNotFoundError(bkt, obj)
}
if settings.VersioningSuspended() { if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID obj.VersionID = data.UnversionedObjectVersionID
var nodeVersion *data.NodeVersion var nullVersionToDelete *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { if lastVersion.IsUnversioned {
return n.handleNotFoundError(bkt, obj) if !lastVersion.IsDeleteMarker() {
nullVersionToDelete = lastVersion
} }
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { if !isNotFoundError(obj.Error) {
return obj return obj
} }
} }
if nullVersionToDelete != nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
return obj
}
}
}
if lastVersion.IsDeleteMarker() {
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
return obj
}
randOID, err := getRandomOID() randOID, err := getRandomOID()
if err != nil { if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err) obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
@ -584,7 +601,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
obj.DeleteMarkVersion = randOID.EncodeToString() obj.DeleteMarkVersion = randOID.EncodeToString()
newVersion = &data.NodeVersion{ newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{ BaseNodeVersion: data.BaseNodeVersion{
OID: randOID, OID: randOID,
FilePath: obj.Name, FilePath: obj.Name,
@ -606,8 +623,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
} }
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject { func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) || if isNotFoundError(obj.Error) {
errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
obj.Error = nil obj.Error = nil
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
@ -616,6 +632,11 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
return obj return obj
} }
func isNotFoundError(err error) bool {
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
}
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) { func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{ objVersion := &ObjectVersion{
BktInfo: bkt, BktInfo: bkt,
@ -627,6 +648,17 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo
return n.getNodeVersion(ctx, objVersion) return n.getNodeVersion(ctx, objVersion)
} }
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: "",
NoErrorOnDeleteMarker: true,
}
return n.getNodeVersion(ctx, objVersion)
}
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) { func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.IsDeleteMarker() { if nodeVersion.IsDeleteMarker() {
return obj.VersionID, nil return obj.VersionID, nil

View file

@ -15,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
@ -55,6 +56,16 @@ func (t *TestFrostFS) Objects() []*object.Object {
return res return res
} }
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
for _, obj := range t.objects {
if id, _ := obj.ID(); id.Equals(objID) {
return true
}
}
return false
}
func (t *TestFrostFS) AddObject(key string, obj *object.Object) { func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
t.objects[key] = obj t.objects[key] = obj
} }
@ -161,7 +172,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
}, nil }, nil
} }
return nil, fmt.Errorf("object not found %s", addr) return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
} }
func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) { func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {