[#83] Don't create extra delete marker

We shouldn't create delete marker if:
1. object doesn't exist at all
2. last version is already a delete marker

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-04-14 15:36:22 +03:00
parent 17b8905b24
commit 5d4304e204
5 changed files with 145 additions and 12 deletions

View file

@ -8,6 +8,7 @@ This document outlines major changes between releases.
- Clean up List and Name caches when object is missing in Tree service (#57)
- Get empty bucket CORS from frostfs (TrueCloudLab#36)
- Don't count pool error on client abort (#35)
- Don't create unnecessary delete-markers (#83)
### Added
- Return `X-Owner-Id` in `head-bucket` response (#79)

View file

@ -147,6 +147,81 @@ func TestRemoveDeleteMarker(t *testing.T) {
require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should")
}
func TestDeleteMarkerVersioned(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
createVersionedBucketAndObject(t, tc, bktName, objName)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions := listVersions(t, tc, bktName)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions = listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
}
func TestDeleteMarkerSuspended(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-for-removal", "object-to-delete"
bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName)
putBucketVersioning(t, tc, bktName, false)
t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) {
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
})
t.Run("do not create delete marker if object does not exist", func(t *testing.T) {
versionsBefore := listVersions(t, tc, bktName)
_, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion)
require.False(t, isDeleteMarker)
versionsAfter := listVersions(t, tc, bktName)
require.Equal(t, versionsBefore, versionsAfter)
})
t.Run("remove last unversioned non delete marker", func(t *testing.T) {
objName := "obj3"
putObject(t, tc, bktName, objName)
nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName)
require.NoError(t, err)
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion)
objVersions := getVersion(listVersions(t, tc, bktName), objName)
require.Len(t, objVersions, 0)
require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID))
})
}
func TestDeleteObjectCombined(t *testing.T) {
tc := prepareHandlerContext(t)
@ -197,7 +272,7 @@ func TestDeleteMarkers(t *testing.T) {
deleteObject(t, tc, bktName, objName, emptyVersion)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length")
require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length")
require.Len(t, versions.Version, 0, "versions must be empty")
require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs")
@ -316,6 +391,16 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects
return res
}
func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse {
var res []*ObjectVersionResponse
for i, version := range resp.Version {
if version.Key == objName {
res = append(res, &resp.Version[i])
}
}
return res
}
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(tc, bktName, objName, body)

View file

@ -33,6 +33,7 @@ type handlerContext struct {
t *testing.T
h *handler
tp *layer.TestFrostFS
tree *tree.Tree
context context.Context
}
@ -85,11 +86,13 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
treeMock := NewTreeServiceMock(t)
layerCfg := &layer.Config{
Caches: layer.DefaultCachesConfigs(zap.NewExample()),
AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver,
TreeService: NewTreeServiceMock(t),
TreeService: treeMock,
}
var pp netmap.PlacementPolicy
@ -110,6 +113,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
t: t,
h: h,
tp: tp,
tree: treeMock,
context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)),
}
}

View file

@ -561,21 +561,38 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj
}
var newVersion *data.NodeVersion
lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
if err != nil {
obj.Error = err
return n.handleNotFoundError(bkt, obj)
}
if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return n.handleNotFoundError(bkt, obj)
var nullVersionToDelete *data.NodeVersion
if lastVersion.IsUnversioned {
if !lastVersion.IsDeleteMarker() {
nullVersionToDelete = lastVersion
}
} else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
if !isNotFoundError(obj.Error) {
return obj
}
}
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
if nullVersionToDelete != nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
return obj
}
}
}
if lastVersion.IsDeleteMarker() {
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
return obj
}
randOID, err := getRandomOID()
if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
@ -584,7 +601,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
obj.DeleteMarkVersion = randOID.EncodeToString()
newVersion = &data.NodeVersion{
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: obj.Name,
@ -606,8 +623,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
}
func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) ||
errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
if isNotFoundError(obj.Error) {
obj.Error = nil
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
@ -616,6 +632,11 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
return obj
}
func isNotFoundError(err error) bool {
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
}
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,
@ -627,6 +648,17 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo
return n.getNodeVersion(ctx, objVersion)
}
func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: "",
NoErrorOnDeleteMarker: true,
}
return n.getNodeVersion(ctx, objVersion)
}
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.IsDeleteMarker() {
return obj.VersionID, nil

View file

@ -15,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
@ -55,6 +56,16 @@ func (t *TestFrostFS) Objects() []*object.Object {
return res
}
func (t *TestFrostFS) ObjectExists(objID oid.ID) bool {
for _, obj := range t.objects {
if id, _ := obj.ID(); id.Equals(objID) {
return true
}
}
return false
}
func (t *TestFrostFS) AddObject(key string, obj *object.Object) {
t.objects[key] = obj
}
@ -161,7 +172,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
}, nil
}
return nil, fmt.Errorf("object not found %s", addr)
return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr)
}
func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {