From 788836fc9adc48447879fbf83c7e1754575ca70e Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 27 Feb 2025 09:24:45 +0300 Subject: [PATCH] [#601] Reuse common parts for object removing Signed-off-by: Denis Kirillov --- api/layer/delete.go | 106 ++++++++++++++++++++++++++++++++++++++++++++ api/layer/layer.go | 93 ++++++++++++++++++++++++++++++++++---- 2 files changed, 191 insertions(+), 8 deletions(-) create mode 100644 api/layer/delete.go diff --git a/api/layer/delete.go b/api/layer/delete.go new file mode 100644 index 00000000..bcd86afd --- /dev/null +++ b/api/layer/delete.go @@ -0,0 +1,106 @@ +package layer + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + "go.uber.org/zap" +) + +type ( + returnCb func(obj *VersionedObject) + rmOldVersionObjFn func(ctx context.Context, nodeVersion *data.NodeVersion) (string, error) + rmVersion func(ctx context.Context, nodeID uint64) error + createDeleteMarker func(ctx context.Context, obj *VersionedObject) *VersionedObject +) + +func (n *Layer) genericDelete(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject, + cb returnCb, rmOld rmOldVersionObjFn, rmVers rmVersion, createMarker createDeleteMarker) { + if len(obj.VersionID) != 0 || settings.Unversioned() { + var nodeVersions []*data.NodeVersion + if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil { + cb(n.handleNotFoundError(bkt, obj)) + return + } + + for _, nodeVersion := range nodeVersions { + obj.DeleteMarkVersion, obj.Error = rmOld(ctx, nodeVersion) + if obj.Error != nil { + if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { + cb(obj) + return + } + n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting, zap.Stringer("cid", bkt.CID), + zap.String("oid", obj.VersionID), zap.Error(obj.Error), logs.TagField(logs.TagExternalStorage)) + } + + if obj.Error = rmVers(ctx, nodeVersion.ID); obj.Error != nil { + cb(obj) + return + } + } + + n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) + cb(obj) + return + } + + lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj) + if err != nil { + obj.Error = err + cb(n.handleNotFoundError(bkt, obj)) + return + } + + if settings.VersioningSuspended() { + obj.VersionID = data.UnversionedObjectVersionID + + var nodeVersions []*data.NodeVersion + if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil { + if !isNotFoundError(obj.Error) { + cb(obj) + return + } + } + + for i, nodeVersion := range nodeVersions { + if nodeVersion.ID == lastVersion.ID && nodeVersion.IsDeleteMarker { + nodeVersions = append(nodeVersions[:i], nodeVersions[i+1:]...) + break + } + } + + for _, nodeVersion := range nodeVersions { + obj.DeleteMarkVersion, obj.Error = rmOld(ctx, nodeVersion) + if obj.Error != nil { + if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { + cb(obj) + return + } + n.reqLogger(ctx).Debug(logs.CouldntDeleteObjectFromStorageContinueDeleting, zap.Stringer("cid", bkt.CID), + zap.String("oid", obj.VersionID), zap.Error(obj.Error), logs.TagField(logs.TagExternalStorage)) + } + + if obj.Error = rmVers(ctx, nodeVersion.ID); obj.Error != nil { + cb(obj) + return + } + } + } + + if lastVersion.IsDeleteMarker { + obj.DeleteMarkVersion = lastVersion.OID.EncodeToString() + cb(obj) + return + } + + if obj = createMarker(ctx, obj); obj.Error != nil { + cb(obj) + return + } + + n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) + cb(obj) +} diff --git a/api/layer/layer.go b/api/layer/layer.go index e9e5f21b..2f341f60 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -705,6 +705,32 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings return obj } +func (n *Layer) deleteObject2(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject, + networkInfo netmap.NetworkInfo) *VersionedObject { + + n.genericDelete(ctx, bkt, settings, obj, + func(*VersionedObject) {}, + func(ctx context.Context, nodeVersion *data.NodeVersion) (string, error) { + if nodeVersion.IsDeleteMarker { + return obj.VersionID, nil + } + + if nodeVersion.IsCombined { + return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo) + } + + return "", n.objectDelete(ctx, bkt, nodeVersion.OID) + }, + func(ctx context.Context, nodeID uint64) error { + return n.treeService.RemoveVersion(ctx, bkt, nodeID) + }, + func(ctx context.Context, obj *VersionedObject) *VersionedObject { + return n.createDeleteMarker(ctx, bkt, settings, networkInfo, obj) + }) + + return obj +} + func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject { if isNotFoundError(obj.Error) { obj.Error = nil @@ -848,7 +874,7 @@ func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver if !p.IsMultiple { for i, obj := range p.Objects { - p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo) + p.Objects[i] = n.deleteObject2(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo) } return p.Objects } @@ -900,7 +926,7 @@ func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver } if obj.needDeleteMarker && obj.obj.Error == nil { - obj.obj = n.createDeleteMarker(ctx, p, obj.obj) + obj.obj = n.createDeleteMarker(ctx, p.BktInfo, p.Settings, p.NetworkInfo, obj.obj) } resObjects = append(resObjects, obj.obj) @@ -911,7 +937,7 @@ func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver tokens := prepareTokensParameter(ctx, p.BktInfo.Owner) for _, obj := range p.Objects { - n.deleteObjectUsingBatchTombstone(ctx, p, obj, tokens, inputCh) + n.deleteObjectUsingBatchTombstone2(ctx, p, obj, tokens, inputCh) } close(inputCh) @@ -987,11 +1013,62 @@ func (n *Layer) deleteObjectUsingBatchTombstone(ctx context.Context, p *DeleteOb return } - inputCh <- tombstoneData{obj: n.createDeleteMarker(ctx, p, obj)} + inputCh <- tombstoneData{obj: n.createDeleteMarker(ctx, p.BktInfo, p.Settings, p.NetworkInfo, obj)} n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name) } -func (n *Layer) createDeleteMarker(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject) *VersionedObject { +// deleteObjectUsingBatchTombstone schedule object removing. +// Method logic structure is similar to Layer.deleteObject. +func (n *Layer) deleteObjectUsingBatchTombstone2(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject, tokens relations.Tokens, inputCh chan<- tombstoneData) { + td := tombstoneData{obj: obj} + + n.genericDelete(ctx, p.BktInfo, p.Settings, obj, + func(obj *VersionedObject) { + inputCh <- td + }, + func(ctx context.Context, nodeVersion *data.NodeVersion) (string, error) { + if nodeVersion.IsDeleteMarker { + td.nodes = append(td.nodes, nodeVersion) + return obj.VersionID, nil + } + + if nodeVersion.IsCombined { + err := n.removeCombinedObject(ctx, p.BktInfo, nodeVersion, p.NetworkInfo) + if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) { + return "", err + } + + td.nodes = append(td.nodes, nodeVersion) + return "", nil + } + + oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), p.BktInfo.CID, nodeVersion.OID, tokens) + if err != nil { + n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", p.BktInfo.CID.EncodeToString()), + zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err)) + + if client.IsErrObjectAlreadyRemoved(err) || client.IsErrObjectNotFound(err) { + td.nodes = append(td.nodes, nodeVersion) + return "", nil + } + + return "", err + } + + td.nodes = append(td.nodes, nodeVersion) + td.members = append(td.members, append(oids, nodeVersion.OID)...) + + return "", nil + }, + func(context.Context, uint64) error { return nil }, + func(_ context.Context, obj *VersionedObject) *VersionedObject { + td.needDeleteMarker = true + obj.Error = nil + return obj + }) +} + +func (n *Layer) createDeleteMarker(ctx context.Context, bktInfo *data.BucketInfo, bktSettings *data.BucketSettings, networkInfo netmap.NetworkInfo, obj *VersionedObject) *VersionedObject { randOID, err := getRandomOID() if err != nil { obj.Error = fmt.Errorf("couldn't get random oid: %w", err) @@ -1007,12 +1084,12 @@ func (n *Layer) createDeleteMarker(ctx context.Context, p *DeleteObjectParams, o Created: &now, Owner: &n.gateOwner, IsDeleteMarker: true, - CreationEpoch: p.NetworkInfo.CurrentEpoch(), + CreationEpoch: networkInfo.CurrentEpoch(), }, - IsUnversioned: p.Settings.VersioningSuspended(), + IsUnversioned: bktSettings.VersioningSuspended(), } - _, obj.Error = n.treeService.AddVersion(ctx, p.BktInfo, newVersion) + _, obj.Error = n.treeService.AddVersion(ctx, bktInfo, newVersion) return obj }