Remove DropGraves() #1493

Merged
fyrchik merged 4 commits from fyrchik/frostfs-node:metabase-put into master 2024-11-14 06:47:05 +00:00
5 changed files with 129 additions and 124 deletions

View file

@ -130,17 +130,9 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm)) require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.Equal(t, 2, len(tombstonedObjects)) require.Equal(t, 2, len(tombstonedObjects))
var tombstones []oid.Address _, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it // GC finds tombstone as garbage and deletes it
garbageAddresses = nil garbageAddresses = nil
@ -374,17 +366,9 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm)) require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.True(t, len(tombstonedObjects) == parentCount+chunksCount) require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
var tombstones []oid.Address _, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it // GC finds tombstone as garbage and deletes it
garbageAddresses = nil garbageAddresses = nil

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
@ -255,46 +256,58 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
return return
} }
// DropGraves deletes tombstoned objects from the // InhumeTombstones deletes tombstoned objects from the
// graveyard bucket. // graveyard bucket.
// //
// Returns any error appeared during deletion process. // Returns any error appeared during deletion process.
func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error { func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) {
var ( var (
startedAt = time.Now() startedAt = time.Now()
success = false success = false
) )
defer func() { defer func() {
db.metrics.AddMethodDuration("DropGraves", time.Since(startedAt), success) db.metrics.AddMethodDuration("InhumeTombstones", time.Since(startedAt), success)
}() }()
_, span := tracing.StartSpanFromContext(ctx, "metabase.DropGraves") _, span := tracing.StartSpanFromContext(ctx, "metabase.InhumeTombstones")
defer span.End() defer span.End()
db.modeMtx.RLock() db.modeMtx.RLock()
defer db.modeMtx.RUnlock() defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() { if db.mode.NoMetabase() {
return ErrDegradedMode return InhumeRes{}, ErrDegradedMode
} else if db.mode.ReadOnly() { } else if db.mode.ReadOnly() {
return ErrReadOnlyMode return InhumeRes{}, ErrReadOnlyMode
} }
buf := make([]byte, addressKeySize) buf := make([]byte, addressKeySize)
prm := InhumePrm{forceRemoval: true}
currEpoch := db.epochState.CurrentEpoch()
return db.boltDB.Batch(func(tx *bbolt.Tx) error { var res InhumeRes
bkt := tx.Bucket(graveyardBucketName)
if bkt == nil { err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
return nil res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
acid-ant marked this conversation as resolved Outdated

Do we need to allocate it here once we don't want to return it?

Do we need to allocate it here once we don't want to return it?

We need to initialize it here, because the function in Batch may be executed multiple times.

We need to initialize it here, because the function in `Batch` may be executed multiple times.

Oh, wait, it should be returned, the result is used in the caller.

Oh, wait, it should be returned, the result is used in the caller.

fixed

fixed

extended the test to check the return value

extended the test to check the return value
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil {
return err
} }
for _, ts := range tss { for i := range tss {
err := bkt.Delete(addressKey(ts.Address(), buf)) if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
if err != nil { return err
}
if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
return err return err
} }
} }
return nil return nil
}) })
return res, err
} }

View file

@ -7,7 +7,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -393,7 +395,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
require.False(t, iWasCalled) require.False(t, iWasCalled)
} }
func TestDB_DropGraves(t *testing.T) { func TestDB_InhumeTombstones(t *testing.T) {
db := newDB(t) db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }() defer func() { require.NoError(t, db.Close(context.Background())) }()
@ -410,9 +412,20 @@ func TestDB_DropGraves(t *testing.T) {
err = putBig(db, obj2) err = putBig(db, obj2)
require.NoError(t, err) require.NoError(t, err)
// inhume with tombstone id1, _ := obj1.ID()
addrTombstone := oidtest.Address() id2, _ := obj2.ID()
addrTombstone.SetContainer(cnr) ts := objectSDK.NewTombstone()
ts.SetMembers([]oid.ID{id1, id2})
objTs := objectSDK.New()
objTs.SetContainerID(cnr)
objTs.SetType(objectSDK.TypeTombstone)
data, _ := ts.Marshal()
objTs.SetPayload(data)
require.NoError(t, objectSDK.CalculateAndSetID(objTs))
require.NoError(t, putBig(db, objTs))
addrTombstone := object.AddressOf(objTs)
var inhumePrm meta.InhumePrm var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2)) inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
@ -435,8 +448,11 @@ func TestDB_DropGraves(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 2, counter) require.Equal(t, 2, counter)
err = db.DropGraves(context.Background(), buriedTS) res, err := db.InhumeTombstones(context.Background(), buriedTS)
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, 1, res.LogicInhumed())
require.EqualValues(t, 0, res.UserInhumed())
require.EqualValues(t, map[cid.ID]meta.ObjectCounters{cnr: {Logic: 1}}, res.InhumedByCnrID())
counter = 0 counter = 0
iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error { iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {

View file

@ -217,85 +217,93 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
garbageBKT := tx.Bucket(garbageBucketName) garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName) graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm) bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil { if err != nil {
return err return err
} }
buf := make([]byte, addressKeySize) buf := make([]byte, addressKeySize)
for i := range prm.target { for i := range prm.target {
id := prm.target[i].Object() if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
cnr := prm.target[i].Container()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return new(apistatus.ObjectLocked)
}
var lockWasChecked bool
// prevent lock objects to be inhumed
// if `Inhume` was called not with the
// `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
}
lockWasChecked = true
}
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
targetKey := addressKey(prm.target[i], buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
if err != nil {
return err
}
}
if prm.tomb != nil {
var isTomb bool
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
if err != nil {
return err
}
if isTomb {
continue
}
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil {
return err return err
} }
if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
continue
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
}
}
} }
return db.applyInhumeResToCounters(tx, res) return db.applyInhumeResToCounters(tx, res)
} }
func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
id := addr.Object()
cnr := addr.Container()
tx := bkt.Tx()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
return new(apistatus.ObjectLocked)
}
var lockWasChecked bool
// prevent lock objects to be inhumed
// if `Inhume` was called not with the
// `WithForceGCMark` option
if !prm.forceRemoval {
if isLockObject(tx, cnr, id) {
return ErrLockObjectRemoval
}
lockWasChecked = true
}
obj, err := db.get(tx, addr, buf, false, true, epoch)
targetKey := addressKey(addr, buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
if err != nil {
return err
}
} else if errors.As(err, &ecErr) {
err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
if err != nil {
return err
}
}
if prm.tomb != nil {
var isTomb bool
isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
if err != nil {
return err
}
if isTomb {
return nil
}
}
// consider checking if target is already in graveyard?
err = bkt.Put(targetKey, value)
if err != nil {
return err
}
if prm.lockObjectHandling {
// do not perform lock check if
// it was already called
if lockWasChecked {
// inhumed object is not of
// the LOCK type
return nil
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, addr)
}
}
return nil
}
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes, func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket, garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,
@ -354,7 +362,7 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
// 1. tombstone address if Inhume was called with // 1. tombstone address if Inhume was called with
// a Tombstone // a Tombstone
// 2. zeroValue if Inhume was called with a GC mark // 2. zeroValue if Inhume was called with a GC mark
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) { func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
if prm.tomb != nil { if prm.tomb != nil {
targetBucket = graveyardBKT targetBucket = graveyardBKT
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))

View file

@ -627,23 +627,14 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
// //
// Does not modify tss. // Does not modify tss.
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) { func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
if s.GetMode().NoMetabase() { s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return return
} }
// Mark tombstones as garbage. res, err := s.metaBase.InhumeTombstones(ctx, tss)
var pInhume meta.InhumePrm
tsAddrs := make([]oid.Address, 0, len(tss))
for _, ts := range tss {
tsAddrs = append(tsAddrs, ts.Tombstone())
}
pInhume.SetGCMark()
pInhume.SetAddresses(tsAddrs...)
// inhume tombstones
res, err := s.metaBase.Inhume(ctx, pInhume)
if err != nil { if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage, s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.String("error", err.Error()), zap.String("error", err.Error()),
@ -663,13 +654,6 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size)) s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
i++ i++
} }
// drop just processed expired tombstones
// from graveyard
err = s.metaBase.DropGraves(ctx, tss)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err))
}
} }
// HandleExpiredLocks unlocks all objects which were locked by lockers. // HandleExpiredLocks unlocks all objects which were locked by lockers.