[#9999] metabase: Fix EC delete tests

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-07-30 11:21:52 +03:00
parent 13234a6e4f
commit 3e00dc304c
2 changed files with 52 additions and 27 deletions

View file

@ -16,6 +16,7 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/cockroachdb/pebble"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
@ -107,10 +108,11 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
require.True(t, slices.Contains(garbageAddresses, ecParentAddress))
require.True(t, slices.Contains(garbageAddresses, ecChunkAddress))
var deletePrm DeletePrm
deletePrm.SetAddresses(garbageAddresses...)
for _, a := range garbageAddresses {
deletePrm := DeletePrm{Address: a}
_, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
}
garbageAddresses = nil
itPrm.SetHandler(func(g GarbageObject) error {
@ -153,9 +155,11 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
require.Equal(t, 1, len(garbageAddresses))
require.Equal(t, tombstoneID, garbageAddresses[0].Object())
deletePrm.SetAddresses(garbageAddresses...)
for _, a := range garbageAddresses {
deletePrm := DeletePrm{Address: a}
_, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
}
// no more objects should left as garbage
@ -165,7 +169,7 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft))
require.NoError(t, testVerifyNoObjectDataLeft(db.database.NewSnapshot()))
require.NoError(t, testCountersAreZero(db, cnr))
}
@ -351,10 +355,11 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
require.True(t, slices.Contains(garbageAddresses, ecChunkAddress))
}
var deletePrm DeletePrm
deletePrm.SetAddresses(garbageAddresses...)
for _, a := range garbageAddresses {
deletePrm := DeletePrm{Address: a}
_, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
}
var garbageStub []oid.Address
itPrm.SetHandler(func(g GarbageObject) error {
@ -397,9 +402,11 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
require.Equal(t, 1, len(garbageAddresses))
require.Equal(t, tombstoneID, garbageAddresses[0].Object())
deletePrm.SetAddresses(garbageAddresses...)
for _, a := range garbageAddresses {
deletePrm := DeletePrm{Address: a}
_, err = db.Delete(context.Background(), deletePrm)
require.NoError(t, err)
}
// no more objects should left as garbage
@ -409,20 +416,32 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
})
require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft))
require.NoError(t, testVerifyNoObjectDataLeft(db.database.NewSnapshot()))
require.NoError(t, testCountersAreZero(db, cnr))
}
func testVerifyNoObjectDataLeft(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
if bytes.Equal(name, shardInfoBucket) ||
bytes.Equal(name, containerCounterBucketName) ||
bytes.Equal(name, containerVolumeBucketName) {
return nil
func testVerifyNoObjectDataLeft(s *pebble.Snapshot) error {
defer s.Close()
it, err := s.NewIter(nil)
if err != nil {
return err
}
return testBucketEmpty(name, b)
})
for v := it.First(); v; v = it.Next() {
if bytes.Equal(it.Key(), shardInfoKey(versionKey)) ||
bytes.HasPrefix(it.Key(), []byte{containerCountersPrefix}) ||
bytes.HasPrefix(it.Key(), []byte{containerSizePrefix}) {
continue
}
err := it.Close()
if err != nil {
return err
}
return fmt.Errorf("database is not empty, key %s found", string(it.Key()))
}
return it.Close()
}
func testBucketEmpty(name []byte, b *bbolt.Bucket) error {
@ -448,7 +467,7 @@ func testCountersAreZero(db *DB, cnr cid.ID) error {
if !c.IsZero() {
return fmt.Errorf("container %s has non zero counters", cnr.EncodeToString())
}
s, err := db.ContainerSize(cnr)
s, err := db.ContainerSize(context.Background(), cnr)
if err != nil {
return err
}

View file

@ -227,7 +227,7 @@ func (db *DB) inhumeTx(ctx context.Context, b *pebble.Batch, epoch uint64, prm I
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(ctx, b, epoch, keyer, value, res, ecErr.ECInfo(), cnr)
err = db.inhumeECInfo(ctx, b, epoch, prm.tomb, keyer, value, res, ecErr.ECInfo(), cnr)
if err != nil {
return err
}
@ -272,7 +272,7 @@ func (db *DB) inhumeTx(ctx context.Context, b *pebble.Batch, epoch uint64, prm I
return db.applyInhumeResToCounters(b, res)
}
func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64,
func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64, tomb *oid.Address,
keyer func(addr oid.Address) []byte, value []byte,
res *InhumeRes, ecInfo *objectSDK.ECInfo, cnr cid.ID,
) error {
@ -293,6 +293,12 @@ func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64,
if err != nil {
return err
}
if tomb != nil {
_, err = markAsGC(b, chunkAddr)
if err != nil {
return err
}
}
key := keyer(chunkAddr)
err = b.Set(key, value, pebble.Sync)
if err != nil {