[#78] layer: Clean up already removed object from tree
Signed-off-by: Artem Tataurov <a.tataurov@yadro.com>
This commit is contained in:
parent
43e336e155
commit
a0f0d792b8
4 changed files with 82 additions and 5 deletions
|
@ -24,6 +24,7 @@ This document outlines major changes between releases.
|
||||||
- Support impersonate bearer token (#81)
|
- Support impersonate bearer token (#81)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
- Remove object from tree and reset its cache on object deletion when it is already removed from storage (#78)
|
||||||
- Update prometheus to v1.15.0 (#94)
|
- Update prometheus to v1.15.0 (#94)
|
||||||
- Update syncTree.sh due to recent renaming (#73)
|
- Update syncTree.sh due to recent renaming (#73)
|
||||||
- Update neo-go to v0.101.0 (#14)
|
- Update neo-go to v0.101.0 (#14)
|
||||||
|
|
|
@ -8,6 +8,8 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,6 +17,26 @@ const (
|
||||||
emptyVersion = ""
|
emptyVersion = ""
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestDeleteBucketOnAlreadyRemovedError(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-removal", "object-to-delete"
|
||||||
|
bktInfo := createTestBucket(hc, bktName)
|
||||||
|
|
||||||
|
putObject(t, hc, bktName, objName)
|
||||||
|
|
||||||
|
nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(bktInfo.CID)
|
||||||
|
addr.SetObject(nodeVersion.OID)
|
||||||
|
hc.tp.SetObjectError(addr, apistatus.ObjectAlreadyRemoved{})
|
||||||
|
|
||||||
|
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
||||||
|
|
||||||
|
deleteBucket(t, hc, bktName, http.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeleteBucket(t *testing.T) {
|
func TestDeleteBucket(t *testing.T) {
|
||||||
tc := prepareHandlerContext(t)
|
tc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
@ -358,6 +380,25 @@ func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version st
|
||||||
return w.Header().Get(api.AmzVersionID), w.Header().Get(api.AmzDeleteMarker) != ""
|
return w.Header().Get(api.AmzVersionID), w.Header().Get(api.AmzDeleteMarker) != ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func deleteObjects(t *testing.T, tc *handlerContext, bktName string, objVersions [][2]string) *DeleteObjectsResponse {
|
||||||
|
req := &DeleteObjectsRequest{}
|
||||||
|
for _, version := range objVersions {
|
||||||
|
req.Objects = append(req.Objects, ObjectIdentifier{
|
||||||
|
ObjectName: version[0],
|
||||||
|
VersionID: version[1],
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
w, r := prepareTestRequest(tc, bktName, "", req)
|
||||||
|
r.Header.Set(api.ContentMD5, "")
|
||||||
|
tc.Handler().DeleteMultipleObjectsHandler(w, r)
|
||||||
|
assertStatus(t, w, http.StatusOK)
|
||||||
|
|
||||||
|
res := &DeleteObjectsResponse{}
|
||||||
|
parseTestResponse(t, w, res)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
func deleteBucket(t *testing.T, tc *handlerContext, bktName string, code int) {
|
func deleteBucket(t *testing.T, tc *handlerContext, bktName string, code int) {
|
||||||
w, r := prepareTestRequest(tc, bktName, "", nil)
|
w, r := prepareTestRequest(tc, bktName, "", nil)
|
||||||
tc.Handler().DeleteBucketHandler(w, r)
|
tc.Handler().DeleteBucketHandler(w, r)
|
||||||
|
|
|
@ -29,6 +29,7 @@ type TestFrostFS struct {
|
||||||
FrostFS
|
FrostFS
|
||||||
|
|
||||||
objects map[string]*object.Object
|
objects map[string]*object.Object
|
||||||
|
objectErrors map[string]error
|
||||||
containers map[string]*container.Container
|
containers map[string]*container.Container
|
||||||
eaclTables map[string]*eacl.Table
|
eaclTables map[string]*eacl.Table
|
||||||
currentEpoch uint64
|
currentEpoch uint64
|
||||||
|
@ -36,9 +37,10 @@ type TestFrostFS struct {
|
||||||
|
|
||||||
func NewTestFrostFS() *TestFrostFS {
|
func NewTestFrostFS() *TestFrostFS {
|
||||||
return &TestFrostFS{
|
return &TestFrostFS{
|
||||||
objects: make(map[string]*object.Object),
|
objects: make(map[string]*object.Object),
|
||||||
containers: make(map[string]*container.Container),
|
objectErrors: make(map[string]error),
|
||||||
eaclTables: make(map[string]*eacl.Table),
|
containers: make(map[string]*container.Container),
|
||||||
|
eaclTables: make(map[string]*eacl.Table),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,6 +48,14 @@ func (t *TestFrostFS) CurrentEpoch() uint64 {
|
||||||
return t.currentEpoch
|
return t.currentEpoch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) {
|
||||||
|
if err == nil {
|
||||||
|
delete(t.objectErrors, addr.EncodeToString())
|
||||||
|
} else {
|
||||||
|
t.objectErrors[addr.EncodeToString()] = err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) Objects() []*object.Object {
|
func (t *TestFrostFS) Objects() []*object.Object {
|
||||||
res := make([]*object.Object, 0, len(t.objects))
|
res := make([]*object.Object, 0, len(t.objects))
|
||||||
|
|
||||||
|
@ -153,6 +163,10 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec
|
||||||
|
|
||||||
sAddr := addr.EncodeToString()
|
sAddr := addr.EncodeToString()
|
||||||
|
|
||||||
|
if err := t.objectErrors[sAddr]; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
if obj, ok := t.objects[sAddr]; ok {
|
if obj, ok := t.objects[sAddr]; ok {
|
||||||
owner := getOwner(ctx)
|
owner := getOwner(ctx)
|
||||||
if !obj.OwnerID().Equals(owner) && !t.isPublicRead(prm.Container) {
|
if !obj.OwnerID().Equals(owner) && !t.isPublicRead(prm.Container) {
|
||||||
|
@ -239,6 +253,10 @@ func (t *TestFrostFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) err
|
||||||
addr.SetContainer(prm.Container)
|
addr.SetContainer(prm.Container)
|
||||||
addr.SetObject(prm.Object)
|
addr.SetObject(prm.Object)
|
||||||
|
|
||||||
|
if err := t.objectErrors[addr.EncodeToString()]; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if obj, ok := t.objects[addr.EncodeToString()]; ok {
|
if obj, ok := t.objects[addr.EncodeToString()]; ok {
|
||||||
owner := getOwner(ctx)
|
owner := getOwner(ctx)
|
||||||
if !obj.OwnerID().Equals(owner) {
|
if !obj.OwnerID().Equals(owner) {
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
@ -553,7 +554,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
||||||
return obj
|
return n.handleObjectDeleteErrors(ctx, bkt, obj, nodeVersion.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID)
|
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID)
|
||||||
|
@ -583,7 +584,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
|
|
||||||
if nullVersionToDelete != nil {
|
if nullVersionToDelete != nil {
|
||||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil {
|
||||||
return obj
|
return n.handleObjectDeleteErrors(ctx, bkt, obj, nullVersionToDelete.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -632,6 +633,22 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
|
||||||
|
if client.IsErrObjectAlreadyRemoved(obj.Error) {
|
||||||
|
n.log.Debug("object already removed", zap.String("bucket", bkt.Name), zap.Stringer("cid", bkt.CID),
|
||||||
|
zap.String("object", obj.Name), zap.String("oid", obj.VersionID))
|
||||||
|
|
||||||
|
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
|
||||||
|
if obj.Error != nil {
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
|
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
func isNotFoundError(err error) bool {
|
func isNotFoundError(err error) bool {
|
||||||
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
|
return errors.IsS3Error(err, errors.ErrNoSuchKey) ||
|
||||||
errors.IsS3Error(err, errors.ErrNoSuchVersion)
|
errors.IsS3Error(err, errors.ErrNoSuchVersion)
|
||||||
|
|
Loading…
Reference in a new issue