metabase: Delete EC gc marks and split info #1257

Merged
dstepanov-yadro merged 1 commit from dstepanov-yadro/frostfs-node:fix/ec_delete into master 2024-07-22 11:17:31 +00:00
2 changed files with 534 additions and 0 deletions
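In short: deleting the last EC chunk of an object from the metabase now also removes the EC parent's GC mark and root-index entry, and, for split objects, the split parent's GC mark and split info once no linking object references it. A condensed sketch of the flow the new tests exercise (error checks omitted; db, ctx, chunkObj, ecParentAddress and tombAddress are illustrative names borrowed from the test file below):

// Store an EC chunk whose ECHeader points at the EC parent.
var putPrm PutPrm
putPrm.SetObject(chunkObj)
putPrm.SetStorageID([]byte("0/0"))
_, _ = db.Put(ctx, putPrm)

// Inhume the EC parent with a tombstone, as object deletion does.
var inhumePrm InhumePrm
inhumePrm.SetAddresses(ecParentAddress)
inhumePrm.SetTombstoneAddress(tombAddress)
_, _ = db.Inhume(ctx, inhumePrm)

// GC collects the garbage marks (EC parent and chunk)...
var garbage []oid.Address
var itPrm GarbageIterationPrm
itPrm.SetHandler(func(g GarbageObject) error {
	garbage = append(garbage, g.Address())
	return nil
})
_ = db.IterateOverGarbage(ctx, itPrm)

// ...and deletes them; with this change the EC parent's GC mark, root info
// and split info are dropped together with the last chunk instead of being
// left behind in the metabase.
var delPrm DeletePrm
delPrm.SetAddresses(garbage...)
_, _ = db.Delete(ctx, delPrm)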

@@ -314,6 +314,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
return deleteSingleResult{}, fmt.Errorf("could not remove object: %w", err)
}
if err := deleteECRelatedInfo(tx, garbageBKT, obj, addr.Container(), refCounter); err != nil {
return deleteSingleResult{}, err
}
return deleteSingleResult{
Phy: true,
Logic: removeAvailableObject,
@@ -476,3 +480,74 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
return nil
}
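// deleteECRelatedInfo cleans up data that becomes orphaned when an EC chunk is deleted:
// the EC parent's GC mark and root info once the last chunk is removed, and the split
// parent's GC mark and split info once no linking object references it.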
func deleteECRelatedInfo(tx *bbolt.Tx, garbageBKT *bbolt.Bucket, obj *objectSDK.Object, cnr cid.ID, refCounter referenceCounter) error {
ech := obj.ECHeader()
if ech == nil {
return nil
}
hasAnyChunks := hasAnyECChunks(tx, ech, cnr)
// drop EC parent GC mark if current EC chunk is the last one
if !hasAnyChunks && garbageBKT != nil {
var ecParentAddress oid.Address
ecParentAddress.SetContainer(cnr)
ecParentAddress.SetObject(ech.Parent())
addrKey := addressKey(ecParentAddress, make([]byte, addressKeySize))
err := garbageBKT.Delete(addrKey)
if err != nil {
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
}
}
// also drop EC parent root info if current EC chunk is the last one
if !hasAnyChunks {
delUniqueIndexItem(tx, namedBucketItem{
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
})
}
if ech.ParentSplitParentID() == nil {
return nil
}
var splitParentAddress oid.Address
splitParentAddress.SetContainer(cnr)
splitParentAddress.SetObject(*ech.ParentSplitParentID())
if ref, ok := refCounter[string(addressKey(splitParentAddress, make([]byte, addressKeySize)))]; ok {
// linking object is already being processed,
// so just record that one more reference was deleted;
// split info and gc marks will be deleted after the linking object is deleted
ref.cur++
return nil
}
if parentLength(tx, splitParentAddress) > 0 {
// linking object still exists, so leave split info and gc mark deletion for linking object processing
return nil
}
// drop split parent gc mark
if garbageBKT != nil {
addrKey := addressKey(splitParentAddress, make([]byte, addressKeySize))
err := garbageBKT.Delete(addrKey)
if err != nil {
return fmt.Errorf("could not remove EC parent from garbage bucket: %w", err)
}
}
// drop split info
delUniqueIndexItem(tx, namedBucketItem{
name: rootBucketName(cnr, make([]byte, bucketKeySize)),
key: objectKey(*ech.ParentSplitParentID(), make([]byte, objectKeySize)),
})
return nil
}
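// hasAnyECChunks reports whether the metabase still contains EC chunk info for the given EC parent.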
func hasAnyECChunks(tx *bbolt.Tx, ech *objectSDK.ECHeader, cnr cid.ID) bool {
data := getFromBucket(tx, ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
objectKey(ech.Parent(), make([]byte, objectKeySize)))
return len(data) > 0
}
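For orientation, deleteECRelatedInfo branches on two ECHeader fields: Parent() (the EC parent whose GC mark and root info are dropped with the last chunk) and ParentSplitParentID() (the split parent whose GC mark and split info are dropped when no linking object is left). A minimal construction sketch mirroring the test file below; the IDs and chunk counts are illustrative:

ech := objectSDK.NewECHeader(
	objectSDK.ECParentInfo{
		ID:            ecParentID,     // read back via ech.Parent()
		SplitParentID: &splitParentID, // read back via ech.ParentSplitParentID(); nil outside a split chain
		SplitID:       splitID,
	},
	0,        // index of this chunk
	3,        // total number of chunks
	[]byte{}, // parent header bytes (empty in the tests)
	0,        // parent header length
)
chunkObj.SetECHeader(ech)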

@@ -0,0 +1,459 @@
package meta
import (
"bytes"
"context"
"fmt"
"path/filepath"
"slices"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
func TestDeleteECObject_WithoutSplit(t *testing.T) {
t.Parallel()
db := New(
WithPath(filepath.Join(t.TempDir(), "metabase")),
WithPermissions(0o600),
WithEpochState(epochState{uint64(12)}),
)
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init())
defer func() { require.NoError(t, db.Close()) }()
cnr := cidtest.ID()
ecChunk := oidtest.ID()
ecParent := oidtest.ID()
tombstoneID := oidtest.ID()
chunkObj := testutil.GenerateObjectWithCID(cnr)
chunkObj.SetContainerID(cnr)
chunkObj.SetID(ecChunk)
chunkObj.SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
chunkObj.SetPayloadSize(uint64(10))
chunkObj.SetECHeader(objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: ecParent}, 0, 3, []byte{}, 0))
// put object with EC
var prm PutPrm
prm.SetObject(chunkObj)
prm.SetStorageID([]byte("0/0"))
_, err := db.Put(context.Background(), prm)
require.NoError(t, err)
var ecChunkAddress oid.Address
ecChunkAddress.SetContainer(cnr)
ecChunkAddress.SetObject(ecChunk)
var ecParentAddress oid.Address
ecParentAddress.SetContainer(cnr)
ecParentAddress.SetObject(ecParent)
var getPrm GetPrm
getPrm.SetAddress(ecChunkAddress)
_, err = db.Get(context.Background(), getPrm)
require.NoError(t, err)
var ecInfoError *objectSDK.ECInfoError
getPrm.SetAddress(ecParentAddress)
_, err = db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, &ecInfoError)
require.True(t, len(ecInfoError.ECInfo().Chunks) == 1 &&
ecInfoError.ECInfo().Chunks[0].Index == 0 &&
ecInfoError.ECInfo().Chunks[0].Total == 3)
// inhume EC parent (like Delete does)
var inhumePrm InhumePrm
var tombAddress oid.Address
tombAddress.SetContainer(cnr)
tombAddress.SetObject(tombstoneID)
inhumePrm.SetAddresses(ecParentAddress)
inhumePrm.SetTombstoneAddress(tombAddress)
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
getPrm.SetAddress(ecParentAddress)
_, err = db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
getPrm.SetAddress(ecChunkAddress)
_, err = db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
// GC finds and deletes EC parent and EC chunk
var garbageAddresses []oid.Address
var itPrm GarbageIterationPrm
itPrm.SetHandler(func(g GarbageObject) error {
garbageAddresses = append(garbageAddresses, g.Address())
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.Equal(t, 2, len(garbageAddresses))
require.True(t, slices.Contains(garbageAddresses, ecParentAddress))
require.True(t, slices.Contains(garbageAddresses, ecChunkAddress))
var deletePrm DeletePrm
deletePrm.SetAddresses(garbageAddresses...)
_, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
garbageAddresses = nil
itPrm.SetHandler(func(g GarbageObject) error {
garbageAddresses = append(garbageAddresses, g.Address())
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.Equal(t, 0, len(garbageAddresses))
// after the tombstone expires, GC inhumes the tombstone and drops graves
var tombstonedObjects []TombstonedObject
var graveyardIterationPrm GraveyardIterationPrm
graveyardIterationPrm.SetHandler(func(object TombstonedObject) error {
tombstonedObjects = append(tombstonedObjects, object)
return nil
})
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.Equal(t, 2, len(tombstonedObjects))
var tombstones []oid.Address
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it
garbageAddresses = nil
itPrm.SetHandler(func(g GarbageObject) error {
garbageAddresses = append(garbageAddresses, g.Address())
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.Equal(t, 1, len(garbageAddresses))
require.Equal(t, tombstoneID, garbageAddresses[0].Object())
deletePrm.SetAddresses(garbageAddresses...)
_, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
// no more objects should be left as garbage
itPrm.SetHandler(func(g GarbageObject) error {
require.FailNow(t, "no garbage objects should left")
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft))
require.NoError(t, testCountersAreZero(db, cnr))
}
func TestDeleteECObject_WithSplit(t *testing.T) {
t.Parallel()
for _, c := range []int{1, 2, 3} {
for _, l := range []bool{true, false} {
test := fmt.Sprintf("%d EC chunks with split info without linking object", c)
if l {
test = fmt.Sprintf("%d EC chunks with split info with linking object", c)
}
t.Run(test, func(t *testing.T) {
testDeleteECObjectWithSplit(t, c, l)
})
}
}
}
func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool) {
t.Parallel()
db := New(
WithPath(filepath.Join(t.TempDir(), "metabase")),
WithPermissions(0o600),
WithEpochState(epochState{uint64(12)}),
)
require.NoError(t, db.Open(context.Background(), mode.ReadWrite))
require.NoError(t, db.Init())
defer func() { require.NoError(t, db.Close()) }()
cnr := cidtest.ID()
ecChunks := make([]oid.ID, chunksCount)
for idx := range ecChunks {
ecChunks[idx] = oidtest.ID()
}
ecParentID := oidtest.ID()
splitParentID := oidtest.ID()
tombstoneID := oidtest.ID()
splitID := objectSDK.NewSplitID()
linkingID := oidtest.ID()
ecChunkObjects := make([]*objectSDK.Object, chunksCount)
for idx := range ecChunkObjects {
ecChunkObjects[idx] = testutil.GenerateObjectWithCID(cnr)
ecChunkObjects[idx].SetContainerID(cnr)
ecChunkObjects[idx].SetID(ecChunks[idx])
ecChunkObjects[idx].SetPayload([]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
ecChunkObjects[idx].SetPayloadSize(uint64(10))
ecChunkObjects[idx].SetECHeader(objectSDK.NewECHeader(
objectSDK.ECParentInfo{
ID: ecParentID,
SplitParentID: &splitParentID, SplitID: splitID,
}, uint32(idx), uint32(chunksCount+1), []byte{}, 0))
}
splitParentObj := testutil.GenerateObjectWithCID(cnr)
splitParentObj.SetID(splitParentID)
var linkingAddress oid.Address
linkingAddress.SetContainer(cnr)
linkingAddress.SetObject(linkingID)
linkingObj := testutil.GenerateObjectWithCID(cnr)
linkingObj.SetID(linkingID)
linkingObj.SetParent(splitParentObj)
linkingObj.SetParentID(splitParentID)
linkingObj.SetChildren(ecParentID, oidtest.ID(), oidtest.ID())
linkingObj.SetSplitID(splitID)
// put object with EC and split info
var prm PutPrm
prm.SetStorageID([]byte("0/0"))
for _, obj := range ecChunkObjects {
prm.SetObject(obj)
_, err := db.Put(context.Background(), prm)
require.NoError(t, err)
}
if withLinking {
prm.SetObject(linkingObj)
_, err := db.Put(context.Background(), prm)
require.NoError(t, err)
}
var ecParentAddress oid.Address
ecParentAddress.SetContainer(cnr)
ecParentAddress.SetObject(ecParentID)
var getPrm GetPrm
var ecInfoError *objectSDK.ECInfoError
getPrm.SetAddress(ecParentAddress)
_, err := db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, &ecInfoError)
require.True(t, len(ecInfoError.ECInfo().Chunks) == chunksCount)
var splitParentAddress oid.Address
splitParentAddress.SetContainer(cnr)
splitParentAddress.SetObject(splitParentID)
var splitInfoError *objectSDK.SplitInfoError
getPrm.SetAddress(splitParentAddress)
getPrm.SetRaw(true)
_, err = db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, &splitInfoError)
require.True(t, splitInfoError.SplitInfo() != nil)
require.Equal(t, splitID, splitInfoError.SplitInfo().SplitID())
lastPart, set := splitInfoError.SplitInfo().LastPart()
require.True(t, set)
require.Equal(t, lastPart, ecParentID)
if withLinking {
l, ok := splitInfoError.SplitInfo().Link()
require.True(t, ok)
require.Equal(t, linkingID, l)
}
getPrm.SetRaw(false)
// inhume EC parent and split objects (like Delete does)
inhumeAddresses := []oid.Address{splitParentAddress, ecParentAddress}
if withLinking {
inhumeAddresses = append(inhumeAddresses, linkingAddress)
}
var inhumePrm InhumePrm
var tombAddress oid.Address
tombAddress.SetContainer(cnr)
tombAddress.SetObject(tombstoneID)
inhumePrm.SetAddresses(inhumeAddresses...)
inhumePrm.SetTombstoneAddress(tombAddress)
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
getPrm.SetAddress(ecParentAddress)
_, err = db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
getPrm.SetAddress(splitParentAddress)
_, err = db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
if withLinking {
getPrm.SetAddress(linkingAddress)
_, err = db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
}
for _, id := range ecChunks {
var ecChunkAddress oid.Address
ecChunkAddress.SetContainer(cnr)
ecChunkAddress.SetObject(id)
getPrm.SetAddress(ecChunkAddress)
_, err = db.Get(context.Background(), getPrm)
require.ErrorAs(t, err, new(*apistatus.ObjectAlreadyRemoved))
}
// GC finds and deletes split parent, EC parent and EC chunks
parentCount := 2 // split + ec
if withLinking {
parentCount = 3
}
var garbageAddresses []oid.Address
var itPrm GarbageIterationPrm
itPrm.SetHandler(func(g GarbageObject) error {
garbageAddresses = append(garbageAddresses, g.Address())
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.Equal(t, parentCount+chunksCount, len(garbageAddresses))
require.True(t, slices.Contains(garbageAddresses, splitParentAddress))
require.True(t, slices.Contains(garbageAddresses, ecParentAddress))
if withLinking {
require.True(t, slices.Contains(garbageAddresses, linkingAddress))
}
for _, id := range ecChunks {
var ecChunkAddress oid.Address
ecChunkAddress.SetContainer(cnr)
ecChunkAddress.SetObject(id)
require.True(t, slices.Contains(garbageAddresses, ecChunkAddress))
}
var deletePrm DeletePrm
deletePrm.SetAddresses(garbageAddresses...)
_, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
var garbageStub []oid.Address
itPrm.SetHandler(func(g GarbageObject) error {
garbageStub = append(garbageStub, g.Address())
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.Equal(t, 0, len(garbageStub))
// after the tombstone expires, GC inhumes the tombstone and drops graves
var tombstonedObjects []TombstonedObject
var graveyardIterationPrm GraveyardIterationPrm
graveyardIterationPrm.SetHandler(func(object TombstonedObject) error {
tombstonedObjects = append(tombstonedObjects, object)
return nil
})
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
var tombstones []oid.Address
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it
garbageAddresses = nil
itPrm.SetHandler(func(g GarbageObject) error {
garbageAddresses = append(garbageAddresses, g.Address())
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.Equal(t, 1, len(garbageAddresses))
require.Equal(t, tombstoneID, garbageAddresses[0].Object())
deletePrm.SetAddresses(garbageAddresses...)
_, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
// no more objects should be left as garbage
itPrm.SetHandler(func(g GarbageObject) error {
require.FailNow(t, "no garbage objects should left")
return nil
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft))
require.NoError(t, testCountersAreZero(db, cnr))
}
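// testVerifyNoObjectDataLeft checks that every bucket except the shard info and container counter/volume buckets is empty.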
func testVerifyNoObjectDataLeft(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
if bytes.Equal(name, shardInfoBucket) ||
bytes.Equal(name, containerCounterBucketName) ||
bytes.Equal(name, containerVolumeBucketName) {
return nil
}
return testBucketEmpty(name, b)
})
}
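// testBucketEmpty fails if the bucket or any of its nested buckets contains a non-empty value.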
func testBucketEmpty(name []byte, b *bbolt.Bucket) error {
err := b.ForEach(func(k, v []byte) error {
if len(v) > 0 {
return fmt.Errorf("bucket %v is not empty", name)
}
return nil
})
if err != nil {
return err
}
return b.ForEachBucket(func(k []byte) error {
return testBucketEmpty(k, b.Bucket(k))
})
}
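// testCountersAreZero checks that both the object counters and the size of the container are zero.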
func testCountersAreZero(db *DB, cnr cid.ID) error {
c, err := db.ContainerCount(context.Background(), cnr)
if err != nil {
return err
}
if !c.IsZero() {
return fmt.Errorf("container %s has non zero counters", cnr.EncodeToString())
}
s, err := db.ContainerSize(cnr)
if err != nil {
return err
}
if s != 0 {
return fmt.Errorf("container %s has non zero size", cnr.EncodeToString())
}
return nil
}