From a2ab373a0ac5480b1400a4a0e95d920adc541447 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 10 Jan 2024 11:56:24 +0300 Subject: [PATCH] [#895] metabase: Do not delete GC mark for virtual objects Signed-off-by: Dmitrii Stepanov --- .../engine/delete_test.go | 92 +++++++++++++++++++ .../engine/engine_test.go | 4 +- pkg/local_object_storage/metabase/delete.go | 30 ++++-- pkg/local_object_storage/shard/shard.go | 8 ++ 4 files changed, 124 insertions(+), 10 deletions(-) diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index f3e4a097..32d07809 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -6,6 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" @@ -85,6 +86,97 @@ func TestDeleteBigObject(t *testing.T) { } } +func TestDeleteBigObjectWithoutGC(t *testing.T) { + t.Parallel() + + cnr := cidtest.ID() + parentID := oidtest.ID() + splitID := objectSDK.NewSplitID() + + parent := testutil.GenerateObjectWithCID(cnr) + parent.SetID(parentID) + parent.SetPayload(nil) + + const childCount = 3 + children := make([]*objectSDK.Object, childCount) + childIDs := make([]oid.ID, childCount) + for i := range children { + children[i] = testutil.GenerateObjectWithCID(cnr) + if i != 0 { + children[i].SetPreviousID(childIDs[i-1]) + } + if i == len(children)-1 { + children[i].SetParent(parent) + } + children[i].SetSplitID(splitID) + children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)}) + childIDs[i], _ = children[i].ID() + } + + link := testutil.GenerateObjectWithCID(cnr) + link.SetParent(parent) + link.SetParentID(parentID) + link.SetSplitID(splitID) + link.SetChildren(childIDs...) + + s1 := testNewShard(t, 1, shard.WithDisabledGC()) + + e := testNewEngine(t).setInitializedShards(t, s1).engine + e.log = test.NewLogger(t) + defer e.Close(context.Background()) + + for i := range children { + require.NoError(t, Put(context.Background(), e, children[i])) + } + require.NoError(t, Put(context.Background(), e, link)) + + addrParent := object.AddressOf(parent) + checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true) + + addrLink := object.AddressOf(link) + checkGetError[error](t, e, addrLink, false) + + for i := range children { + checkGetError[error](t, e, object.AddressOf(children[i]), false) + } + + // delete logical + var deletePrm DeletePrm + deletePrm.WithForceRemoval() + deletePrm.WithAddress(addrParent) + + _, err := e.Delete(context.Background(), deletePrm) + require.NoError(t, err) + + checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true) + checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true) + for i := range children { + checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(children[i]), true) + } + + // delete physical + var delPrm shard.DeletePrm + delPrm.SetAddresses(addrParent) + _, err = s1.Delete(context.Background(), delPrm) + require.NoError(t, err) + + delPrm.SetAddresses(addrLink) + _, err = s1.Delete(context.Background(), delPrm) + require.NoError(t, err) + + for i := range children { + delPrm.SetAddresses(object.AddressOf(children[i])) + _, err = s1.Delete(context.Background(), delPrm) + require.NoError(t, err) + } + + checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true) + checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true) + for i := range children { + checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(children[i]), true) + } +} + func checkGetError[E error](t *testing.T, e *StorageEngine, addr oid.Address, shouldFail bool) { var getPrm GetPrm getPrm.WithAddress(addr) diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index eb15a990..a62734c0 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -181,12 +181,12 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes }, smallFileStorage, largeFileStorage } -func testNewShard(t testing.TB, id int) *shard.Shard { +func testNewShard(t testing.TB, id int, opts ...shard.Option) *shard.Shard { sid, err := generateShardID() require.NoError(t, err) shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...) - s := shard.New(shardOpts...) + s := shard.New(append(shardOpts, opts...)...) require.NoError(t, s.Open(context.Background())) require.NoError(t, s.Init(context.Background())) diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index b2210b55..8ca0dea6 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -251,26 +251,27 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0 - // remove record from the garbage bucket - if garbageBKT != nil { - err := garbageBKT.Delete(addrKey) - if err != nil { - return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err) - } - } - // unmarshal object, work only with physically stored (raw == true) objects obj, err := db.get(tx, addr, key, false, true, currEpoch) if err != nil { var siErr *objectSDK.SplitInfoError if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) { + // if object is virtual (parent) then do nothing, it will be deleted with last child return deleteSingleResult{}, nil } return deleteSingleResult{}, err } + // remove record from the garbage bucket + if garbageBKT != nil { + err := garbageBKT.Delete(addrKey) + if err != nil { + return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err) + } + } + // if object is an only link to a parent, then remove parent if parent := obj.Parent(); parent != nil { parAddr := object.AddressOf(parent) @@ -327,6 +328,19 @@ func (db *DB) deleteObject( return fmt.Errorf("can't remove fake bucket tree indexes: %w", err) } + if isParent { + // remove record from the garbage bucket, because regular object deletion does nothing for virtual object + garbageBKT := tx.Bucket(garbageBucketName) + if garbageBKT != nil { + key := make([]byte, addressKeySize) + addrKey := addressKey(object.AddressOf(obj), key) + err := garbageBKT.Delete(addrKey) + if err != nil { + return fmt.Errorf("could not remove from garbage bucket: %w", err) + } + } + } + return nil } diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 13f7f689..136f5463 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -371,6 +371,14 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option { } } +// WithDisabledGC disables GC. +// For testing purposes only. +func WithDisabledGC() Option { + return func(c *cfg) { + c.gcCfg.testHookRemover = func(ctx context.Context) gcRunResult { return gcRunResult{} } + } +} + // WithZeroSizeCallback returns option to set zero-size containers callback. func WithZeroSizeCallback(cb EmptyContainersCallback) Option { return func(c *cfg) {