forked from TrueCloudLab/frostfs-node
[#734] shard: Fix Delete method
Due to the flushing data from the writecache to the storage and simultaneous deletion, a partial deletion situation is possible. So as a solution, deletion is allowed only when the object is in storage, because object will be deleted from writecache by flush goroutine. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
f26233b47a
commit
f2437f7ae9
3 changed files with 39 additions and 34 deletions
|
@ -2,15 +2,12 @@ package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
||||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
@ -34,8 +31,7 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
|
||||||
p.addr = append(p.addr, addr...)
|
p.addr = append(p.addr, addr...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete removes data from the shard's writeCache, metaBase and
|
// Delete removes data from the shard's metaBase and// blobStor.
|
||||||
// blobStor.
|
|
||||||
func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Delete",
|
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Delete",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -47,10 +43,10 @@ func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
s.m.RLock()
|
s.m.RLock()
|
||||||
defer s.m.RUnlock()
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
return s.delete(ctx, prm)
|
return s.delete(ctx, prm, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (DeleteRes, error) {
|
||||||
if s.info.Mode.ReadOnly() {
|
if s.info.Mode.ReadOnly() {
|
||||||
return DeleteRes{}, ErrReadOnlyMode
|
return DeleteRes{}, ErrReadOnlyMode
|
||||||
} else if s.info.Mode.NoMetabase() {
|
} else if s.info.Mode.NoMetabase() {
|
||||||
|
@ -65,12 +61,18 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
s.deleteObjectFromWriteCacheSafe(ctx, addr)
|
if err := s.deleteFromBlobstor(ctx, addr); err != nil {
|
||||||
|
if skipFailed {
|
||||||
s.deleteFromBlobstorSafe(ctx, addr)
|
continue
|
||||||
|
}
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.deleteFromMetabase(ctx, addr); err != nil {
|
if err := s.deleteFromMetabase(ctx, addr); err != nil {
|
||||||
return result, err // stop on metabase error ?
|
if skipFailed {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return result, err
|
||||||
}
|
}
|
||||||
result.deleted++
|
result.deleted++
|
||||||
}
|
}
|
||||||
|
@ -78,16 +80,7 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) deleteObjectFromWriteCacheSafe(ctx context.Context, addr oid.Address) {
|
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
|
||||||
if s.hasWriteCache() {
|
|
||||||
err := s.writeCache.Delete(ctx, addr)
|
|
||||||
if err != nil && !client.IsErrObjectNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
|
|
||||||
s.log.Warn(logs.ShardCantDeleteObjectFromWriteCache, zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
|
|
||||||
var sPrm meta.StorageIDPrm
|
var sPrm meta.StorageIDPrm
|
||||||
sPrm.SetAddress(addr)
|
sPrm.SetAddress(addr)
|
||||||
|
|
||||||
|
@ -97,6 +90,7 @@ func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
|
||||||
zap.Stringer("object", addr),
|
zap.Stringer("object", addr),
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
storageID := res.StorageID()
|
storageID := res.StorageID()
|
||||||
|
|
||||||
|
@ -111,6 +105,7 @@ func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {
|
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {
|
||||||
|
|
|
@ -52,13 +52,18 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
|
||||||
_, err = testGet(t, sh, getPrm, hasWriteCache)
|
_, err = testGet(t, sh, getPrm, hasWriteCache)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, err = sh.Delete(context.TODO(), delPrm)
|
if hasWriteCache {
|
||||||
require.NoError(t, err)
|
require.Eventually(t, func() bool {
|
||||||
|
_, err = sh.Delete(context.Background(), delPrm)
|
||||||
|
return err == nil
|
||||||
|
}, 30*time.Second, 100*time.Millisecond)
|
||||||
|
} else {
|
||||||
|
_, err = sh.Delete(context.Background(), delPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
require.True(t, client.IsErrObjectNotFound(err))
|
||||||
return client.IsErrObjectNotFound(err)
|
|
||||||
}, time.Second, 50*time.Millisecond)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("small object", func(t *testing.T) {
|
t.Run("small object", func(t *testing.T) {
|
||||||
|
@ -78,12 +83,17 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, err = sh.Delete(context.Background(), delPrm)
|
if hasWriteCache {
|
||||||
require.NoError(t, err)
|
require.Eventually(t, func() bool {
|
||||||
|
_, err = sh.Delete(context.Background(), delPrm)
|
||||||
|
return err == nil
|
||||||
|
}, 10*time.Second, 100*time.Millisecond)
|
||||||
|
} else {
|
||||||
|
_, err = sh.Delete(context.Background(), delPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
require.True(t, client.IsErrObjectNotFound(err))
|
||||||
return client.IsErrObjectNotFound(err)
|
|
||||||
}, time.Second, 50*time.Millisecond)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -297,7 +297,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
|
||||||
deletePrm.SetAddresses(buf...)
|
deletePrm.SetAddresses(buf...)
|
||||||
|
|
||||||
// delete accumulated objects
|
// delete accumulated objects
|
||||||
res, err := s.delete(ctx, deletePrm)
|
res, err := s.delete(ctx, deletePrm, true)
|
||||||
|
|
||||||
result.deleted = res.deleted
|
result.deleted = res.deleted
|
||||||
result.failedToDelete = uint64(len(buf)) - res.deleted
|
result.failedToDelete = uint64(len(buf)) - res.deleted
|
||||||
|
|
Loading…
Reference in a new issue