forked from TrueCloudLab/frostfs-s3-gw
[#601] Reuse common parts for object removing
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
0f656b1471
commit
788836fc9a
2 changed files with 191 additions and 8 deletions
106
api/layer/delete.go
Normal file
106
api/layer/delete.go
Normal file
|
@ -0,0 +1,106 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
returnCb func(obj *VersionedObject)
|
||||
rmOldVersionObjFn func(ctx context.Context, nodeVersion *data.NodeVersion) (string, error)
|
||||
rmVersion func(ctx context.Context, nodeID uint64) error
|
||||
createDeleteMarker func(ctx context.Context, obj *VersionedObject) *VersionedObject
|
||||
)
|
||||
|
||||
func (n *Layer) genericDelete(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject,
|
||||
cb returnCb, rmOld rmOldVersionObjFn, rmVers rmVersion, createMarker createDeleteMarker) {
|
||||
if len(obj.VersionID) != 0 || settings.Unversioned() {
|
||||
var nodeVersions []*data.NodeVersion
|
||||
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||
cb(n.handleNotFoundError(bkt, obj))
|
||||
return
|
||||
}
|
||||
|
||||
for _, nodeVersion := range nodeVersions {
|
||||
obj.DeleteMarkVersion, obj.Error = rmOld(ctx, nodeVersion)
|
||||
if obj.Error != nil {
|
||||
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
||||
cb(obj)
|
||||
return
|
||||
}
|
||||
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting, zap.Stringer("cid", bkt.CID),
|
||||
zap.String("oid", obj.VersionID), zap.Error(obj.Error), logs.TagField(logs.TagExternalStorage))
|
||||
}
|
||||
|
||||
if obj.Error = rmVers(ctx, nodeVersion.ID); obj.Error != nil {
|
||||
cb(obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
||||
cb(obj)
|
||||
return
|
||||
}
|
||||
|
||||
lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj)
|
||||
if err != nil {
|
||||
obj.Error = err
|
||||
cb(n.handleNotFoundError(bkt, obj))
|
||||
return
|
||||
}
|
||||
|
||||
if settings.VersioningSuspended() {
|
||||
obj.VersionID = data.UnversionedObjectVersionID
|
||||
|
||||
var nodeVersions []*data.NodeVersion
|
||||
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||
if !isNotFoundError(obj.Error) {
|
||||
cb(obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for i, nodeVersion := range nodeVersions {
|
||||
if nodeVersion.ID == lastVersion.ID && nodeVersion.IsDeleteMarker {
|
||||
nodeVersions = append(nodeVersions[:i], nodeVersions[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for _, nodeVersion := range nodeVersions {
|
||||
obj.DeleteMarkVersion, obj.Error = rmOld(ctx, nodeVersion)
|
||||
if obj.Error != nil {
|
||||
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
||||
cb(obj)
|
||||
return
|
||||
}
|
||||
n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting, zap.Stringer("cid", bkt.CID),
|
||||
zap.String("oid", obj.VersionID), zap.Error(obj.Error), logs.TagField(logs.TagExternalStorage))
|
||||
}
|
||||
|
||||
if obj.Error = rmVers(ctx, nodeVersion.ID); obj.Error != nil {
|
||||
cb(obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if lastVersion.IsDeleteMarker {
|
||||
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
||||
cb(obj)
|
||||
return
|
||||
}
|
||||
|
||||
if obj = createMarker(ctx, obj); obj.Error != nil {
|
||||
cb(obj)
|
||||
return
|
||||
}
|
||||
|
||||
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
|
||||
cb(obj)
|
||||
}
|
|
@ -705,6 +705,32 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
return obj
|
||||
}
|
||||
|
||||
func (n *Layer) deleteObject2(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject,
|
||||
networkInfo netmap.NetworkInfo) *VersionedObject {
|
||||
|
||||
n.genericDelete(ctx, bkt, settings, obj,
|
||||
func(*VersionedObject) {},
|
||||
func(ctx context.Context, nodeVersion *data.NodeVersion) (string, error) {
|
||||
if nodeVersion.IsDeleteMarker {
|
||||
return obj.VersionID, nil
|
||||
}
|
||||
|
||||
if nodeVersion.IsCombined {
|
||||
return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo)
|
||||
}
|
||||
|
||||
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
|
||||
},
|
||||
func(ctx context.Context, nodeID uint64) error {
|
||||
return n.treeService.RemoveVersion(ctx, bkt, nodeID)
|
||||
},
|
||||
func(ctx context.Context, obj *VersionedObject) *VersionedObject {
|
||||
return n.createDeleteMarker(ctx, bkt, settings, networkInfo, obj)
|
||||
})
|
||||
|
||||
return obj
|
||||
}
|
||||
|
||||
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||
if isNotFoundError(obj.Error) {
|
||||
obj.Error = nil
|
||||
|
@ -848,7 +874,7 @@ func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver
|
|||
|
||||
if !p.IsMultiple {
|
||||
for i, obj := range p.Objects {
|
||||
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo)
|
||||
p.Objects[i] = n.deleteObject2(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo)
|
||||
}
|
||||
return p.Objects
|
||||
}
|
||||
|
@ -900,7 +926,7 @@ func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver
|
|||
}
|
||||
|
||||
if obj.needDeleteMarker && obj.obj.Error == nil {
|
||||
obj.obj = n.createDeleteMarker(ctx, p, obj.obj)
|
||||
obj.obj = n.createDeleteMarker(ctx, p.BktInfo, p.Settings, p.NetworkInfo, obj.obj)
|
||||
}
|
||||
|
||||
resObjects = append(resObjects, obj.obj)
|
||||
|
@ -911,7 +937,7 @@ func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver
|
|||
|
||||
tokens := prepareTokensParameter(ctx, p.BktInfo.Owner)
|
||||
for _, obj := range p.Objects {
|
||||
n.deleteObjectUsingBatchTombstone(ctx, p, obj, tokens, inputCh)
|
||||
n.deleteObjectUsingBatchTombstone2(ctx, p, obj, tokens, inputCh)
|
||||
}
|
||||
|
||||
close(inputCh)
|
||||
|
@ -987,11 +1013,62 @@ func (n *Layer) deleteObjectUsingBatchTombstone(ctx context.Context, p *DeleteOb
|
|||
return
|
||||
}
|
||||
|
||||
inputCh <- tombstoneData{obj: n.createDeleteMarker(ctx, p, obj)}
|
||||
inputCh <- tombstoneData{obj: n.createDeleteMarker(ctx, p.BktInfo, p.Settings, p.NetworkInfo, obj)}
|
||||
n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name)
|
||||
}
|
||||
|
||||
func (n *Layer) createDeleteMarker(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject) *VersionedObject {
|
||||
// deleteObjectUsingBatchTombstone schedule object removing.
|
||||
// Method logic structure is similar to Layer.deleteObject.
|
||||
func (n *Layer) deleteObjectUsingBatchTombstone2(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject, tokens relations.Tokens, inputCh chan<- tombstoneData) {
|
||||
td := tombstoneData{obj: obj}
|
||||
|
||||
n.genericDelete(ctx, p.BktInfo, p.Settings, obj,
|
||||
func(obj *VersionedObject) {
|
||||
inputCh <- td
|
||||
},
|
||||
func(ctx context.Context, nodeVersion *data.NodeVersion) (string, error) {
|
||||
if nodeVersion.IsDeleteMarker {
|
||||
td.nodes = append(td.nodes, nodeVersion)
|
||||
return obj.VersionID, nil
|
||||
}
|
||||
|
||||
if nodeVersion.IsCombined {
|
||||
err := n.removeCombinedObject(ctx, p.BktInfo, nodeVersion, p.NetworkInfo)
|
||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
td.nodes = append(td.nodes, nodeVersion)
|
||||
return "", nil
|
||||
}
|
||||
|
||||
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), p.BktInfo.CID, nodeVersion.OID, tokens)
|
||||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", p.BktInfo.CID.EncodeToString()),
|
||||
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
|
||||
|
||||
if client.IsErrObjectAlreadyRemoved(err) || client.IsErrObjectNotFound(err) {
|
||||
td.nodes = append(td.nodes, nodeVersion)
|
||||
return "", nil
|
||||
}
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
td.nodes = append(td.nodes, nodeVersion)
|
||||
td.members = append(td.members, append(oids, nodeVersion.OID)...)
|
||||
|
||||
return "", nil
|
||||
},
|
||||
func(context.Context, uint64) error { return nil },
|
||||
func(_ context.Context, obj *VersionedObject) *VersionedObject {
|
||||
td.needDeleteMarker = true
|
||||
obj.Error = nil
|
||||
return obj
|
||||
})
|
||||
}
|
||||
|
||||
func (n *Layer) createDeleteMarker(ctx context.Context, bktInfo *data.BucketInfo, bktSettings *data.BucketSettings, networkInfo netmap.NetworkInfo, obj *VersionedObject) *VersionedObject {
|
||||
randOID, err := getRandomOID()
|
||||
if err != nil {
|
||||
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
|
||||
|
@ -1007,12 +1084,12 @@ func (n *Layer) createDeleteMarker(ctx context.Context, p *DeleteObjectParams, o
|
|||
Created: &now,
|
||||
Owner: &n.gateOwner,
|
||||
IsDeleteMarker: true,
|
||||
CreationEpoch: p.NetworkInfo.CurrentEpoch(),
|
||||
CreationEpoch: networkInfo.CurrentEpoch(),
|
||||
},
|
||||
IsUnversioned: p.Settings.VersioningSuspended(),
|
||||
IsUnversioned: bktSettings.VersioningSuspended(),
|
||||
}
|
||||
|
||||
_, obj.Error = n.treeService.AddVersion(ctx, p.BktInfo, newVersion)
|
||||
_, obj.Error = n.treeService.AddVersion(ctx, bktInfo, newVersion)
|
||||
return obj
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue