[#9999] metabase: Fix db engine to pebble in inhume.go

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Dmitrii Stepanov 2024-07-02 11:03:04 +03:00
parent 168fc7832c
commit 268862c9f0
7 changed files with 117 additions and 108 deletions
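Note on the overall shape of the change: the bbolt-based Inhume wrapped all writes in a single read-write transaction, while the pebble-based version stages writes in a *pebble.Batch (committed through the db.batch helper seen below) and serializes concurrent inhumes per container with a lock guard. A minimal sketch of the two write paths, using only the public bbolt and pebble APIs (helper names and keys are illustrative, not taken from the repository):

package metabasesketch

import (
	"github.com/cockroachdb/pebble"
	"go.etcd.io/bbolt"
)

func writeBolt(db *bbolt.DB, bucket, key, value []byte) error {
	// bbolt: one read-write transaction per logical operation
	// (assumes the bucket already exists).
	return db.Update(func(tx *bbolt.Tx) error {
		return tx.Bucket(bucket).Put(key, value)
	})
}

func writePebble(db *pebble.DB, key, value []byte) error {
	// pebble: stage mutations in a batch, then commit them atomically.
	b := db.NewBatch()
	if err := b.Set(key, value, nil); err != nil {
		_ = b.Close()
		return err
	}
	return b.Commit(pebble.Sync)
}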


@@ -53,9 +53,7 @@ func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) {
 	unique := make(map[string]struct{})
 	var cnr cid.ID
-	it, err := r.NewIter(&pebble.IterOptions{
-		OnlyReadGuaranteedDurable: true,
-	})
+	it, err := r.NewIter(&pebble.IterOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -188,8 +186,7 @@ func (db *DB) containerSizesInternal(ctx context.Context) (map[cid.ID]uint64, er
 	result := make(map[cid.ID]int64)
 	err := db.snapshot(func(s *pebble.Snapshot) error {
 		it, err := s.NewIter(&pebble.IterOptions{
-			LowerBound:                prefix,
-			OnlyReadGuaranteedDurable: true,
+			LowerBound: prefix,
 		})
 		if err != nil {
 			return err
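The same change repeats in the read paths of several files below: the iterator options keep only the LowerBound prefix bound and drop OnlyReadGuaranteedDurable. For reference, the prefix-scan idiom these helpers rely on looks roughly like this (a sketch against the public pebble API; scanPrefix and handle are illustrative names, not code from the repository):

package metabasesketch

import (
	"bytes"

	"github.com/cockroachdb/pebble"
)

// scanPrefix visits every key/value pair whose key starts with keyPrefix.
func scanPrefix(r pebble.Reader, keyPrefix []byte, handle func(k, v []byte) error) error {
	it, err := r.NewIter(&pebble.IterOptions{LowerBound: keyPrefix})
	if err != nil {
		return err
	}
	// LowerBound only positions the iterator; the HasPrefix check stops the
	// scan once keys leave the prefix (an UpperBound would work as well).
	for ok := it.First(); ok && bytes.HasPrefix(it.Key(), keyPrefix); ok = it.Next() {
		if err := handle(it.Key(), it.Value()); err != nil {
			_ = it.Close()
			return err
		}
	}
	return it.Close()
}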


@@ -400,8 +400,7 @@ func containerObjectCounterInitialized(ctx context.Context, r pebble.Reader) boo
 func containerObjectCounters(ctx context.Context, r pebble.Reader) (map[cid.ID]ObjectCounters, error) {
 	prefix := []byte{containerCountersPrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return nil, err


@@ -212,7 +212,6 @@ func stringNotEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f
 		SkipPoint: func(k []byte) bool {
 			return ok && bytes.Equal(val, k)
 		},
-		OnlyReadGuaranteedDurable: true,
 	})
 	if err != nil {
 		return err
@@ -243,9 +242,7 @@ func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string,
 	}
 	if len(val) == 0 {
-		it, err := r.NewIter(&pebble.IterOptions{
-			OnlyReadGuaranteedDurable: true,
-		})
+		it, err := r.NewIter(&pebble.IterOptions{})
 		if err != nil {
 			return err
 		}
@@ -259,8 +256,7 @@ func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string,
 	}
 	it, err := r.NewIter(&pebble.IterOptions{
-		OnlyReadGuaranteedDurable: true,
-		LowerBound:                prefix,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return err


@@ -57,8 +57,7 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A
 func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch uint64) (bool, error) {
 	prefix := []byte{expiredPrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return false, err
@@ -102,8 +101,7 @@ func selectExpiredObjects(ctx context.Context, r pebble.Reader, epoch uint64, ob
 	prefix := []byte{expiredPrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return nil, err
@@ -175,8 +173,7 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH
 func iterateExpired(ctx context.Context, r pebble.Reader, epoch uint64, h ExpiredObjectHandler) error {
 	prefix := []byte{expiredPrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return err


@@ -15,7 +15,7 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-	"go.etcd.io/bbolt"
+	"github.com/cockroachdb/pebble"
 )

 // InhumePrm encapsulates parameters for Inhume operation.
@@ -180,9 +180,19 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
 	res := InhumeRes{
 		inhumedByCnrID: make(map[cid.ID]ObjectCounters),
 	}
+	var containerIDs []cid.ID
+	if prm.tomb != nil {
+		containerIDs = append(containerIDs, prm.tomb.Container())
+	}
+	for _, a := range prm.target {
+		containerIDs = append(containerIDs, a.Container())
+	}
+	defer db.guard.LockContainerIDs(containerIDs)()
+
 	currEpoch := db.epochState.CurrentEpoch()
-	err := db.database.Update(func(tx *bbolt.Tx) error {
-		return db.inhumeTx(tx, currEpoch, prm, &res)
+	err := db.batch(func(b *pebble.Batch) error {
+		return db.inhumeTx(ctx, b, currEpoch, prm, &res)
 	})
 	success = err == nil
 	if success {
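The new defer db.guard.LockContainerIDs(containerIDs)() line relies on a guard whose lock method returns its own unlock function, so a single defer both takes and releases the per-container locks. The guard itself is not part of this diff; a minimal sketch of the pattern (hypothetical, with one mutex standing in for whatever per-container striping the real guard uses):

package metabasesketch

import (
	"sync"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

type containerGuard struct {
	mtx sync.Mutex // the real guard presumably locks per container ID
}

// LockContainerIDs takes the lock and hands back the matching unlock,
// which is what makes `defer guard.LockContainerIDs(ids)()` work.
func (g *containerGuard) LockContainerIDs(_ []cid.ID) func() {
	g.mtx.Lock()
	return g.mtx.Unlock
}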
@@ -195,48 +205,31 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
 	return res, metaerr.Wrap(err)
 }

-func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
-	garbageBKT := tx.Bucket(garbageBucketName)
-	graveyardBKT := tx.Bucket(graveyardBucketName)
-
-	bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
+func (db *DB) inhumeTx(ctx context.Context, b *pebble.Batch, epoch uint64, prm InhumePrm, res *InhumeRes) error {
+	keyer, value, err := getInhumeTargetBucketAndValue(b, prm)
 	if err != nil {
 		return err
 	}

-	buf := make([]byte, addressKeySize)
 	for i := range prm.target {
 		id := prm.target[i].Object()
 		cnr := prm.target[i].Container()

-		// prevent locked objects to be inhumed
-		if !prm.forceRemoval && objectLocked(tx, cnr, id) {
-			return new(apistatus.ObjectLocked)
-		}
-
-		var lockWasChecked bool
-
-		// prevent lock objects to be inhumed
-		// if `Inhume` was called not with the
-		// `WithForceGCMark` option
+		var ecErr *objectSDK.ECInfoError
 		if !prm.forceRemoval {
-			if isLockObject(tx, cnr, id) {
-				return ErrLockObjectRemoval
+			if err := checkNotLockerOrLocked(ctx, b, cnr, id); err != nil {
+				return err
 			}
-
-			lockWasChecked = true
 		}

-		obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
-		targetKey := addressKey(prm.target[i], buf)
-		var ecErr *objectSDK.ECInfoError
+		obj, err := get(ctx, b, prm.target[i], false, true, epoch)
 		if err == nil {
-			err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
+			err = db.updateDeleteInfo(b, prm.target[i], obj, res)
 			if err != nil {
 				return err
 			}
 		} else if errors.As(err, &ecErr) {
-			err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
+			err = db.inhumeECInfo(ctx, b, epoch, keyer, value, res, ecErr.ECInfo(), cnr)
 			if err != nil {
 				return err
 			}
@@ -244,18 +237,18 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 		if prm.tomb != nil {
 			var isTomb bool
-			isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
+			isTomb, err = markAsGC(b, prm.target[i])
 			if err != nil {
 				return err
 			}

 			if isTomb {
 				continue
 			}
 		}

 		// consider checking if target is already in graveyard?
-		err = bkt.Put(targetKey, value)
+		key := keyer(prm.target[i])
+		err = b.Set(key, value, pebble.Sync)
 		if err != nil {
 			return err
 		}
@@ -268,22 +261,24 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 				// the LOCK type
 				continue
 			}
-
-			if isLockObject(tx, cnr, id) {
+			isLock, err := isLockObject(b, cnr, id)
+			if err != nil {
+				return err
+			}
+			if isLock {
 				res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
 			}
 		}
 	}

-	return db.applyInhumeResToCounters(tx, res)
+	return db.applyInhumeResToCounters(b, res)
 }

-func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
-	garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
-	ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
+func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64,
+	keyer func(addr oid.Address) []byte, value []byte,
+	res *InhumeRes, ecInfo *objectSDK.ECInfo, cnr cid.ID,
 ) error {
 	for _, chunk := range ecInfo.Chunks {
-		chunkBuf := make([]byte, addressKeySize)
 		var chunkAddr oid.Address
 		chunkAddr.SetContainer(cnr)
 		var chunkID oid.ID
@@ -292,22 +287,16 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
 			return err
 		}
 		chunkAddr.SetObject(chunkID)
-		chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
+		chunkObj, err := get(ctx, b, chunkAddr, false, true, epoch)
 		if err != nil {
 			return err
 		}
-		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
+		err = db.updateDeleteInfo(b, chunkAddr, chunkObj, res)
 		if err != nil {
 			return err
 		}
-		chunkKey := addressKey(chunkAddr, chunkBuf)
-		if tomb != nil {
-			_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
-			if err != nil {
-				return err
-			}
-		}
-		err = targetBucket.Put(chunkKey, value)
+		key := keyer(chunkAddr)
+		err = b.Set(key, value, pebble.Sync)
 		if err != nil {
 			return err
 		}
@@ -315,15 +304,38 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
 	return nil
 }

-func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
-	if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
+func checkNotLockerOrLocked(ctx context.Context, r pebble.Reader, cnr cid.ID, id oid.ID) error {
+	// prevent locked objects to be inhumed
+	locked, err := objectLocked(ctx, r, cnr, id)
+	if err != nil {
 		return err
 	}
-	if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
+	if locked {
+		return new(apistatus.ObjectLocked)
+	}
+
+	// prevent lock objects to be inhumed
+	// if `Inhume` was called not with the
+	// `WithForceGCMark` option
+	isLock, err := isLockObject(r, cnr, id)
+	if err != nil {
 		return err
 	}
+	if isLock {
+		return ErrLockObjectRemoval
+	}
+	return nil
+}

-	return db.updateContainerCounter(tx, res.inhumedByCnrID, false)
+func (db *DB) applyInhumeResToCounters(b *pebble.Batch, res *InhumeRes) error {
+	counters := make(map[cid.ID]objectCounterValue, len(res.inhumedByCnrID))
+	for contID, inhumed := range res.inhumedByCnrID {
+		counters[contID] = objectCounterValue{
+			Logic: -1 * int64(inhumed.Logic),
+			Phy:   -1 * int64(inhumed.Phy),
+			User:  -1 * int64(inhumed.User),
+		}
+	}
+	return updateContainerCounter(b, counters)
 }

 // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
@@ -336,35 +348,36 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
 // 1. tombstone address if Inhume was called with
 //    a Tombstone
 // 2. zeroValue if Inhume was called with a GC mark
-func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
+func getInhumeTargetBucketAndValue(b *pebble.Batch, prm InhumePrm) (key func(addr oid.Address) []byte, value []byte, err error) {
 	if prm.tomb != nil {
-		targetBucket = graveyardBKT
-		tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
-
 		// it is forbidden to have a tomb-on-tomb in FrostFS,
 		// so graveyard keys must not be addresses of tombstones
-		data := targetBucket.Get(tombKey)
-		if data != nil {
-			err := targetBucket.Delete(tombKey)
+		tombKey := graveyardKey(prm.tomb.Container(), prm.tomb.Object())
+		v, err := valueSafe(b, tombKey)
+		if err != nil {
+			return nil, nil, err
+		}
+		if v != nil {
+			err := b.Delete(tombKey, pebble.Sync)
 			if err != nil {
 				return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
 			}
 		}

-		value = tombKey
-	} else {
-		targetBucket = garbageBKT
-		value = zeroValue
+		return func(addr oid.Address) []byte {
+			return graveyardKey(addr.Container(), addr.Object())
+		}, encodeAddressToGrave(*prm.tomb), nil
 	}
-
-	return targetBucket, value, nil
+	return func(addr oid.Address) []byte {
+		return garbageKey(addr.Container(), addr.Object())
+	}, zeroValue, nil
 }

-func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) {
-	targetIsTomb, err := isTomb(graveyardBKT, key)
+func markAsGC(b *pebble.Batch, addr oid.Address) (bool, error) {
+	targetIsTomb, err := isTomb(b, addr)
 	if err != nil {
 		return false, err
 	}

 	// do not add grave if target is a tombstone
 	if targetIsTomb {
 		return true, nil
@@ -372,19 +385,23 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
 	// if tombstone appears object must be
 	// additionally marked with GC
-	return false, garbageBKT.Put(key, zeroValue)
+	key := garbageKey(addr.Container(), addr.Object())
+	return false, b.Set(key, zeroValue, pebble.Sync)
 }

-func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
-	containerID, _ := obj.ContainerID()
-	if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
-		res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
+func (db *DB) updateDeleteInfo(b *pebble.Batch, addr oid.Address, obj *objectSDK.Object, res *InhumeRes) error {
+	st, err := inGraveyardWithKey(b, addr)
+	if err != nil {
+		return err
+	}
+	if st == 0 {
+		res.storeDeletionInfo(addr.Container(), obj.PayloadSize(), IsUserObject(obj))
 	}

 	// if object is stored, and it is regular object then update bucket
 	// with container size estimations
 	if obj.Type() == objectSDK.TypeRegular {
-		err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
+		err := changeContainerSize(b, addr.Container(), -1*int64(obj.PayloadSize()))
 		if err != nil {
 			return err
 		}
@@ -392,25 +409,30 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc
 	return nil
 }

-func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) {
+func isTomb(r pebble.Reader, addr oid.Address) (bool, error) {
 	targetIsTomb := false
+	expectedValue := make([]byte, cidSize+objectKeySize)
+	addr.Container().Encode(expectedValue)
+	addr.Object().Encode(expectedValue[cidSize:])
+
+	prefix := []byte{graveyardPrefix}
+	it, err := r.NewIter(&pebble.IterOptions{
+		LowerBound: prefix,
+	})
+	if err != nil {
+		return false, err
+	}

 	// iterate over graveyard and check if target address
 	// is the address of tombstone in graveyard.
-	err := graveyardBucket.ForEach(func(_, v []byte) error {
-		// check if graveyard has record with key corresponding
-		// to tombstone address (at least one)
-		targetIsTomb = bytes.Equal(v, key)
+	// check if graveyard has record with key corresponding
+	// to tombstone address (at least one)
+	for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
+		targetIsTomb = bytes.Equal(expectedValue, it.Value())
 		if targetIsTomb {
-			// break bucket iterator
-			return errBreakBucketForEach
+			return true, it.Close()
 		}
-
-		return nil
-	})
-	if err != nil && !errors.Is(err, errBreakBucketForEach) {
-		return false, err
 	}
-
-	return targetIsTomb, nil
+	return false, it.Close()
 }


@@ -141,8 +141,7 @@ func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object
 func iteratePhyObjectsWithPrefix(r pebble.Reader, typePrefix byte, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
 	prefix := []byte{typePrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return err


@@ -45,8 +45,7 @@ func (db *DB) snapshot(f func(*pebble.Snapshot) error) error {
 func selectByPrefixBatch(ctx context.Context, r pebble.Reader, prefix []byte, batchSize int) ([][]byte, error) {
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return nil, err
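The db.snapshot helper named in the hunk header above is defined in this file, but its body falls outside the hunk. The pebble API it presumably wraps looks like this (a sketch only, not the project's implementation):

package metabasesketch

import "github.com/cockroachdb/pebble"

// withSnapshot runs f against a point-in-time view of the database.
func withSnapshot(db *pebble.DB, f func(*pebble.Snapshot) error) error {
	s := db.NewSnapshot()
	defer s.Close()
	return f(s)
}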