diff --git a/pkg/local_object_storage/metabase/delete_ec_test.go b/pkg/local_object_storage/metabase/delete_ec_test.go index 0e627f095..99bf4aafc 100644 --- a/pkg/local_object_storage/metabase/delete_ec_test.go +++ b/pkg/local_object_storage/metabase/delete_ec_test.go @@ -16,6 +16,7 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" ) @@ -107,10 +108,11 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) { require.True(t, slices.Contains(garbageAddresses, ecParentAddress)) require.True(t, slices.Contains(garbageAddresses, ecChunkAddress)) - var deletePrm DeletePrm - deletePrm.SetAddresses(garbageAddresses...) - _, err = db.Delete(context.Background(), deletePrm) - require.NoError(t, err) + for _, a := range garbageAddresses { + deletePrm := DeletePrm{Address: a} + _, err = db.Delete(context.Background(), deletePrm) + require.NoError(t, err) + } garbageAddresses = nil itPrm.SetHandler(func(g GarbageObject) error { @@ -153,9 +155,11 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) { require.Equal(t, 1, len(garbageAddresses)) require.Equal(t, tombstoneID, garbageAddresses[0].Object()) - deletePrm.SetAddresses(garbageAddresses...) - _, err = db.Delete(context.Background(), deletePrm) - require.NoError(t, err) + for _, a := range garbageAddresses { + deletePrm := DeletePrm{Address: a} + _, err = db.Delete(context.Background(), deletePrm) + require.NoError(t, err) + } // no more objects should left as garbage @@ -165,7 +169,7 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) { }) require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) - require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft)) + require.NoError(t, testVerifyNoObjectDataLeft(db.database.NewSnapshot())) require.NoError(t, testCountersAreZero(db, cnr)) } @@ -351,10 +355,11 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool require.True(t, slices.Contains(garbageAddresses, ecChunkAddress)) } - var deletePrm DeletePrm - deletePrm.SetAddresses(garbageAddresses...) - _, err = db.Delete(context.Background(), deletePrm) - require.NoError(t, err) + for _, a := range garbageAddresses { + deletePrm := DeletePrm{Address: a} + _, err = db.Delete(context.Background(), deletePrm) + require.NoError(t, err) + } var garbageStub []oid.Address itPrm.SetHandler(func(g GarbageObject) error { @@ -397,9 +402,11 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool require.Equal(t, 1, len(garbageAddresses)) require.Equal(t, tombstoneID, garbageAddresses[0].Object()) - deletePrm.SetAddresses(garbageAddresses...) - _, err = db.Delete(context.Background(), deletePrm) - require.NoError(t, err) + for _, a := range garbageAddresses { + deletePrm := DeletePrm{Address: a} + _, err = db.Delete(context.Background(), deletePrm) + require.NoError(t, err) + } // no more objects should left as garbage @@ -409,20 +416,32 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool }) require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm)) - require.NoError(t, db.boltDB.View(testVerifyNoObjectDataLeft)) + require.NoError(t, testVerifyNoObjectDataLeft(db.database.NewSnapshot())) require.NoError(t, testCountersAreZero(db, cnr)) } -func testVerifyNoObjectDataLeft(tx *bbolt.Tx) error { - return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { - if bytes.Equal(name, shardInfoBucket) || - bytes.Equal(name, containerCounterBucketName) || - bytes.Equal(name, containerVolumeBucketName) { - return nil +func testVerifyNoObjectDataLeft(s *pebble.Snapshot) error { + defer s.Close() + it, err := s.NewIter(nil) + if err != nil { + return err + } + for v := it.First(); v; v = it.Next() { + if bytes.Equal(it.Key(), shardInfoKey(versionKey)) || + bytes.HasPrefix(it.Key(), []byte{containerCountersPrefix}) || + bytes.HasPrefix(it.Key(), []byte{containerSizePrefix}) { + continue } - return testBucketEmpty(name, b) - }) + + err := it.Close() + if err != nil { + return err + } + return fmt.Errorf("database is not empty, key %s found", string(it.Key())) + } + + return it.Close() } func testBucketEmpty(name []byte, b *bbolt.Bucket) error { @@ -448,7 +467,7 @@ func testCountersAreZero(db *DB, cnr cid.ID) error { if !c.IsZero() { return fmt.Errorf("container %s has non zero counters", cnr.EncodeToString()) } - s, err := db.ContainerSize(cnr) + s, err := db.ContainerSize(context.Background(), cnr) if err != nil { return err } diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 1c66758c9..fc344d726 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -227,7 +227,7 @@ func (db *DB) inhumeTx(ctx context.Context, b *pebble.Batch, epoch uint64, prm I return err } } else if errors.As(err, &ecErr) { - err = db.inhumeECInfo(ctx, b, epoch, keyer, value, res, ecErr.ECInfo(), cnr) + err = db.inhumeECInfo(ctx, b, epoch, prm.tomb, keyer, value, res, ecErr.ECInfo(), cnr) if err != nil { return err } @@ -272,7 +272,7 @@ func (db *DB) inhumeTx(ctx context.Context, b *pebble.Batch, epoch uint64, prm I return db.applyInhumeResToCounters(b, res) } -func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64, +func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64, tomb *oid.Address, keyer func(addr oid.Address) []byte, value []byte, res *InhumeRes, ecInfo *objectSDK.ECInfo, cnr cid.ID, ) error { @@ -293,6 +293,12 @@ func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64, if err != nil { return err } + if tomb != nil { + _, err = markAsGC(b, chunkAddr) + if err != nil { + return err + } + } key := keyer(chunkAddr) err = b.Set(key, value, pebble.Sync) if err != nil {