From 691af2b2eb386e7c5f04061a537615e4780a6225 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 2 Jul 2024 11:03:04 +0300 Subject: [PATCH] [#9999] metabase: Fix db engine to pebble in inhume.go Signed-off-by: Dmitrii Stepanov --- .../metabase/containers.go | 7 +- pkg/local_object_storage/metabase/counter.go | 3 +- pkg/local_object_storage/metabase/db.go | 8 +- pkg/local_object_storage/metabase/expired.go | 9 +- pkg/local_object_storage/metabase/inhume.go | 192 ++++++++++-------- .../metabase/iterators.go | 3 +- pkg/local_object_storage/metabase/pebble.go | 3 +- 7 files changed, 117 insertions(+), 108 deletions(-) diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index 198978892..32b6bc91a 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -53,9 +53,7 @@ func containers(ctx context.Context, r pebble.Reader) ([]cid.ID, error) { unique := make(map[string]struct{}) var cnr cid.ID - it, err := r.NewIter(&pebble.IterOptions{ - OnlyReadGuaranteedDurable: true, - }) + it, err := r.NewIter(&pebble.IterOptions{}) if err != nil { return nil, err } @@ -188,8 +186,7 @@ func (db *DB) containerSizesInternal(ctx context.Context) (map[cid.ID]uint64, er result := make(map[cid.ID]int64) err := db.snapshot(func(s *pebble.Snapshot) error { it, err := s.NewIter(&pebble.IterOptions{ - LowerBound: prefix, - OnlyReadGuaranteedDurable: true, + LowerBound: prefix, }) if err != nil { return err diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 9e3423c3e..d3efe8eb5 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -400,8 +400,7 @@ func containerObjectCounterInitialized(ctx context.Context, r pebble.Reader) boo func containerObjectCounters(ctx context.Context, r pebble.Reader) (map[cid.ID]ObjectCounters, error) { prefix := []byte{containerCountersPrefix} it, err := r.NewIter(&pebble.IterOptions{ - LowerBound: prefix, - OnlyReadGuaranteedDurable: true, + LowerBound: prefix, }) if err != nil { return nil, err diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 60a59540d..4ed2ba1e8 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -212,7 +212,6 @@ func stringNotEqualMatcherBucket(r pebble.Reader, fKey string, fValue string, f SkipPoint: func(k []byte) bool { return ok && bytes.Equal(val, k) }, - OnlyReadGuaranteedDurable: true, }) if err != nil { return err @@ -243,9 +242,7 @@ func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string, } if len(val) == 0 { - it, err := r.NewIter(&pebble.IterOptions{ - OnlyReadGuaranteedDurable: true, - }) + it, err := r.NewIter(&pebble.IterOptions{}) if err != nil { return err } @@ -259,8 +256,7 @@ func stringCommonPrefixMatcherBucket(r pebble.Reader, fKey string, fVal string, } it, err := r.NewIter(&pebble.IterOptions{ - OnlyReadGuaranteedDurable: true, - LowerBound: prefix, + LowerBound: prefix, }) if err != nil { return err diff --git a/pkg/local_object_storage/metabase/expired.go b/pkg/local_object_storage/metabase/expired.go index 07b2e84e1..8010ea957 100644 --- a/pkg/local_object_storage/metabase/expired.go +++ b/pkg/local_object_storage/metabase/expired.go @@ -57,8 +57,7 @@ func (db *DB) FilterExpired(ctx context.Context, epoch uint64, addresses []oid.A func isExpired(ctx context.Context, r pebble.Reader, addr oid.Address, currEpoch uint64) (bool, error) { prefix := []byte{expiredPrefix} it, err := r.NewIter(&pebble.IterOptions{ - LowerBound: prefix, - OnlyReadGuaranteedDurable: true, + LowerBound: prefix, }) if err != nil { return false, err @@ -102,8 +101,7 @@ func selectExpiredObjects(ctx context.Context, r pebble.Reader, epoch uint64, ob prefix := []byte{expiredPrefix} it, err := r.NewIter(&pebble.IterOptions{ - LowerBound: prefix, - OnlyReadGuaranteedDurable: true, + LowerBound: prefix, }) if err != nil { return nil, err @@ -175,8 +173,7 @@ func (db *DB) IterateExpired(ctx context.Context, epoch uint64, h ExpiredObjectH func iterateExpired(ctx context.Context, r pebble.Reader, epoch uint64, h ExpiredObjectHandler) error { prefix := []byte{expiredPrefix} it, err := r.NewIter(&pebble.IterOptions{ - LowerBound: prefix, - OnlyReadGuaranteedDurable: true, + LowerBound: prefix, }) if err != nil { return err diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index b1574f6f2..01b0e59f1 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -15,7 +15,7 @@ import ( cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" ) // InhumePrm encapsulates parameters for Inhume operation. @@ -180,9 +180,19 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { res := InhumeRes{ inhumedByCnrID: make(map[cid.ID]ObjectCounters), } + + var containerIDs []cid.ID + if prm.tomb != nil { + containerIDs = append(containerIDs, prm.tomb.Container()) + } + for _, a := range prm.target { + containerIDs = append(containerIDs, a.Container()) + } + defer db.guard.LockContainerIDs(containerIDs)() + currEpoch := db.epochState.CurrentEpoch() - err := db.database.Update(func(tx *bbolt.Tx) error { - return db.inhumeTx(tx, currEpoch, prm, &res) + err := db.batch(func(b *pebble.Batch) error { + return db.inhumeTx(ctx, b, currEpoch, prm, &res) }) success = err == nil if success { @@ -195,48 +205,31 @@ func (db *DB) Inhume(ctx context.Context, prm InhumePrm) (InhumeRes, error) { return res, metaerr.Wrap(err) } -func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes) error { - garbageBKT := tx.Bucket(garbageBucketName) - graveyardBKT := tx.Bucket(graveyardBucketName) - - bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm) +func (db *DB) inhumeTx(ctx context.Context, b *pebble.Batch, epoch uint64, prm InhumePrm, res *InhumeRes) error { + keyer, value, err := getInhumeTargetBucketAndValue(b, prm) if err != nil { return err } - - buf := make([]byte, addressKeySize) for i := range prm.target { id := prm.target[i].Object() cnr := prm.target[i].Container() - - // prevent locked objects to be inhumed - if !prm.forceRemoval && objectLocked(tx, cnr, id) { - return new(apistatus.ObjectLocked) - } - var lockWasChecked bool - - // prevent lock objects to be inhumed - // if `Inhume` was called not with the - // `WithForceGCMark` option + var ecErr *objectSDK.ECInfoError if !prm.forceRemoval { - if isLockObject(tx, cnr, id) { - return ErrLockObjectRemoval + if err := checkNotLockerOrLocked(ctx, b, cnr, id); err != nil { + return err } - lockWasChecked = true } - obj, err := db.get(tx, prm.target[i], buf, false, true, epoch) - targetKey := addressKey(prm.target[i], buf) - var ecErr *objectSDK.ECInfoError + obj, err := get(ctx, b, prm.target[i], false, true, epoch) if err == nil { - err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res) + err = db.updateDeleteInfo(b, prm.target[i], obj, res) if err != nil { return err } } else if errors.As(err, &ecErr) { - err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value, targetKey) + err = db.inhumeECInfo(ctx, b, epoch, keyer, value, res, ecErr.ECInfo(), cnr) if err != nil { return err } @@ -244,18 +237,18 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes if prm.tomb != nil { var isTomb bool - isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey) + isTomb, err = markAsGC(b, prm.target[i]) if err != nil { return err } - if isTomb { continue } } // consider checking if target is already in graveyard? - err = bkt.Put(targetKey, value) + key := keyer(prm.target[i]) + err = b.Set(key, value, pebble.Sync) if err != nil { return err } @@ -268,22 +261,24 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes // the LOCK type continue } - - if isLockObject(tx, cnr, id) { + isLock, err := isLockObject(b, cnr, id) + if err != nil { + return err + } + if isLock { res.deletedLockObj = append(res.deletedLockObj, prm.target[i]) } } } - return db.applyInhumeResToCounters(tx, res) + return db.applyInhumeResToCounters(b, res) } -func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes, - garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket, - ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte, targetKey []byte, +func (db *DB) inhumeECInfo(ctx context.Context, b *pebble.Batch, epoch uint64, + keyer func(addr oid.Address) []byte, value []byte, + res *InhumeRes, ecInfo *objectSDK.ECInfo, cnr cid.ID, ) error { for _, chunk := range ecInfo.Chunks { - chunkBuf := make([]byte, addressKeySize) var chunkAddr oid.Address chunkAddr.SetContainer(cnr) var chunkID oid.ID @@ -292,22 +287,16 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I return err } chunkAddr.SetObject(chunkID) - chunkObj, err := db.get(tx, chunkAddr, chunkBuf, false, true, epoch) + chunkObj, err := get(ctx, b, chunkAddr, false, true, epoch) if err != nil { return err } - err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, chunkObj, res) + err = db.updateDeleteInfo(b, chunkAddr, chunkObj, res) if err != nil { return err } - chunkKey := addressKey(chunkAddr, chunkBuf) - if tomb != nil { - _, err = db.markAsGC(graveyardBKT, garbageBKT, chunkKey) - if err != nil { - return err - } - } - err = targetBucket.Put(chunkKey, value) + key := keyer(chunkAddr) + err = b.Set(key, value, pebble.Sync) if err != nil { return err } @@ -315,15 +304,38 @@ func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *I return nil } -func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { - if err := db.updateShardObjectCounter(tx, logical, res.LogicInhumed(), false); err != nil { +func checkNotLockerOrLocked(ctx context.Context, r pebble.Reader, cnr cid.ID, id oid.ID) error { + // prevent locked objects to be inhumed + locked, err := objectLocked(ctx, r, cnr, id) + if err != nil { return err } - if err := db.updateShardObjectCounter(tx, user, res.UserInhumed(), false); err != nil { + if locked { + return new(apistatus.ObjectLocked) + } + // prevent lock objects to be inhumed + // if `Inhume` was called not with the + // `WithForceGCMark` option + isLock, err := isLockObject(r, cnr, id) + if err != nil { return err } + if isLock { + return ErrLockObjectRemoval + } + return nil +} - return db.updateContainerCounter(tx, res.inhumedByCnrID, false) +func (db *DB) applyInhumeResToCounters(b *pebble.Batch, res *InhumeRes) error { + counters := make(map[cid.ID]objectCounterValue, len(res.inhumedByCnrID)) + for contID, inhumed := range res.inhumedByCnrID { + counters[contID] = objectCounterValue{ + Logic: -1 * int64(inhumed.Logic), + Phy: -1 * int64(inhumed.Phy), + User: -1 * int64(inhumed.User), + } + } + return updateContainerCounter(b, counters) } // getInhumeTargetBucketAndValue return target bucket to store inhume result and value that will be put in the bucket. @@ -336,35 +348,36 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error { // 1. tombstone address if Inhume was called with // a Tombstone // 2. zeroValue if Inhume was called with a GC mark -func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) { +func getInhumeTargetBucketAndValue(b *pebble.Batch, prm InhumePrm) (key func(addr oid.Address) []byte, value []byte, err error) { if prm.tomb != nil { - targetBucket = graveyardBKT - tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize)) - // it is forbidden to have a tomb-on-tomb in FrostFS, // so graveyard keys must not be addresses of tombstones - data := targetBucket.Get(tombKey) - if data != nil { - err := targetBucket.Delete(tombKey) + tombKey := graveyardKey(prm.tomb.Container(), prm.tomb.Object()) + v, err := valueSafe(b, tombKey) + if err != nil { + return nil, nil, err + } + if v != nil { + err := b.Delete(tombKey, pebble.Sync) if err != nil { return nil, nil, fmt.Errorf("could not remove grave with tombstone key: %w", err) } } - value = tombKey - } else { - targetBucket = garbageBKT - value = zeroValue + return func(addr oid.Address) []byte { + return graveyardKey(addr.Container(), addr.Object()) + }, encodeAddressToGrave(*prm.tomb), nil } - return targetBucket, value, nil + return func(addr oid.Address) []byte { + return garbageKey(addr.Container(), addr.Object()) + }, zeroValue, nil } -func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool, error) { - targetIsTomb, err := isTomb(graveyardBKT, key) +func markAsGC(b *pebble.Batch, addr oid.Address) (bool, error) { + targetIsTomb, err := isTomb(b, addr) if err != nil { return false, err } - // do not add grave if target is a tombstone if targetIsTomb { return true, nil @@ -372,19 +385,23 @@ func (db *DB) markAsGC(graveyardBKT, garbageBKT *bbolt.Bucket, key []byte) (bool // if tombstone appears object must be // additionally marked with GC - return false, garbageBKT.Put(key, zeroValue) + key := garbageKey(addr.Container(), addr.Object()) + return false, b.Set(key, zeroValue, pebble.Sync) } -func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Bucket, targetKey []byte, cnr cid.ID, obj *objectSDK.Object, res *InhumeRes) error { - containerID, _ := obj.ContainerID() - if inGraveyardWithKey(targetKey, graveyardBKT, garbageBKT) == 0 { - res.storeDeletionInfo(containerID, obj.PayloadSize(), IsUserObject(obj)) +func (db *DB) updateDeleteInfo(b *pebble.Batch, addr oid.Address, obj *objectSDK.Object, res *InhumeRes) error { + st, err := inGraveyardWithKey(b, addr) + if err != nil { + return err + } + if st == 0 { + res.storeDeletionInfo(addr.Container(), obj.PayloadSize(), IsUserObject(obj)) } // if object is stored, and it is regular object then update bucket // with container size estimations if obj.Type() == objectSDK.TypeRegular { - err := changeContainerSize(tx, cnr, obj.PayloadSize(), false) + err := changeContainerSize(b, addr.Container(), -1*int64(obj.PayloadSize())) if err != nil { return err } @@ -392,25 +409,30 @@ func (db *DB) updateDeleteInfo(tx *bbolt.Tx, garbageBKT, graveyardBKT *bbolt.Buc return nil } -func isTomb(graveyardBucket *bbolt.Bucket, key []byte) (bool, error) { +func isTomb(r pebble.Reader, addr oid.Address) (bool, error) { targetIsTomb := false + expectedValue := make([]byte, cidSize+objectKeySize) + addr.Container().Encode(expectedValue) + addr.Object().Encode(expectedValue[cidSize:]) + prefix := []byte{graveyardPrefix} + it, err := r.NewIter(&pebble.IterOptions{ + LowerBound: prefix, + }) + if err != nil { + return false, err + } // iterate over graveyard and check if target address // is the address of tombstone in graveyard. - err := graveyardBucket.ForEach(func(_, v []byte) error { - // check if graveyard has record with key corresponding - // to tombstone address (at least one) - targetIsTomb = bytes.Equal(v, key) + // check if graveyard has record with key corresponding + // to tombstone address (at least one) + for v := it.First(); v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() { + targetIsTomb = bytes.Equal(expectedValue, it.Value()) if targetIsTomb { // break bucket iterator - return errBreakBucketForEach + return true, it.Close() } - - return nil - }) - if err != nil && !errors.Is(err, errBreakBucketForEach) { - return false, err } - return targetIsTomb, nil + return false, it.Close() } diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go index 8386672d3..85e02346a 100644 --- a/pkg/local_object_storage/metabase/iterators.go +++ b/pkg/local_object_storage/metabase/iterators.go @@ -141,8 +141,7 @@ func iteratePhyObjects(r pebble.Reader, f func(cid.ID, oid.ID, *objectSDK.Object func iteratePhyObjectsWithPrefix(r pebble.Reader, typePrefix byte, f func(cid.ID, oid.ID, *objectSDK.Object) error) error { prefix := []byte{typePrefix} it, err := r.NewIter(&pebble.IterOptions{ - LowerBound: prefix, - OnlyReadGuaranteedDurable: true, + LowerBound: prefix, }) if err != nil { return err diff --git a/pkg/local_object_storage/metabase/pebble.go b/pkg/local_object_storage/metabase/pebble.go index 3f94289c1..b5ad3f10c 100644 --- a/pkg/local_object_storage/metabase/pebble.go +++ b/pkg/local_object_storage/metabase/pebble.go @@ -45,8 +45,7 @@ func (db *DB) snapshot(f func(*pebble.Snapshot) error) error { func selectByPrefixBatch(ctx context.Context, r pebble.Reader, prefix []byte, batchSize int) ([][]byte, error) { it, err := r.NewIter(&pebble.IterOptions{ - LowerBound: prefix, - OnlyReadGuaranteedDurable: true, + LowerBound: prefix, }) if err != nil { return nil, err