[#653] Support removal old unversioned objects

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2025-03-05 17:44:23 +03:00 committed by Alexey Vanin
parent c0c99a1839
commit 4a430257a4
18 changed files with 289 additions and 61 deletions

View file

@ -48,6 +48,9 @@ type (
FormContainerZone(ns string) string
TombstoneMembersSize() int
TombstoneLifetime() uint64
RemoveOnReplace() bool
RemoveOnReplaceTimeout() time.Duration
RemoveOnReplaceQueue() int
}
Layer struct {
@ -63,6 +66,15 @@ type (
corsCnrInfo *data.BucketInfo
lifecycleCnrInfo *data.BucketInfo
workerPool *ants.Pool
removalChan chan removalParams
}
removalParams struct {
Auth frostfs.PrmAuth
BktInfo *data.BucketInfo
OIDs []oid.ID
RequestID string
TraceID string
}
Config struct {
@ -256,8 +268,8 @@ func (p HeadObjectParams) Versioned() bool {
// NewLayer creates an instance of a Layer. It checks credentials
// and establishes gRPC connection with the node.
func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
return &Layer{
func NewLayer(ctx context.Context, log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
layer := &Layer{
frostFS: frostFS,
log: log,
gateOwner: config.GateOwner,
@ -270,7 +282,13 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
corsCnrInfo: config.CORSCnrInfo,
lifecycleCnrInfo: config.LifecycleCnrInfo,
workerPool: config.WorkerPool,
// TODO: consider closing channel
removalChan: make(chan removalParams, config.Features.RemoveOnReplaceQueue()),
}
go layer.removalRoutine(ctx)
return layer
}
func (n *Layer) EphemeralKey() *keys.PublicKey {
@ -695,7 +713,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
IsUnversioned: settings.VersioningSuspended(),
}
if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil {
if _, obj.Error = n.addVersion(ctx, bkt, newVersion); obj.Error != nil {
return obj
}
@ -704,6 +722,67 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj
}
func (n *Layer) removalRoutine(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case prm, ok := <-n.removalChan:
if !ok {
return
}
reqCtx, cancel := context.WithTimeout(ctx, n.features.RemoveOnReplaceTimeout())
for _, objID := range prm.OIDs {
if err := n.objectDeleteBase(reqCtx, prm.BktInfo, objID, prm.Auth); err != nil {
n.log.Warn(logs.FailedToRemoveOldUnversionedObject, zap.String("request_id", prm.RequestID),
zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage))
}
}
cancel()
}
}
}
func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []oid.ID) {
if !n.features.RemoveOnReplace() {
return
}
reqInfo := middleware.GetReqInfo(ctx)
prm := removalParams{
Auth: frostfs.PrmAuth{},
BktInfo: bktInfo,
OIDs: OIDs,
RequestID: reqInfo.RequestID,
TraceID: reqInfo.TraceID,
}
n.prepareAuthParameters(ctx, &prm.Auth, bktInfo.Owner)
select {
case n.removalChan <- prm:
default:
oidsStr := make([]string, len(OIDs))
for i, d := range OIDs {
oidsStr[i] = d.EncodeToString()
}
n.reqLogger(ctx).Debug(logs.FailedToQueueOldUnversionedObjectToDelete,
zap.Strings("oids", oidsStr), logs.TagField(logs.TagDatapath))
}
}
func (n *Layer) addVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
nodeID, OIDs, err := n.treeService.AddVersion(ctx, bktInfo, version)
n.tryRemove(ctx, bktInfo, OIDs)
if err != nil {
return nodeID, err
}
return nodeID, nil
}
func (n *Layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if isNotFoundError(obj.Error) {
obj.Error = nil