Fix Delete object (support/v0.37) #862

Merged
fyrchik merged 1 commits from dstepanov-yadro/frostfs-node:fix/delete_object_support_v0.37 into support/v0.37 2023-12-13 09:38:21 +00:00
2 changed files with 28 additions and 2 deletions

View File

@ -544,4 +544,5 @@ const (
BlobovniczaSavingCountersToMeta = "saving counters to blobovnicza's meta..."
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
)

View File

@ -2,11 +2,13 @@ package shard
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -60,6 +62,13 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
default:
}
if err := s.validateWritecacheDoesntContainObject(ctx, addr); err != nil {
if skipFailed {
continue
}
return result, err
}
if err := s.deleteFromBlobstor(ctx, addr); err != nil {
if skipFailed {
continue
@ -79,6 +88,21 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
return result, nil
}
func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr oid.Address) error {
if !s.hasWriteCache() {
return nil
}
_, err := s.writeCache.Head(ctx, addr)
if err == nil {
s.log.Warn(logs.ObjectRemovalFailureExistsInWritecache, zap.Stringer("object_address", addr))
return fmt.Errorf("object %s must be flushed from writecache", addr)
}
if client.IsErrObjectNotFound(err) {
return nil
}
return err
}
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
var sPrm meta.StorageIDPrm
sPrm.SetAddress(addr)
@ -97,12 +121,13 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error
delPrm.StorageID = storageID
_, err = s.blobStor.Delete(ctx, delPrm)
if err != nil {
if err != nil && !client.IsErrObjectNotFound(err) {
s.log.Debug(logs.ObjectRemovalFailureBlobStor,
zap.Stringer("object_address", addr),
zap.String("error", err.Error()))
return err
}
return err
return nil
}
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {