Remove DropGraves() #1493

Merged
fyrchik merged 4 commits from fyrchik/frostfs-node:metabase-put into master 2024-11-14 06:47:05 +00:00
5 changed files with 129 additions and 124 deletions

View file

@ -130,17 +130,9 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm)) require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.Equal(t, 2, len(tombstonedObjects)) require.Equal(t, 2, len(tombstonedObjects))
var tombstones []oid.Address _, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it // GC finds tombstone as garbage and deletes it
garbageAddresses = nil garbageAddresses = nil
@ -374,17 +366,9 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm)) require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.True(t, len(tombstonedObjects) == parentCount+chunksCount) require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
var tombstones []oid.Address _, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it // GC finds tombstone as garbage and deletes it
garbageAddresses = nil garbageAddresses = nil

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
@ -255,46 +256,58 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
return return
} }
// DropGraves deletes tombstoned objects from the // InhumeTombstones deletes tombstoned objects from the
// graveyard bucket. // graveyard bucket.
// //
// Returns any error appeared during deletion process. // Returns any error appeared during deletion process.
func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error { func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) {
var ( var (
startedAt = time.Now() startedAt = time.Now()
success = false success = false
) )
defer func() { defer func() {
db.metrics.AddMethodDuration("DropGraves", time.Since(startedAt), success) db.metrics.AddMethodDuration("InhumeTombstones", time.Since(startedAt), success)
}() }()
_, span := tracing.StartSpanFromContext(ctx, "metabase.DropGraves") _, span := tracing.StartSpanFromContext(ctx, "metabase.InhumeTombstones")
defer span.End() defer span.End()
db.modeMtx.RLock() db.modeMtx.RLock()
defer db.modeMtx.RUnlock() defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() { if db.mode.NoMetabase() {
return ErrDegradedMode return InhumeRes{}, ErrDegradedMode
} else if db.mode.ReadOnly() { } else if db.mode.ReadOnly() {
return ErrReadOnlyMode return InhumeRes{}, ErrReadOnlyMode
} }
buf := make([]byte, addressKeySize) buf := make([]byte, addressKeySize)
prm := InhumePrm{forceRemoval: true}
currEpoch := db.epochState.CurrentEpoch()
return db.boltDB.Batch(func(tx *bbolt.Tx) error { var res InhumeRes
bkt := tx.Bucket(graveyardBucketName)
if bkt == nil { err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
return nil res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
acid-ant marked this conversation as resolved Outdated

Do we need to allocate it here once we don't want to return it?

Do we need to allocate it here once we don't want to return it?

We need to initialize it here, because the function in Batch may be executed multiple times.

We need to initialize it here, because the function in `Batch` may be executed multiple times.

Oh, wait, it should be returned, the result is used in the caller.

Oh, wait, it should be returned, the result is used in the caller.

fixed

fixed

extended the test to check the return value

extended the test to check the return value
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil {
return err
} }
for _, ts := range tss { for i := range tss {
err := bkt.Delete(addressKey(ts.Address(), buf)) if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
if err != nil { return err
}
if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
return err return err
} }
} }
return nil return nil
}) })
return res, err
} }

View file

