[#412] Store creation epoch of delete markers

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-07-23 18:12:16 +03:00
parent 0644067496
commit fa68a4ce40
2 changed files with 39 additions and 13 deletions

View file

@ -81,10 +81,17 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
networkInfo, err := h.obj.GetNetworkInfo(ctx)
if err != nil {
h.logAndSendError(w, "could not get network info", reqInfo, err)
return
}
p := &layer.DeleteObjectParams{ p := &layer.DeleteObjectParams{
BktInfo: bktInfo, BktInfo: bktInfo,
Objects: versionedObject, Objects: versionedObject,
Settings: bktSettings, Settings: bktSettings,
NetworkInfo: networkInfo,
} }
deletedObjects := h.obj.DeleteObjects(ctx, p) deletedObjects := h.obj.DeleteObjects(ctx, p)
deletedObject := deletedObjects[0] deletedObject := deletedObjects[0]
@ -181,11 +188,18 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
return return
} }
networkInfo, err := h.obj.GetNetworkInfo(ctx)
if err != nil {
h.logAndSendError(w, "could not get network info", reqInfo, err)
return
}
p := &layer.DeleteObjectParams{ p := &layer.DeleteObjectParams{
BktInfo: bktInfo, BktInfo: bktInfo,
Objects: toRemove, Objects: toRemove,
Settings: bktSettings, Settings: bktSettings,
IsMultiple: true, NetworkInfo: networkInfo,
IsMultiple: true,
} }
deletedObjects := h.obj.DeleteObjects(ctx, p) deletedObjects := h.obj.DeleteObjects(ctx, p)

View file

@ -124,10 +124,11 @@ type (
} }
DeleteObjectParams struct { DeleteObjectParams struct {
BktInfo *data.BucketInfo BktInfo *data.BucketInfo
Objects []*VersionedObject Objects []*VersionedObject
Settings *data.BucketSettings Settings *data.BucketSettings
IsMultiple bool NetworkInfo netmap.NetworkInfo
IsMultiple bool
} }
// PutSettingsParams stores object copy request parameters. // PutSettingsParams stores object copy request parameters.
@ -545,7 +546,8 @@ func getRandomOID() (oid.ID, error) {
return objID, nil return objID, nil
} }
func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject { func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject,
networkInfo netmap.NetworkInfo) *VersionedObject {
if len(obj.VersionID) != 0 || settings.Unversioned() { if len(obj.VersionID) != 0 || settings.Unversioned() {
var nodeVersions []*data.NodeVersion var nodeVersions []*data.NodeVersion
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil { if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil {
@ -627,6 +629,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
Created: &now, Created: &now,
Owner: &n.gateOwner, Owner: &n.gateOwner,
IsDeleteMarker: true, IsDeleteMarker: true,
CreationEpoch: networkInfo.CurrentEpoch(),
}, },
IsUnversioned: settings.VersioningSuspended(), IsUnversioned: settings.VersioningSuspended(),
} }
@ -769,7 +772,7 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
// DeleteObjects from the storage. // DeleteObjects from the storage.
func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject { func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
for i, obj := range p.Objects { for i, obj := range p.Objects {
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj) p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo)
if p.IsMultiple && p.Objects[i].Error != nil { if p.IsMultiple && p.Objects[i].Error != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", obj.String()), zap.Error(p.Objects[i].Error)) n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", obj.String()), zap.Error(p.Objects[i].Error))
} }
@ -849,3 +852,12 @@ func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
return nil return nil
} }
func (n *Layer) GetNetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
networkInfo, err := n.frostFS.NetworkInfo(ctx)
if err != nil {
return networkInfo, fmt.Errorf("get network info: %w", err)
}
return networkInfo, nil
}