[#661] shard: Fix Delete method

Due to the flushing data from the writecache to the storage
and simultaneous deletion, a partial deletion situation is possible.
So as a solution, deletion is allowed only when the object is in storage,
because object will be deleted from writecache by flush goroutine.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-09-22 15:27:51 +03:00
parent 9ef4a885de
commit 71fac2b9c3
3 changed files with 39 additions and 34 deletions

View file

@ -2,14 +2,11 @@ package shard
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -33,8 +30,7 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
p.addr = append(p.addr, addr...)
}
// Delete removes data from the shard's writeCache, metaBase and
// blobStor.
// Delete removes data from the shard's metaBase and blobStor.
func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Delete",
trace.WithAttributes(
@ -46,10 +42,10 @@ func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
s.m.RLock()
defer s.m.RUnlock()
return s.delete(ctx, prm)
return s.delete(ctx, prm, false)
}
func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (DeleteRes, error) {
if s.info.Mode.ReadOnly() {
return DeleteRes{}, ErrReadOnlyMode
} else if s.info.Mode.NoMetabase() {
@ -64,12 +60,18 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
default:
}
s.deleteObjectFromWriteCacheSafe(ctx, addr)
s.deleteFromBlobstorSafe(ctx, addr)
if err := s.deleteFromBlobstor(ctx, addr); err != nil {
if skipFailed {
continue
}
return result, err
}
if err := s.deleteFromMetabase(ctx, addr); err != nil {
return result, err // stop on metabase error ?
if skipFailed {
continue
}
return result, err
}
result.deleted++
}
@ -77,16 +79,7 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
return result, nil
}
func (s *Shard) deleteObjectFromWriteCacheSafe(ctx context.Context, addr oid.Address) {
if s.hasWriteCache() {
err := s.writeCache.Delete(ctx, addr)
if err != nil && !client.IsErrObjectNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
s.log.Warn(logs.ShardCantDeleteObjectFromWriteCache, zap.Error(err))
}
}
}
func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
var sPrm meta.StorageIDPrm
sPrm.SetAddress(addr)
@ -95,6 +88,7 @@ func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
s.log.Debug(logs.StorageIDRetrievalFailure,
zap.Stringer("object", addr),
zap.String("error", err.Error()))
return err
}
storageID := res.StorageID()
@ -108,6 +102,7 @@ func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
zap.Stringer("object_address", addr),
zap.String("error", err.Error()))
}
return err
}
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {

View file

@ -52,13 +52,18 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
_, err = sh.Delete(context.TODO(), delPrm)
require.NoError(t, err)
if hasWriteCache {
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 30*time.Second, 100*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
_, err = sh.Get(context.Background(), getPrm)
return client.IsErrObjectNotFound(err)
}, time.Second, 50*time.Millisecond)
require.True(t, client.IsErrObjectNotFound(err))
})
t.Run("small object", func(t *testing.T) {
@ -78,12 +83,17 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err)
if hasWriteCache {
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 10*time.Second, 100*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return client.IsErrObjectNotFound(err)
}, time.Second, 50*time.Millisecond)
require.True(t, client.IsErrObjectNotFound(err))
})
}

View file

@ -297,7 +297,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
deletePrm.SetAddresses(buf...)
// delete accumulated objects
res, err := s.delete(ctx, deletePrm)
res, err := s.delete(ctx, deletePrm, true)
result.deleted = res.deleted
result.failedToDelete = uint64(len(buf)) - res.deleted