diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 21b98fca1..ae10564a8 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -314,6 +314,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err) } + if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil { + return deleteSingleResult{}, err + } + return deleteSingleResult{ Phy: true, Logic: removeAvailableObject, @@ -476,3 +480,74 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error return nil } + +func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error { + ech := obj.ECHeader() + if ech == nil { + return nil + } + + hasAnyChunks := hasAnyECChunks(tx, ech, cnr) + // drop EC parent GC mark if current EC chunk is the last one + if !hasAnyChunks && garbageBKT != nil { + var ecParentAddress oid.Address + ecParentAddress.SetContainer(cnr) + ecParentAddress.SetObject(ech.Parent()) + addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize)) + err := garbageBKT.Delete(addrKey) + if err != nil { + return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err) + } + } + + // also drop EC parent root info if current EC chunk is the last one + if !hasAnyChunks { + delUniqueIndexItem(tx, namedBucketItem{ + name: rootBucketName(cnr, make([]byte, bucketKeySize)), + key: objectKey(ech.Parent(), make([]byte, objectKeySize)), + }) + } + + if ech.ParentSplitParentID() == nil { + return nil + } + + var splitParentAddress oid.Address + splitParentAddress.SetContainer(cnr) + splitParentAddress.SetObject(*ech.ParentSplitParentID()) + + if ref, ok := refCounter[string(addressKey(splitParentAddress, make([]byte, addressKeySize)))]; ok { + // linking object is already processing + // so just inform that one more reference was deleted + // split info and gc marks will be deleted after linking object delete + ref.cur++ + return nil + } + + if parentLength(tx, splitParentAddress) > 0 { + // linking object still exists, so leave split info and gc mark deletion for linking object processing + return nil + } + + // drop split parent gc mark + if garbageBKT != nil { + addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize)) + err := garbageBKT.Delete(addrKey) + if err != nil { + return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err) + } + } + + // drop split info + delUniqueIndexItem(tx, namedBucketItem{ + name: rootBucketName(cnr, make([]byte, bucketKeySize)), + key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)), + }) + return nil +} + +func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool { + data := getFromBucket(tx, ecInfoBucketName(cnr, make([]byte, bucketKeySize)), + objectKey(ech.Parent(), make([]byte, objectKeySize))) + return len(data) > 0 +} diff --git a/pkg/local_object_storage/metabase/delete_ec_test.go b/pkg/local_object_storage/metabase/delete_ec_test.go new file mode 100644 index 000000000..0e627f095 --- /dev/null +++ b/pkg/local_object_storage/metabase/delete_ec_test.go @@ -0,0 +1,459 @@ +package meta + +import ( + "bytes" + "context" + "fmt" + "path/filepath" + "slices" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +func TestDeleteECObject_WithoutSplit(t *testing.T) { + t.Parallel() + + db := New( + WithPath(filepath.Join(t.TempDir(), "metabase")), + WithPermissions(0o600), + WithEpochState(epochState{uint64(12)}), + ) + + require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) + require.NoError(t, db.Init()) + defer func() { require.NoError(t, db.Close()) }() + + cnr := cidtest.ID() + ecChunk := oidtest.ID() + ecParent := oidtest.ID() + tombstoneID := oidtest.ID() + + chunkObj := testutil.GenerateObjectWithCID(cnr) + chunkObj.SetContainerID(cnr) + chunkObj.SetID(ecChunk) + chunkObj.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + chunkObj.SetPayloadSize(uint64(10)) + chunkObj.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent}, 0, 3, []byte{}, 0)) + + // put object with EC + + var prm PutPrm + prm.SetObject(chunkObj) + prm.SetStorageID([]byte("0/0")) + _, err := db.Put(context.Background(), prm) + require.NoError(t, err) + + var ecChunkAddress oid.Address + ecChunkAddress.SetContainer(cnr) + ecChunkAddress.SetObject(ecChunk) + + var ecParentAddress oid.Address + ecParentAddress.SetContainer(cnr) + ecParentAddress.SetObject(ecParent) + + var getPrm GetPrm + + getPrm.SetAddress(ecChunkAddress) + _, err = db.Get(context.Background(), getPrm) + require.NoError(t, err) + + var ecInfoError *objectSDK.ECInfoError + getPrm.SetAddress(ecParentAddress) + _, err = db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, &ecInfoError) + require.True(t, len(ecInfoError.ECInfo().Chunks) == 1 && + ecInfoError.ECInfo().Chunks[0].Index == 0 && + ecInfoError.ECInfo().Chunks[0].Total == 3) + + // inhume EC parent (like Delete does) + + var inhumePrm InhumePrm + var tombAddress oid.Address + tombAddress.SetContainer(cnr) + tombAddress.SetObject(tombstoneID) + inhumePrm.SetAddresses(ecParentAddress) + inhumePrm.SetTombstoneAddress(tombAddress) + _, err = db.Inhume(context.Background(), inhumePrm) + require.NoError(t, err) + + getPrm.SetAddress(ecParentAddress) + _, err = db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved)) + + getPrm.SetAddress(ecChunkAddress) + _, err = db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved)) + + // GC finds and deletes split, EC parent and EC chunk + + var garbageAddresses []oid.Address + var itPrm GarbageIterationPrm + itPrm.SetHandler(func(g GarbageObject) error { + garbageAddresses = append(garbageAddresses, g.Address()) + return nil + }) + require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) + require.Equal(t, 2, len(garbageAddresses)) + require.True(t, slices.Contains(garbageAddresses, ecParentAddress)) + require.True(t, slices.Contains(garbageAddresses, ecChunkAddress)) + + var deletePrm DeletePrm + deletePrm.SetAddresses(garbageAddresses...) + _, err = db.Delete(context.Background(), deletePrm) + require.NoError(t, err) + + garbageAddresses = nil + itPrm.SetHandler(func(g GarbageObject) error { + garbageAddresses = append(garbageAddresses, g.Address()) + return nil + }) + require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) + require.Equal(t, 0, len(garbageAddresses)) + + // after tombstone expired GC inhumes tombstone and drops graves + + var tombstonedObjects []TombstonedObject + var graveyardIterationPrm GraveyardIterationPrm + graveyardIterationPrm.SetHandler(func(object TombstonedObject) error { + tombstonedObjects = append(tombstonedObjects, object) + return nil + }) + require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm)) + require.Equal(t, 2, len(tombstonedObjects)) + + var tombstones []oid.Address + for _, tss := range tombstonedObjects { + tombstones = append(tombstones, tss.tomb) + } + inhumePrm.SetAddresses(tombstones...) + inhumePrm.SetGCMark() + _, err = db.Inhume(context.Background(), inhumePrm) + require.NoError(t, err) + + require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects)) + + // GC finds tombstone as garbage and deletes it + + garbageAddresses = nil + itPrm.SetHandler(func(g GarbageObject) error { + garbageAddresses = append(garbageAddresses, g.Address()) + return nil + }) + require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) + require.Equal(t, 1, len(garbageAddresses)) + require.Equal(t, tombstoneID, garbageAddresses[0].Object()) + + deletePrm.SetAddresses(garbageAddresses...) + _, err = db.Delete(context.Background(), deletePrm) + require.NoError(t, err) + + // no more objects should left as garbage + + itPrm.SetHandler(func(g GarbageObject) error { + require.FailNow(t, "no garbage objects should left") + return nil + }) + require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) + + require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft)) + + require.NoError(t, testCountersAreZero(db, cnr)) +} + +func TestDeleteECObject_WithSplit(t *testing.T) { + t.Parallel() + for _, c := range []int{1, 2, 3} { + for _, l := range []bool{true, false} { + test := fmt.Sprintf("%d EC chunks with split info without linking object", c) + if l { + test = fmt.Sprintf("%d EC chunks with split info with linking object", c) + } + t.Run(test, func(t *testing.T) { + testDeleteECObjectWithSplit(t, c, l) + }) + } + } +} + +func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool) { + t.Parallel() + + db := New( + WithPath(filepath.Join(t.TempDir(), "metabase")), + WithPermissions(0o600), + WithEpochState(epochState{uint64(12)}), + ) + + require.NoError(t, db.Open(context.Background(), mode.ReadWrite)) + require.NoError(t, db.Init()) + defer func() { require.NoError(t, db.Close()) }() + + cnr := cidtest.ID() + ecChunks := make([]oid.ID, chunksCount) + for idx := range ecChunks { + ecChunks[idx] = oidtest.ID() + } + ecParentID := oidtest.ID() + splitParentID := oidtest.ID() + tombstoneID := oidtest.ID() + splitID := objectSDK.NewSplitID() + linkingID := oidtest.ID() + + ecChunkObjects := make([]*objectSDK.Object, chunksCount) + for idx := range ecChunkObjects { + ecChunkObjects[idx] = testutil.GenerateObjectWithCID(cnr) + ecChunkObjects[idx].SetContainerID(cnr) + ecChunkObjects[idx].SetID(ecChunks[idx]) + ecChunkObjects[idx].SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + ecChunkObjects[idx].SetPayloadSize(uint64(10)) + ecChunkObjects[idx].SetECHeader(objectSDK.NewECHeader( + objectSDK.ECParentInfo{ + ID: ecParentID, + SplitParentID: &splitParentID, SplitID: splitID, + }, uint32(idx), uint32(chunksCount+1), []byte{}, 0)) + } + + splitParentObj := testutil.GenerateObjectWithCID(cnr) + splitParentObj.SetID(splitParentID) + + var linkingAddress oid.Address + linkingAddress.SetContainer(cnr) + linkingAddress.SetObject(linkingID) + + linkingObj := testutil.GenerateObjectWithCID(cnr) + linkingObj.SetID(linkingID) + linkingObj.SetParent(splitParentObj) + linkingObj.SetParentID(splitParentID) + linkingObj.SetChildren(ecParentID, oidtest.ID(), oidtest.ID()) + linkingObj.SetSplitID(splitID) + + // put object with EC and split info + + var prm PutPrm + prm.SetStorageID([]byte("0/0")) + for _, obj := range ecChunkObjects { + prm.SetObject(obj) + _, err := db.Put(context.Background(), prm) + require.NoError(t, err) + } + + if withLinking { + prm.SetObject(linkingObj) + _, err := db.Put(context.Background(), prm) + require.NoError(t, err) + } + + var ecParentAddress oid.Address + ecParentAddress.SetContainer(cnr) + ecParentAddress.SetObject(ecParentID) + + var getPrm GetPrm + var ecInfoError *objectSDK.ECInfoError + getPrm.SetAddress(ecParentAddress) + _, err := db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, &ecInfoError) + require.True(t, len(ecInfoError.ECInfo().Chunks) == chunksCount) + + var splitParentAddress oid.Address + splitParentAddress.SetContainer(cnr) + splitParentAddress.SetObject(splitParentID) + + var splitInfoError *objectSDK.SplitInfoError + getPrm.SetAddress(splitParentAddress) + getPrm.SetRaw(true) + _, err = db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, &splitInfoError) + require.True(t, splitInfoError.SplitInfo() != nil) + require.Equal(t, splitID, splitInfoError.SplitInfo().SplitID()) + lastPart, set := splitInfoError.SplitInfo().LastPart() + require.True(t, set) + require.Equal(t, lastPart, ecParentID) + if withLinking { + l, ok := splitInfoError.SplitInfo().Link() + require.True(t, ok) + require.Equal(t, linkingID, l) + } + getPrm.SetRaw(false) + + // inhume EC parent and split objects (like Delete does) + + inhumeAddresses := []oid.Address{splitParentAddress, ecParentAddress} + if withLinking { + inhumeAddresses = append(inhumeAddresses, linkingAddress) + } + + var inhumePrm InhumePrm + var tombAddress oid.Address + tombAddress.SetContainer(cnr) + tombAddress.SetObject(tombstoneID) + inhumePrm.SetAddresses(inhumeAddresses...) + inhumePrm.SetTombstoneAddress(tombAddress) + _, err = db.Inhume(context.Background(), inhumePrm) + require.NoError(t, err) + + getPrm.SetAddress(ecParentAddress) + _, err = db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved)) + + getPrm.SetAddress(splitParentAddress) + _, err = db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved)) + + if withLinking { + getPrm.SetAddress(linkingAddress) + _, err = db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved)) + } + + for _, id := range ecChunks { + var ecChunkAddress oid.Address + ecChunkAddress.SetContainer(cnr) + ecChunkAddress.SetObject(id) + getPrm.SetAddress(ecChunkAddress) + _, err = db.Get(context.Background(), getPrm) + require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved)) + } + + // GC finds and deletes split, EC parent and EC chunks + + parentCount := 2 // split + ec + if withLinking { + parentCount = 3 + } + + var garbageAddresses []oid.Address + var itPrm GarbageIterationPrm + itPrm.SetHandler(func(g GarbageObject) error { + garbageAddresses = append(garbageAddresses, g.Address()) + return nil + }) + require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) + require.Equal(t, parentCount+chunksCount, len(garbageAddresses)) + require.True(t, slices.Contains(garbageAddresses, splitParentAddress)) + require.True(t, slices.Contains(garbageAddresses, ecParentAddress)) + if withLinking { + require.True(t, slices.Contains(garbageAddresses, linkingAddress)) + } + for _, id := range ecChunks { + var ecChunkAddress oid.Address + ecChunkAddress.SetContainer(cnr) + ecChunkAddress.SetObject(id) + require.True(t, slices.Contains(garbageAddresses, ecChunkAddress)) + } + + var deletePrm DeletePrm + deletePrm.SetAddresses(garbageAddresses...) + _, err = db.Delete(context.Background(), deletePrm) + require.NoError(t, err) + + var garbageStub []oid.Address + itPrm.SetHandler(func(g GarbageObject) error { + garbageStub = append(garbageStub, g.Address()) + return nil + }) + require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) + require.Equal(t, 0, len(garbageStub)) + + // after tombstone expired GC inhumes tombstone and drops graves + + var tombstonedObjects []TombstonedObject + var graveyardIterationPrm GraveyardIterationPrm + graveyardIterationPrm.SetHandler(func(object TombstonedObject) error { + tombstonedObjects = append(tombstonedObjects, object) + return nil + }) + require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm)) + require.True(t, len(tombstonedObjects) == parentCount+chunksCount) + + var tombstones []oid.Address + for _, tss := range tombstonedObjects { + tombstones = append(tombstones, tss.tomb) + } + inhumePrm.SetAddresses(tombstones...) + inhumePrm.SetGCMark() + _, err = db.Inhume(context.Background(), inhumePrm) + require.NoError(t, err) + + require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects)) + + // GC finds tombstone as garbage and deletes it + + garbageAddresses = nil + itPrm.SetHandler(func(g GarbageObject) error { + garbageAddresses = append(garbageAddresses, g.Address()) + return nil + }) + require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) + require.Equal(t, 1, len(garbageAddresses)) + require.Equal(t, tombstoneID, garbageAddresses[0].Object()) + + deletePrm.SetAddresses(garbageAddresses...) + _, err = db.Delete(context.Background(), deletePrm) + require.NoError(t, err) + + // no more objects should left as garbage + + itPrm.SetHandler(func(g GarbageObject) error { + require.FailNow(t, "no garbage objects should left") + return nil + }) + require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) + + require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft)) + + require.NoError(t, testCountersAreZero(db, cnr)) +} + +func testVerifyNoObjectDataLeft(tx *bbolt.Tx) error { + return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + if bytes.Equal(name, shardInfoBucket) || + bytes.Equal(name, containerCounterBucketName) || + bytes.Equal(name, containerVolumeBucketName) { + return nil + } + return testBucketEmpty(name, b) + }) +} + +func testBucketEmpty(name []byte, b *bbolt.Bucket) error { + err := b.ForEach(func(k, v []byte) error { + if len(v) > 0 { + return fmt.Errorf("bucket %v is not empty", name) + } + return nil + }) + if err != nil { + return err + } + return b.ForEachBucket(func(k []byte) error { + return testBucketEmpty(k, b.Bucket(k)) + }) +} + +func testCountersAreZero(db *DB, cnr cid.ID) error { + c, err := db.ContainerCount(context.Background(), cnr) + if err != nil { + return err + } + if !c.IsZero() { + return fmt.Errorf("container %s has non zero counters", cnr.EncodeToString()) + } + s, err := db.ContainerSize(cnr) + if err != nil { + return err + } + if s != 0 { + return fmt.Errorf("container %s has non zero size", cnr.EncodeToString()) + } + return nil +}