Fix Delete object #860
2 changed files with 28 additions and 2 deletions
|
@ -553,4 +553,5 @@ const (
|
||||||
BlobovniczaSavingCountersToMeta = "saving counters to blobovnicza's meta..."
|
BlobovniczaSavingCountersToMeta = "saving counters to blobovnicza's meta..."
|
||||||
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
|
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
|
||||||
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
|
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
|
||||||
|
ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache"
|
||||||
)
|
)
|
||||||
|
|
|
@ -2,12 +2,14 @@ package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
@ -61,6 +63,13 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := s.validateWritecacheDoesntContainObject(ctx, addr); err != nil {
|
||||||
|
if skipFailed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.deleteFromBlobstor(ctx, addr); err != nil {
|
if err := s.deleteFromBlobstor(ctx, addr); err != nil {
|
||||||
if skipFailed {
|
if skipFailed {
|
||||||
continue
|
continue
|
||||||
|
@ -80,6 +89,21 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (Del
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Shard) validateWritecacheDoesntContainObject(ctx context.Context, addr oid.Address) error {
|
||||||
|
if !s.hasWriteCache() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err := s.writeCache.Head(ctx, addr)
|
||||||
|
if err == nil {
|
||||||
|
s.log.Warn(logs.ObjectRemovalFailureExistsInWritecache, zap.Stringer("object_address", addr))
|
||||||
|
return fmt.Errorf("object %s must be flushed from writecache", addr)
|
||||||
|
}
|
||||||
|
if client.IsErrObjectNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
|
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
|
||||||
var sPrm meta.StorageIDPrm
|
var sPrm meta.StorageIDPrm
|
||||||
sPrm.SetAddress(addr)
|
sPrm.SetAddress(addr)
|
||||||
|
@ -99,13 +123,14 @@ func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error
|
||||||
delPrm.StorageID = storageID
|
delPrm.StorageID = storageID
|
||||||
|
|
||||||
_, err = s.blobStor.Delete(ctx, delPrm)
|
_, err = s.blobStor.Delete(ctx, delPrm)
|
||||||
if err != nil {
|
if err != nil && !client.IsErrObjectNotFound(err) {
|
||||||
s.log.Debug(logs.ObjectRemovalFailureBlobStor,
|
s.log.Debug(logs.ObjectRemovalFailureBlobStor,
|
||||||
zap.Stringer("object_address", addr),
|
zap.Stringer("object_address", addr),
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {
|
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {
|
||||||
|
|
Loading…
Reference in a new issue