Remove DropGraves() #1493
5 changed files with 129 additions and 124 deletions
@@ -130,17 +130,9 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
 	require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
 	require.Equal(t, 2, len(tombstonedObjects))
 
-	var tombstones []oid.Address
-	for _, tss := range tombstonedObjects {
-		tombstones = append(tombstones, tss.tomb)
-	}
-	inhumePrm.SetAddresses(tombstones...)
-	inhumePrm.SetGCMark()
-	_, err = db.Inhume(context.Background(), inhumePrm)
+	_, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
 	require.NoError(t, err)
 
-	require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
-
 	// GC finds tombstone as garbage and deletes it
 	garbageAddresses = nil
@@ -374,17 +366,9 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
 	require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
 	require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
 
-	var tombstones []oid.Address
-	for _, tss := range tombstonedObjects {
-		tombstones = append(tombstones, tss.tomb)
-	}
-	inhumePrm.SetAddresses(tombstones...)
-	inhumePrm.SetGCMark()
-	_, err = db.Inhume(context.Background(), inhumePrm)
+	_, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
 	require.NoError(t, err)
 
-	require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
-
 	// GC finds tombstone as garbage and deletes it
 	garbageAddresses = nil
@@ -9,6 +9,7 @@ import (
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.etcd.io/bbolt"
 )
@@ -255,46 +256,58 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
 	return
 }
 
-// DropGraves deletes tombstoned objects from the
+// InhumeTombstones deletes tombstoned objects from the
 // graveyard bucket.
 //
 // Returns any error appeared during deletion process.
-func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
+func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) {
 	var (
 		startedAt = time.Now()
 		success   = false
 	)
 	defer func() {
-		db.metrics.AddMethodDuration("DropGraves", time.Since(startedAt), success)
+		db.metrics.AddMethodDuration("InhumeTombstones", time.Since(startedAt), success)
 	}()
 
-	_, span := tracing.StartSpanFromContext(ctx, "metabase.DropGraves")
+	_, span := tracing.StartSpanFromContext(ctx, "metabase.InhumeTombstones")
 	defer span.End()
 
 	db.modeMtx.RLock()
 	defer db.modeMtx.RUnlock()
 
 	if db.mode.NoMetabase() {
-		return ErrDegradedMode
+		return InhumeRes{}, ErrDegradedMode
 	} else if db.mode.ReadOnly() {
-		return ErrReadOnlyMode
+		return InhumeRes{}, ErrReadOnlyMode
 	}
 
 	buf := make([]byte, addressKeySize)
+	prm := InhumePrm{forceRemoval: true}
+	currEpoch := db.epochState.CurrentEpoch()
 
-	return db.boltDB.Batch(func(tx *bbolt.Tx) error {
-		bkt := tx.Bucket(graveyardBucketName)
-		if bkt == nil {
-			return nil
-		}
+	var res InhumeRes
+
+	err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
+		res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
+
+		garbageBKT := tx.Bucket(garbageBucketName)
+		graveyardBKT := tx.Bucket(graveyardBucketName)
+
+		bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
+		if err != nil {
+			return err
+		}
 
-		for _, ts := range tss {
-			err := bkt.Delete(addressKey(ts.Address(), buf))
-			if err != nil {
+		for i := range tss {
+			if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
 				return err
 			}
+			if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
+				return err
+			}
 		}
 
 		return nil
 	})
+	return res, err
 }
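For orientation, a minimal caller-side sketch of the API change above (a hypothetical wrapper, not code from this PR; the names cleanupTombstones, db, ctx and tss are assumed): the old flow needed an Inhume call with a GC mark plus a separate DropGraves pass, while InhumeTombstones now does both inside a single bolt batch and reports counters back.

package example

import (
	"context"

	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
)

// cleanupTombstones contrasts the old two-step flow with the new single call.
func cleanupTombstones(ctx context.Context, db *meta.DB, tss []meta.TombstonedObject) error {
	// Old flow (removed by this PR):
	//   var prm meta.InhumePrm
	//   prm.SetAddresses(tombstoneAddrs...) // addresses collected from tss
	//   prm.SetGCMark()
	//   _, err := db.Inhume(ctx, prm)
	//   err = db.DropGraves(ctx, tss)

	// New flow: GC-mark the tombstones and delete their graveyard records
	// in one batch; the counters are returned to the caller.
	res, err := db.InhumeTombstones(ctx, tss)
	if err != nil {
		return err
	}
	_ = res.LogicInhumed() // available for logging or metrics on the caller side
	return nil
}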
@@ -7,7 +7,9 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
+	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
 	"github.com/stretchr/testify/require"
@@ -393,7 +395,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
 	require.False(t, iWasCalled)
 }
 
-func TestDB_DropGraves(t *testing.T) {
+func TestDB_InhumeTombstones(t *testing.T) {
 	db := newDB(t)
 	defer func() { require.NoError(t, db.Close(context.Background())) }()
 
@@ -410,9 +412,20 @@ func TestDB_DropGraves(t *testing.T) {
 	err = putBig(db, obj2)
 	require.NoError(t, err)
 
-	// inhume with tombstone
-	addrTombstone := oidtest.Address()
-	addrTombstone.SetContainer(cnr)
+	id1, _ := obj1.ID()
+	id2, _ := obj2.ID()
+	ts := objectSDK.NewTombstone()
+	ts.SetMembers([]oid.ID{id1, id2})
+	objTs := objectSDK.New()
+	objTs.SetContainerID(cnr)
+	objTs.SetType(objectSDK.TypeTombstone)
+
+	data, _ := ts.Marshal()
+	objTs.SetPayload(data)
+	require.NoError(t, objectSDK.CalculateAndSetID(objTs))
+	require.NoError(t, putBig(db, objTs))
+
+	addrTombstone := object.AddressOf(objTs)
 
 	var inhumePrm meta.InhumePrm
 	inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
@@ -435,8 +448,11 @@ func TestDB_DropGraves(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, 2, counter)
 
-	err = db.DropGraves(context.Background(), buriedTS)
+	res, err := db.InhumeTombstones(context.Background(), buriedTS)
 	require.NoError(t, err)
+	require.EqualValues(t, 1, res.LogicInhumed())
+	require.EqualValues(t, 0, res.UserInhumed())
+	require.EqualValues(t, map[cid.ID]meta.ObjectCounters{cnr: {Logic: 1}}, res.InhumedByCnrID())
 
 	counter = 0
 	iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {
@@ -217,15 +217,25 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 	garbageBKT := tx.Bucket(garbageBucketName)
 	graveyardBKT := tx.Bucket(graveyardBucketName)
 
-	bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
+	bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
 	if err != nil {
 		return err
 	}
 
 	buf := make([]byte, addressKeySize)
 	for i := range prm.target {
-		id := prm.target[i].Object()
-		cnr := prm.target[i].Container()
+		if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
+			return err
+		}
+	}
+
+	return db.applyInhumeResToCounters(tx, res)
+}
+
+func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
+	id := addr.Object()
+	cnr := addr.Container()
+	tx := bkt.Tx()
 
 	// prevent locked objects to be inhumed
 	if !prm.forceRemoval && objectLocked(tx, cnr, id) {
@@ -245,8 +255,8 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 		lockWasChecked = true
 	}
 
-	obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
-	targetKey := addressKey(prm.target[i], buf)
+	obj, err := db.get(tx, addr, buf, false, true, epoch)
+	targetKey := addressKey(addr, buf)
 	var ecErr *objectSDK.ECInfoError
 	if err == nil {
 		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
@@ -268,7 +278,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 		}
 
 		if isTomb {
-			continue
+			return nil
 		}
 	}
 
@@ -284,16 +294,14 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 		if lockWasChecked {
 			// inhumed object is not of
 			// the LOCK type
-			continue
+			return nil
 		}
 
 		if isLockObject(tx, cnr, id) {
-			res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
+			res.deletedLockObj = append(res.deletedLockObj, addr)
 		}
 	}
-	}
 
-	return db.applyInhumeResToCounters(tx, res)
+	return nil
 }
 
 func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
@@ -354,7 +362,7 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
 // 1. tombstone address if Inhume was called with
 // a Tombstone
 // 2. zeroValue if Inhume was called with a GC mark
-func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
+func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
 	if prm.tomb != nil {
 		targetBucket = graveyardBKT
 		tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
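A side note on the signature change above: the helper only reads InhumePrm, so passing it by value is safe and lets Inhume and InhumeTombstones share it. A rough caller-side illustration of the two inhume modes the helper distinguishes (sketch only; SetTombstoneAddress and tombAddr are assumptions, not part of this diff):

var prm meta.InhumePrm

// Tombstone mode: graves land in the graveyard bucket and the stored value
// is the tombstone address, so each grave stays tied to its tombstone.
// prm.SetTombstoneAddress(tombAddr) // tombAddr: an assumed oid.Address

// GC mode: records go to the garbage bucket with a zero-length marker value.
prm.SetGCMark()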
@@ -627,23 +627,14 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
 //
 // Does not modify tss.
 func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
-	if s.GetMode().NoMetabase() {
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.NoMetabase() {
 		return
 	}
 
-	// Mark tombstones as garbage.
-	var pInhume meta.InhumePrm
-
-	tsAddrs := make([]oid.Address, 0, len(tss))
-	for _, ts := range tss {
-		tsAddrs = append(tsAddrs, ts.Tombstone())
-	}
-
-	pInhume.SetGCMark()
-	pInhume.SetAddresses(tsAddrs...)
-
-	// inhume tombstones
-	res, err := s.metaBase.Inhume(ctx, pInhume)
+	res, err := s.metaBase.InhumeTombstones(ctx, tss)
 	if err != nil {
 		s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
 			zap.String("error", err.Error()),
@@ -663,13 +654,6 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
 		s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
 		i++
 	}
-
-	// drop just processed expired tombstones
-	// from graveyard
-	err = s.metaBase.DropGraves(ctx, tss)
-	if err != nil {
-		s.log.Warn(ctx, logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err))
-	}
 }
 
 // HandleExpiredLocks unlocks all objects which were locked by lockers.
Review discussion (on re-creating res inside the Batch closure):

Do we need to allocate it here if we don't want to return it?

We need to initialize it here, because the function passed to Batch may be executed multiple times.

Oh, wait, it should be returned; the result is used in the caller.

Fixed, and extended the test to check the return value.
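The retry point can be illustrated with a standalone bbolt sketch (a generic example, not project code): the function passed to DB.Batch may run more than once when the batch is retried, so any per-call accumulator has to be reset inside the closure, which is exactly why InhumeTombstones re-creates res on every attempt.

package main

import (
	"fmt"
	"log"
	"os"

	"go.etcd.io/bbolt"
)

func main() {
	db, err := bbolt.Open("example.db", 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove("example.db")
	defer db.Close()

	var count int
	err = db.Batch(func(tx *bbolt.Tx) error {
		// Reset per attempt: bbolt may re-run this closure if another
		// writer in the same batch fails and the batch is retried.
		count = 0

		b, err := tx.CreateBucketIfNotExists([]byte("graveyard"))
		if err != nil {
			return err
		}
		return b.ForEach(func(_, _ []byte) error {
			count++
			return nil
		})
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("graveyard records:", count)
}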