[#542] Fix object removal

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-06-24 15:39:30 +03:00 committed by Alex Vanin
parent 88c392d024
commit fdf0974679
5 changed files with 268 additions and 181 deletions

View file

@ -115,8 +115,9 @@ type (
}
DeleteObjectParams struct {
BktInfo *data.BucketInfo
Objects []*VersionedObject
BktInfo *data.BucketInfo
Objects []*VersionedObject
Settings *data.BucketSettings
}
// PutSettingsParams stores object copy request parameters.
@ -236,7 +237,7 @@ type (
ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error)
ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error)
DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error)
DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject
CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ObjectInfo, error)
@ -473,55 +474,10 @@ func getRandomOID() (oid.ID, error) {
return objID, nil
}
// DeleteObject removes all objects with the passed nice name.
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if len(obj.VersionID) == 0 {
obj.VersionID = UnversionedObjectVersionID
}
objVersion := &ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: obj.VersionID,
NoErrorOnDeleteMarker: true,
}
var nodeVersion *data.NodeVersion
nodeVersion, obj.Error = n.getNodeVersion(ctx, objVersion)
if obj.VersionID == UnversionedObjectVersionID {
if obj.Error == nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
}
} else if !errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) {
return obj
}
randOID, err := getRandomOID()
if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
return obj
}
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: obj.Name,
},
DeleteMarker: &data.DeleteMarkerInfo{
Created: time.Now(),
Owner: n.Owner(ctx),
},
IsUnversioned: true,
}
if obj.Error = n.treeService.AddVersion(ctx, bkt.CID, newVersion); obj.Error != nil {
return obj
}
n.namesCache.Delete(bkt.Name + "/" + obj.Name)
} else {
if obj.Error != nil {
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
if len(obj.VersionID) != 0 {
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return obj
}
@ -529,16 +485,65 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
return obj
}
if obj.Error = n.treeService.RemoveVersion(ctx, bkt.CID, nodeVersion.ID); obj.Error != nil {
obj.Error = n.treeService.RemoveVersion(ctx, bkt.CID, nodeVersion.ID)
return obj
}
var newVersion *data.NodeVersion
if !settings.VersioningEnabled {
obj.VersionID = UnversionedObjectVersionID
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return obj
}
if obj.Error == nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
}
} else if !errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) {
return obj
}
}
n.listsCache.CleanCacheEntriesContainingObject(obj.Name, bkt.CID)
randOID, err := getRandomOID()
if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
return obj
}
newVersion = &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: obj.Name,
},
DeleteMarker: &data.DeleteMarkerInfo{
Created: time.Now(),
Owner: n.Owner(ctx),
},
IsUnversioned: !settings.VersioningEnabled,
}
if obj.Error = n.treeService.AddVersion(ctx, bkt.CID, newVersion); obj.Error != nil {
return obj
}
n.namesCache.Delete(bkt.Name + "/" + obj.Name)
return obj
}
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: obj.VersionID,
NoErrorOnDeleteMarker: true,
}
return n.getNodeVersion(ctx, objVersion)
}
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.DeleteMarker != nil {
return obj.VersionID, nil
@ -548,12 +553,12 @@ func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, node
}
// DeleteObjects from the storage.
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error) {
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
for i, obj := range p.Objects {
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, obj)
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj)
}
return p.Objects, nil
return p.Objects
}
func (n *layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {