[#412] Store creation epoch of delete markers #440
2 changed files with 39 additions and 13 deletions
|
@ -81,10 +81,17 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
networkInfo, err := h.obj.GetNetworkInfo(ctx)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get network info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
p := &layer.DeleteObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
Objects: versionedObject,
|
||||
Settings: bktSettings,
|
||||
BktInfo: bktInfo,
|
||||
Objects: versionedObject,
|
||||
Settings: bktSettings,
|
||||
NetworkInfo: networkInfo,
|
||||
}
|
||||
deletedObjects := h.obj.DeleteObjects(ctx, p)
|
||||
deletedObject := deletedObjects[0]
|
||||
|
@ -181,11 +188,18 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
|
|||
return
|
||||
}
|
||||
|
||||
networkInfo, err := h.obj.GetNetworkInfo(ctx)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get network info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
p := &layer.DeleteObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
Objects: toRemove,
|
||||
Settings: bktSettings,
|
||||
IsMultiple: true,
|
||||
BktInfo: bktInfo,
|
||||
Objects: toRemove,
|
||||
Settings: bktSettings,
|
||||
NetworkInfo: networkInfo,
|
||||
IsMultiple: true,
|
||||
}
|
||||
deletedObjects := h.obj.DeleteObjects(ctx, p)
|
||||
|
||||
|
|
|
@ -122,10 +122,11 @@ type (
|
|||
}
|
||||
|
||||
DeleteObjectParams struct {
|
||||
BktInfo *data.BucketInfo
|
||||
Objects []*VersionedObject
|
||||
Settings *data.BucketSettings
|
||||
IsMultiple bool
|
||||
BktInfo *data.BucketInfo
|
||||
Objects []*VersionedObject
|
||||
Settings *data.BucketSettings
|
||||
NetworkInfo netmap.NetworkInfo
|
||||
IsMultiple bool
|
||||
}
|
||||
|
||||
// PutSettingsParams stores object copy request parameters.
|
||||
|
@ -541,7 +542,8 @@ func getRandomOID() (oid.ID, error) {
|
|||
return objID, nil
|
||||
}
|
||||
|
||||
func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
||||
func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject,
|
||||
networkInfo netmap.NetworkInfo) *VersionedObject {
|
||||
if len(obj.VersionID) != 0 || settings.Unversioned() {
|
||||
var nodeVersions []*data.NodeVersion
|
||||
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, bkt, obj); obj.Error != nil {
|
||||
|
@ -623,6 +625,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
|||
Created: &now,
|
||||
Owner: &n.gateOwner,
|
||||
IsDeleteMarker: true,
|
||||
CreationEpoch: networkInfo.CurrentEpoch(),
|
||||
},
|
||||
IsUnversioned: settings.VersioningSuspended(),
|
||||
}
|
||||
|
@ -765,7 +768,7 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
|||
// DeleteObjects from the storage.
|
||||
func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
|
||||
for i, obj := range p.Objects {
|
||||
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj)
|
||||
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo)
|
||||
if p.IsMultiple && p.Objects[i].Error != nil {
|
||||
n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", obj.String()), zap.Error(p.Objects[i].Error))
|
||||
}
|
||||
|
@ -819,3 +822,12 @@ func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
|
|||
n.cache.DeleteBucket(p.BktInfo)
|
||||
return n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
|
||||
}
|
||||
|
||||
func (n *Layer) GetNetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
||||
networkInfo, err := n.frostFS.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return networkInfo, fmt.Errorf("get network info: %w", err)
|
||||
}
|
||||
|
||||
return networkInfo, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue