[#9999] metabase: Fix db engine to pebble in inhume.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
parent 6bb7330a46
commit 691af2b2eb
7 changed files with 117 additions and 108 deletions
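
Most of the hunks below make the same mechanical change: pebble iterators are opened with only a LowerBound, and the OnlyReadGuaranteedDurable option is dropped. For orientation, here is a minimal sketch of the prefix-scan pattern the updated call sites converge on; the iteratePrefix helper and the example package are illustrative, not code from this repository:

package example

import (
	"bytes"

	"github.com/cockroachdb/pebble"
)

// iteratePrefix visits every key that starts with prefix: bound the iterator
// from below and stop as soon as keys leave the prefix.
func iteratePrefix(r pebble.Reader, prefix []byte, fn func(key, value []byte) error) error {
	it, err := r.NewIter(&pebble.IterOptions{LowerBound: prefix})
	if err != nil {
		return err
	}
	for ok := it.First(); ok && bytes.HasPrefix(it.Key(), prefix); ok = it.Next() {
		if err := fn(it.Key(), it.Value()); err != nil {
			_ = it.Close()
			return err
		}
	}
	return it.Close()
}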
@@ -53,9 +53,7 @@ func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) {
 	unique := make(map[string]struct{})
 	var cnr cid.ID
 
-	it, err := r.NewIter(&pebble.IterOptions{
-		OnlyReadGuaranteedDurable: true,
-	})
+	it, err := r.NewIter(&pebble.IterOptions{})
 	if err != nil {
 		return nil, err
 	}
@@ -188,8 +186,7 @@ func (db *DB) containerSizesInternal(ctx context.Context) (map[cid.ID]uint64, er
 	result := make(map[cid.ID]int64)
 	err := db.snapshot(func(s *pebble.Snapshot) error {
 		it, err := s.NewIter(&pebble.IterOptions{
-			LowerBound:                prefix,
-			OnlyReadGuaranteedDurable: true,
+			LowerBound: prefix,
 		})
 		if err != nil {
 			return err
@@ -400,8 +400,7 @@ func containerObjectCounterInitialized(ctx context.Context, r pebble.Reader) boo
 func containerObjectCounters(ctx context.Context, r pebble.Reader) (map[cid.ID]ObjectCounters, error) {
 	prefix := []byte{containerCountersPrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return nil, err
@@ -212,7 +212,6 @@ func stringNotEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f
 		SkipPoint: func(k []byte) bool {
 			return ok && bytes.Equal(val, k)
 		},
-		OnlyReadGuaranteedDurable: true,
 	})
 	if err != nil {
 		return err
@@ -243,9 +242,7 @@ func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string,
 	}
 
 	if len(val) == 0 {
-		it, err := r.NewIter(&pebble.IterOptions{
-			OnlyReadGuaranteedDurable: true,
-		})
+		it, err := r.NewIter(&pebble.IterOptions{})
 		if err != nil {
 			return err
 		}
@@ -259,8 +256,7 @@ func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string,
 	}
 
 	it, err := r.NewIter(&pebble.IterOptions{
-		OnlyReadGuaranteedDurable: true,
-		LowerBound:                prefix,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return err
@@ -57,8 +57,7 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A
 func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch uint64) (bool, error) {
 	prefix := []byte{expiredPrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return false, err
@@ -102,8 +101,7 @@ func selectExpiredObjects(ctx context.Context, r pebble.Reader, epoch uint64, ob
 
 	prefix := []byte{expiredPrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return nil, err
@@ -175,8 +173,7 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH
 func iterateExpired(ctx context.Context, r pebble.Reader, epoch uint64, h ExpiredObjectHandler) error {
 	prefix := []byte{expiredPrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return err
@@ -15,7 +15,7 @@ import (
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-	"go.etcd.io/bbolt"
+	"github.com/cockroachdb/pebble"
 )
 
 // InhumePrm encapsulates parameters for Inhume operation.
@@ -180,9 +180,19 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
 	res := InhumeRes{
 		inhumedByCnrID: make(map[cid.ID]ObjectCounters),
 	}
+
+	var containerIDs []cid.ID
+	if prm.tomb != nil {
+		containerIDs = append(containerIDs, prm.tomb.Container())
+	}
+	for _, a := range prm.target {
+		containerIDs = append(containerIDs, a.Container())
+	}
+	defer db.guard.LockContainerIDs(containerIDs)()
+
 	currEpoch := db.epochState.CurrentEpoch()
-	err := db.database.Update(func(tx *bbolt.Tx) error {
-		return db.inhumeTx(tx, currEpoch, prm, &res)
+	err := db.batch(func(b *pebble.Batch) error {
+		return db.inhumeTx(ctx, b, currEpoch, prm, &res)
 	})
 	success = err == nil
 	if success {
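
The hunk above swaps the bbolt read-write transaction for a pebble write batch, with per-container locking taken before the batch runs. Below is a hedged sketch of what a db.batch-style helper can look like; only its call shape is taken from the diff, the body is an assumption:

package example

import "github.com/cockroachdb/pebble"

// runInBatch collects all writes produced by f in one pebble.Batch and
// commits them atomically, the pebble counterpart of a bbolt Update call.
func runInBatch(db *pebble.DB, f func(*pebble.Batch) error) error {
	b := db.NewBatch()
	if err := f(b); err != nil {
		_ = b.Close() // discard the partially built batch
		return err
	}
	return b.Commit(pebble.Sync) // durable commit, matching pebble.Sync at the call sites
}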
@@ -195,48 +205,31 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) {
 	return res, metaerr.Wrap(err)
 }
 
-func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error {
-	garbageBKT := tx.Bucket(garbageBucketName)
-	graveyardBKT := tx.Bucket(graveyardBucketName)
-
-	bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
+func (db *DB) inhumeTx(ctx context.Context, b *pebble.Batch, epoch uint64, prm InhumePrm, res *InhumeRes) error {
+	keyer, value, err := getInhumeTargetBucketAndValue(b, prm)
 	if err != nil {
 		return err
 	}
 
-	buf := make([]byte, addressKeySize)
 	for i := range prm.target {
 		id := prm.target[i].Object()
 		cnr := prm.target[i].Container()
 
-		// prevent locked objects to be inhumed
-		if !prm.forceRemoval && objectLocked(tx, cnr, id) {
-			return new(apistatus.ObjectLocked)
-		}
-
-		var lockWasChecked bool
-
-		// prevent lock objects to be inhumed
-		// if `Inhume` was called not with the
-		// `WithForceGCMark` option
-		var ecErr *objectSDK.ECInfoError
 		if !prm.forceRemoval {
-			if isLockObject(tx, cnr, id) {
-				return ErrLockObjectRemoval
+			if err := checkNotLockerOrLocked(ctx, b, cnr, id); err != nil {
+				return err
 			}
-
-			lockWasChecked = true
 		}
 
-		obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
-		targetKey := addressKey(prm.target[i], buf)
+		var ecErr *objectSDK.ECInfoError
+		obj, err := get(ctx, b, prm.target[i], false, true, epoch)
 		if err == nil {
-			err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
+			err = db.updateDeleteInfo(b, prm.target[i], obj, res)
 			if err != nil {
 				return err
 			}
 		} else if errors.As(err, &ecErr) {
-			err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey)
+			err = db.inhumeECInfo(ctx, b, epoch, keyer, value, res, ecErr.ECInfo(), cnr)
 			if err != nil {
 				return err
 			}
@@ -244,18 +237,18 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 
 		if prm.tomb != nil {
 			var isTomb bool
-			isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
+			isTomb, err = markAsGC(b, prm.target[i])
 			if err != nil {
 				return err
 			}
 
 			if isTomb {
 				continue
 			}
 		}
 
-		// consider checking if target is already in graveyard?
-		err = bkt.Put(targetKey, value)
+		key := keyer(prm.target[i])
+		err = b.Set(key, value, pebble.Sync)
 		if err != nil {
 			return err
 		}
@@ -268,22 +261,24 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
 				// the LOCK type
 				continue
 			}
-
-			if isLockObject(tx, cnr, id) {
+			isLock, err := isLockObject(b, cnr, id)
+			if err != nil {
+				return err
+			}
+			if isLock {
 				res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
 			}
 		}
 	}
 
-	return db.applyInhumeResToCounters(tx, res)
+	return db.applyInhumeResToCounters(b, res)
 }
 
-func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
-	garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
-	ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte,
+func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64,
+	keyer func(addr oid.Address) []byte, value []byte,
+	res *InhumeRes, ecInfo *objectSDK.ECInfo, cnr cid.ID,
 ) error {
 	for _, chunk := range ecInfo.Chunks {
-		chunkBuf := make([]byte, addressKeySize)
 		var chunkAddr oid.Address
 		chunkAddr.SetContainer(cnr)
 		var chunkID oid.ID
@@ -292,22 +287,16 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
 			return err
 		}
 		chunkAddr.SetObject(chunkID)
-		chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch)
+		chunkObj, err := get(ctx, b, chunkAddr, false, true, epoch)
 		if err != nil {
 			return err
 		}
-		err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res)
+		err = db.updateDeleteInfo(b, chunkAddr, chunkObj, res)
 		if err != nil {
 			return err
 		}
-		chunkKey := addressKey(chunkAddr, chunkBuf)
-		if tomb != nil {
-			_, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey)
-			if err != nil {
-				return err
-			}
-		}
-		err = targetBucket.Put(chunkKey, value)
+		key := keyer(chunkAddr)
+		err = b.Set(key, value, pebble.Sync)
 		if err != nil {
 			return err
 		}
@@ -315,15 +304,38 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I
 	return nil
 }
 
-func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
-	if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil {
+func checkNotLockerOrLocked(ctx context.Context, r pebble.Reader, cnr cid.ID, id oid.ID) error {
+	// prevent locked objects to be inhumed
+	locked, err := objectLocked(ctx, r, cnr, id)
+	if err != nil {
 		return err
 	}
-	if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil {
+	if locked {
+		return new(apistatus.ObjectLocked)
+	}
+	// prevent lock objects to be inhumed
+	// if `Inhume` was called not with the
+	// `WithForceGCMark` option
+	isLock, err := isLockObject(r, cnr, id)
+	if err != nil {
 		return err
 	}
+	if isLock {
+		return ErrLockObjectRemoval
+	}
+	return nil
+}
 
-	return db.updateContainerCounter(tx, res.inhumedByCnrID, false)
+func (db *DB) applyInhumeResToCounters(b *pebble.Batch, res *InhumeRes) error {
+	counters := make(map[cid.ID]objectCounterValue, len(res.inhumedByCnrID))
+	for contID, inhumed := range res.inhumedByCnrID {
+		counters[contID] = objectCounterValue{
+			Logic: -1 * int64(inhumed.Logic),
+			Phy:   -1 * int64(inhumed.Phy),
+			User:  -1 * int64(inhumed.User),
+		}
+	}
+	return updateContainerCounter(b, counters)
 }
 
 // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket.
@@ -336,35 +348,36 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
 // 1. tombstone address if Inhume was called with
 // a Tombstone
 // 2. zeroValue if Inhume was called with a GC mark
-func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
+func getInhumeTargetBucketAndValue(b *pebble.Batch, prm InhumePrm) (key func(addr oid.Address) []byte, value []byte, err error) {
 	if prm.tomb != nil {
-		targetBucket = graveyardBKT
-		tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
 
-		// it is forbidden to have a tomb-on-tomb in FrostFS,
-		// so graveyard keys must not be addresses of tombstones
-		data := targetBucket.Get(tombKey)
-		if data != nil {
-			err := targetBucket.Delete(tombKey)
+		tombKey := graveyardKey(prm.tomb.Container(), prm.tomb.Object())
+		v, err := valueSafe(b, tombKey)
+		if err != nil {
+			return nil, nil, err
+		}
+		if v != nil {
+			err := b.Delete(tombKey, pebble.Sync)
 			if err != nil {
 				return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err)
 			}
 		}
 
-		value = tombKey
-	} else {
-		targetBucket = garbageBKT
-		value = zeroValue
+		return func(addr oid.Address) []byte {
+			return graveyardKey(addr.Container(), addr.Object())
+		}, encodeAddressToGrave(*prm.tomb), nil
 	}
-	return targetBucket, value, nil
+	return func(addr oid.Address) []byte {
+		return garbageKey(addr.Container(), addr.Object())
+	}, zeroValue, nil
 }
 
-func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) {
-	targetIsTomb, err := isTomb(graveyardBKT, key)
+func markAsGC(b *pebble.Batch, addr oid.Address) (bool, error) {
+	targetIsTomb, err := isTomb(b, addr)
 	if err != nil {
 		return false, err
 	}
 
 	// do not add grave if target is a tombstone
 	if targetIsTomb {
 		return true, nil
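
getInhumeTargetBucketAndValue no longer hands back a bbolt bucket; it returns a key-builder closure plus the value to store, so the same write loop serves both the graveyard (tombstone) and the garbage (GC mark) key spaces. A hedged usage sketch of that contract follows; the inhumeOne helper is illustrative only:

package example

import (
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/cockroachdb/pebble"
)

// inhumeOne writes a single inhume record: keyer decides whether the address
// lands in the graveyard or the garbage key space, value is whatever the
// caller received together with keyer.
func inhumeOne(b *pebble.Batch, keyer func(addr oid.Address) []byte, value []byte, addr oid.Address) error {
	return b.Set(keyer(addr), value, pebble.Sync)
}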
@@ -372,19 +385,23 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool
 
 	// if tombstone appears object must be
 	// additionally marked with GC
-	return false, garbageBKT.Put(key, zeroValue)
+	key := garbageKey(addr.Container(), addr.Object())
+	return false, b.Set(key, zeroValue, pebble.Sync)
 }
 
-func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error {
-	containerID, _ := obj.ContainerID()
-	if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 {
-		res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj))
+func (db *DB) updateDeleteInfo(b *pebble.Batch, addr oid.Address, obj *objectSDK.Object, res *InhumeRes) error {
+	st, err := inGraveyardWithKey(b, addr)
+	if err != nil {
+		return err
+	}
+	if st == 0 {
+		res.storeDeletionInfo(addr.Container(), obj.PayloadSize(), IsUserObject(obj))
 	}
 
 	// if object is stored, and it is regular object then update bucket
 	// with container size estimations
 	if obj.Type() == objectSDK.TypeRegular {
-		err := changeContainerSize(tx, cnr, obj.PayloadSize(), false)
+		err := changeContainerSize(b, addr.Container(), -1*int64(obj.PayloadSize()))
 		if err != nil {
 			return err
 		}
@@ -392,25 +409,30 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc
 	return nil
 }
 
-func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) {
+func isTomb(r pebble.Reader, addr oid.Address) (bool, error) {
 	targetIsTomb := false
+	expectedValue := make([]byte, cidSize+objectKeySize)
+	addr.Container().Encode(expectedValue)
+	addr.Object().Encode(expectedValue[cidSize:])
+	prefix := []byte{graveyardPrefix}
+	it, err := r.NewIter(&pebble.IterOptions{
+		LowerBound: prefix,
+	})
+	if err != nil {
+		return false, err
+	}
 
 	// iterate over graveyard and check if target address
 	// is the address of tombstone in graveyard.
-	err := graveyardBucket.ForEach(func(_, v []byte) error {
-		// check if graveyard has record with key corresponding
-		// to tombstone address (at least one)
-		targetIsTomb = bytes.Equal(v, key)
+	// check if graveyard has record with key corresponding
+	// to tombstone address (at least one)
+	for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
+		targetIsTomb = bytes.Equal(expectedValue, it.Value())
 
 		if targetIsTomb {
-			// break bucket iterator
-			return errBreakBucketForEach
+			return true, it.Close()
 		}
 
-		return nil
-	})
-	if err != nil && !errors.Is(err, errBreakBucketForEach) {
-		return false, err
 	}
-	return targetIsTomb, nil
+	return false, it.Close()
 }
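The rewritten isTomb compares each graveyard value against the raw encoding of the tombstone address: container ID bytes followed by object ID bytes. A hedged sketch of that encoding follows; the 32-byte widths stand in for the repository's cidSize and objectKeySize constants and are an assumption:

package example

import (
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// addressValue builds the fixed-width value isTomb looks for in the graveyard:
// the container ID followed by the object ID.
func addressValue(cnr cid.ID, obj oid.ID) []byte {
	buf := make([]byte, 32+32) // cidSize + objectKeySize, assumed to be 32 bytes each
	cnr.Encode(buf)
	obj.Encode(buf[32:])
	return buf
}
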
@@ -141,8 +141,7 @@ func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object
 func iteratePhyObjectsWithPrefix(r pebble.Reader, typePrefix byte, f func(cid.ID, oid.ID, *objectSDK.Object) error) error {
 	prefix := []byte{typePrefix}
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return err
@@ -45,8 +45,7 @@ func (db *DB) snapshot(f func(*pebble.Snapshot) error) error {
 
 func selectByPrefixBatch(ctx context.Context, r pebble.Reader, prefix []byte, batchSize int) ([][]byte, error) {
 	it, err := r.NewIter(&pebble.IterOptions{
-		LowerBound:                prefix,
-		OnlyReadGuaranteedDurable: true,
+		LowerBound: prefix,
 	})
 	if err != nil {
 		return nil, err