@ -7,7 +7,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -393,7 +395,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
require.False(t, iWasCalled) require.False(t, iWasCalled)
} }
func TestDB_DropGraves(t *testing.T) { func TestDB_InhumeTombstones(t *testing.T) {
db := newDB(t) db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }() defer func() { require.NoError(t, db.Close(context.Background())) }()
@ -410,9 +412,20 @@ func TestDB_DropGraves(t *testing.T) {
err = putBig(db, obj2) err = putBig(db, obj2)
require.NoError(t, err) require.NoError(t, err)
// inhume with tombstone id1, _ := obj1.ID()
addrTombstone := oidtest.Address() id2, _ := obj2.ID()
addrTombstone.SetContainer(cnr) ts := objectSDK.NewTombstone()
ts.SetMembers([]oid.ID{id1, id2})
objTs := objectSDK.New()
objTs.SetContainerID(cnr)
objTs.SetType(objectSDK.TypeTombstone)
data, _ := ts.Marshal()
objTs.SetPayload(data)
require.NoError(t, objectSDK.CalculateAndSetID(objTs))
require.NoError(t, putBig(db, objTs))
addrTombstone := object.AddressOf(objTs)
var inhumePrm meta.InhumePrm var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2)) inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
@ -435,8 +448,11 @@ func TestDB_DropGraves(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 2, counter) require.Equal(t, 2, counter)
err = db.DropGraves(context.Background(), buriedTS) res, err := db.InhumeTombstones(context.Background(), buriedTS)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 1, res.LogicInhumed())
require.EqualValues(t, 0, res.UserInhumed())
require.EqualValues(t, map[cid.ID]meta.ObjectCounters{cnr: {Logic: 1}}, res.InhumedByCnrID())
counter = 0 counter = 0
iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error { iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {

View file

@ -217,15 +217,25 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
garbageBKT := tx.Bucket(garbageBucketName) garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName) graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm) bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil { if err != nil {
return err return err
} }
buf := make([]byte, addressKeySize) buf := make([]byte, addressKeySize)
for i := range prm.target { for i := range prm.target {
id := prm.target[i].Object() if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
cnr := prm.target[i].Container() return err
}
}
return db.applyInhumeResToCounters(tx, res)
}
func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
id := addr.Object()
cnr := addr.Container()
tx := bkt.Tx()
// prevent locked objects to be inhumed // prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) { if !prm.forceRemoval && objectLocked(tx, cnr, id) {
@ -245,8 +255,8 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
lockWasChecked = true lockWasChecked = true
} }
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch) obj, err := db.get(tx, addr, buf, false, true, epoch)
targetKey := addressKey(prm.target[i], buf) targetKey := addressKey(addr, buf)
var ecErr *objectSDK.ECInfoError var ecErr *objectSDK.ECInfoError
if err == nil { if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res) err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
@ -268,7 +278,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
} }
if isTomb { if isTomb {
continue return nil
} }
} }
@ -284,16 +294,14 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
if lockWasChecked { if lockWasChecked {
// inhumed object is not of // inhumed object is not of
// the LOCK type // the LOCK type
continue return nil
} }
if isLockObject(tx, cnr, id) { if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i]) res.deletedLockObj = append(res.deletedLockObj, addr)
} }
} }
} return nil
return db.applyInhumeResToCounters(tx, res)
} }
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes, func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
@ -354,7 +362,7 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
// 1. tombstone address if Inhume was called with // 1. tombstone address if Inhume was called with
// a Tombstone // a Tombstone
// 2. zeroValue if Inhume was called with a GC mark // 2. zeroValue if Inhume was called with a GC mark
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) { func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
if prm.tomb != nil { if prm.tomb != nil {
targetBucket = graveyardBKT targetBucket = graveyardBKT
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))

View file

@ -627,23 +627,14 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
// //
// Does not modify tss. // Does not modify tss.
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) { func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
if s.GetMode().NoMetabase() { s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return return
} }
// Mark tombstones as garbage. res, err := s.metaBase.InhumeTombstones(ctx, tss)
var pInhume meta.InhumePrm
tsAddrs := make([]oid.Address, 0, len(tss))
for _, ts := range tss {
tsAddrs = append(tsAddrs, ts.Tombstone())
}
pInhume.SetGCMark()
pInhume.SetAddresses(tsAddrs...)
// inhume tombstones
res, err := s.metaBase.Inhume(ctx, pInhume)
if err != nil { if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -663,13 +654,6 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size)) s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
i++ i++
} }
// drop just processed expired tombstones
// from graveyard
err = s.metaBase.DropGraves(ctx, tss)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err))
}
} }
// HandleExpiredLocks unlocks all objects which were locked by lockers. // HandleExpiredLocks unlocks all objects which were locked by lockers.