Remove DropGraves() #1493

Merged
fyrchik merged 4 commits from fyrchik/frostfs-node:metabase-put into master 2024-11-14 06:47:05 +00:00
5 changed files with 129 additions and 124 deletions

View file

@ -130,17 +130,9 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.Equal(t, 2, len(tombstonedObjects))
var tombstones []oid.Address
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
_, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it
garbageAddresses = nil
@ -374,17 +366,9 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
var tombstones []oid.Address
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
_, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it
garbageAddresses = nil

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
@ -255,46 +256,58 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
return
}
// DropGraves deletes tombstoned objects from the
// InhumeTombstones deletes tombstoned objects from the
// graveyard bucket.
//
// Returns any error appeared during deletion process.
func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("DropGraves", time.Since(startedAt), success)
db.metrics.AddMethodDuration("InhumeTombstones", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.DropGraves")
_, span := tracing.StartSpanFromContext(ctx, "metabase.InhumeTombstones")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
return InhumeRes{}, ErrDegradedMode
} else if db.mode.ReadOnly() {
return ErrReadOnlyMode
return InhumeRes{}, ErrReadOnlyMode
}
buf := make([]byte, addressKeySize)
prm := InhumePrm{forceRemoval: true}
currEpoch := db.epochState.CurrentEpoch()
return db.boltDB.Batch(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(graveyardBucketName)
if bkt == nil {
return nil
var res InhumeRes
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
acid-ant marked this conversation as resolved Outdated

Do we need to allocate it here once we don't want to return it?

Do we need to allocate it here once we don't want to return it?

We need to initialize it here, because the function in Batch may be executed multiple times.

We need to initialize it here, because the function in `Batch` may be executed multiple times.

Oh, wait, it should be returned, the result is used in the caller.

Oh, wait, it should be returned, the result is used in the caller.

fixed

fixed

extended the test to check the return value

extended the test to check the return value
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil {
return err
}
for _, ts := range tss {
err := bkt.Delete(addressKey(ts.Address(), buf))
if err != nil {
for i := range tss {
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
return err
}
if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
return err
}
}
return nil
})
return res, err
}

View file

@ -7,7 +7,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
@ -393,7 +395,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
require.False(t, iWasCalled)
}
func TestDB_DropGraves(t *testing.T) {
func TestDB_InhumeTombstones(t *testing.T) {
db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }()
@ -410,9 +412,20 @@ func TestDB_DropGraves(t *testing.T) {
err = putBig(db, obj2)
require.NoError(t, err)
// inhume with tombstone
addrTombstone := oidtest.Address()
addrTombstone.SetContainer(cnr)
id1, _ := obj1.ID()
id2, _ := obj2.ID()
ts := objectSDK.NewTombstone()
ts.SetMembers([]oid.ID{id1, id2})
objTs := objectSDK.New()
objTs.SetContainerID(cnr)
objTs.SetType(objectSDK.TypeTombstone)
data, _ := ts.Marshal()
objTs.SetPayload(data)
require.NoError(t, objectSDK.CalculateAndSetID(objTs))
require.NoError(t, putBig(db, objTs))
addrTombstone := object.AddressOf(objTs)
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
@ -435,8 +448,11 @@ func TestDB_DropGraves(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, counter)
err = db.DropGraves(context.Background(), buriedTS)
res, err := db.InhumeTombstones(context.Background(), buriedTS)
require.NoError(t, err)
require.EqualValues(t, 1, res.LogicInhumed())
require.EqualValues(t, 0, res.UserInhumed())
require.EqualValues(t, map[cid.ID]meta.ObjectCounters{cnr: {Logic: 1}}, res.InhumedByCnrID())
counter = 0
iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {

View file

@ -217,85 +217,93 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil {
return err
}
buf := make([]byte, addressKeySize)
for i := range prm.target {
id := prm.target[i].Object()
cnr := prm.target[i].Container()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return new(apistatus.ObjectLocked)
}
var lockWasChecked bool
// prevent lock objects to be inhumed
// if `Inhume` was called not with the
// `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
}
lockWasChecked = true
}
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
targetKey := addressKey(prm.target[i], buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
if err != nil {
return err
}
}
if prm.tomb != nil {
var isTomb bool
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
if err != nil {
return err
}
if isTomb {
continue
}
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil {
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
return err
}
if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
continue
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
}
}
}
return db.applyInhumeResToCounters(tx, res)
}
func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
id := addr.Object()
cnr := addr.Container()
tx := bkt.Tx()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return new(apistatus.ObjectLocked)
}
var lockWasChecked bool
// prevent lock objects to be inhumed
// if `Inhume` was called not with the
// `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
}
lockWasChecked = true
}
obj, err := db.get(tx, addr, buf, false, true, epoch)
targetKey := addressKey(addr, buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
if err != nil {
return err
}
}
if prm.tomb != nil {
var isTomb bool
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
if err != nil {
return err
}
if isTomb {
return nil
}
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil {
return err
}
if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
return nil
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, addr)
}
}
return nil
}
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
@ -354,7 +362,7 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
// 1. tombstone address if Inhume was called with
// a Tombstone
// 2. zeroValue if Inhume was called with a GC mark
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
if prm.tomb != nil {
targetBucket = graveyardBKT
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))

View file

@ -627,23 +627,14 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
//
// Does not modify tss.
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
if s.GetMode().NoMetabase() {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return
}
// Mark tombstones as garbage.
var pInhume meta.InhumePrm
tsAddrs := make([]oid.Address, 0, len(tss))
for _, ts := range tss {
tsAddrs = append(tsAddrs, ts.Tombstone())
}
pInhume.SetGCMark()
pInhume.SetAddresses(tsAddrs...)
// inhume tombstones
res, err := s.metaBase.Inhume(ctx, pInhume)
res, err := s.metaBase.InhumeTombstones(ctx, tss)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.String("error", err.Error()),
@ -663,13 +654,6 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
i++
}
// drop just processed expired tombstones
// from graveyard
err = s.metaBase.DropGraves(ctx, tss)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err))
}
}
// HandleExpiredLocks unlocks all objects which were locked by lockers.