metabase: Cleanup ListWithCursor() a bit #1692

Closed
fyrchik wants to merge 4 commits from fyrchik/frostfs-node:meta-list-with-cursor into master
3 changed files with 20 additions and 19 deletions

View file

@ -14,10 +14,7 @@ type bucketCache struct {
} }
func newBucketCache() *bucketCache { func newBucketCache() *bucketCache {
return &bucketCache{ return &bucketCache{}
expired: make(map[cid.ID]*bbolt.Bucket),
primary: make(map[cid.ID]*bbolt.Bucket),
}
} }
func getLockedBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket { func getLockedBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
@ -56,7 +53,7 @@ func getExpiredBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
bucketName = objectToExpirationEpochBucketName(cnr, bucketName) bucketName = objectToExpirationEpochBucketName(cnr, bucketName)
return tx.Bucket(bucketName) return tx.Bucket(bucketName)
} }
return getMappedBucket(bc.expired, tx, objectToExpirationEpochBucketName, cnr) return getMappedBucket(&bc.expired, tx, objectToExpirationEpochBucketName, cnr)
} }
func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket { func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
@ -65,17 +62,21 @@ func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
bucketName = primaryBucketName(cnr, bucketName) bucketName = primaryBucketName(cnr, bucketName)
return tx.Bucket(bucketName) return tx.Bucket(bucketName)
} }
return getMappedBucket(bc.primary, tx, primaryBucketName, cnr) return getMappedBucket(&bc.primary, tx, primaryBucketName, cnr)
} }
func getMappedBucket(m map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket { func getMappedBucket(m *map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket {
value, ok := m[cnr] value, ok := (*m)[cnr]
if ok { if ok {
return value return value
} }
if *m == nil {
*m = make(map[cid.ID]*bbolt.Bucket, 1)
}
bucketName := make([]byte, bucketKeySize) bucketName := make([]byte, bucketKeySize)
bucketName = nameFunc(cnr, bucketName) bucketName = nameFunc(cnr, bucketName)
m[cnr] = getBucket(&value, tx, bucketName) (*m)[cnr] = getBucket(&value, tx, bucketName)
return value return value
} }

View file

@ -139,8 +139,7 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
var containerID cid.ID var containerID cid.ID
var offset []byte var offset []byte
graveyardBkt := tx.Bucket(graveyardBucketName) bc := newBucketCache()
garbageBkt := tx.Bucket(garbageBucketName)
rawAddr := make([]byte, cidSize, addressKeySize) rawAddr := make([]byte, cidSize, addressKeySize)
@ -169,7 +168,7 @@ loop:
bkt := tx.Bucket(name) bkt := tx.Bucket(name)
if bkt != nil { if bkt != nil {
copy(rawAddr, cidRaw) copy(rawAddr, cidRaw)
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID, result, offset, cursor, err = selectNFromBucket(bc, bkt, objType, rawAddr, containerID,
result, count, cursor, threshold, currEpoch) result, count, cursor, threshold, currEpoch)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -204,9 +203,10 @@ loop:
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find // selectNFromBucket similar to selectAllFromBucket but uses cursor to find
// object to start selecting from. Ignores inhumed objects. // object to start selecting from. Ignores inhumed objects.
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket func selectNFromBucket(
bc *bucketCache,
bkt *bbolt.Bucket, // main bucket
objType objectSDK.Type, // type of the objects stored in the main bucket objType objectSDK.Type, // type of the objects stored in the main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization cidRaw []byte, // container ID prefix, optimization
cnt cid.ID, // container ID cnt cid.ID, // container ID
to []objectcore.Info, // listing result to []objectcore.Info, // listing result
@ -219,7 +219,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
cursor = new(Cursor) cursor = new(Cursor)
} }
count := len(to)
c := bkt.Cursor() c := bkt.Cursor()
k, v := c.First() k, v := c.First()
@ -231,7 +230,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
} }
for ; k != nil; k, v = c.Next() { for ; k != nil; k, v = c.Next() {
if count >= limit { if len(to) >= limit {
break break
} }
@ -241,6 +240,8 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
} }
offset = k offset = k
graveyardBkt := getGraveyardBucket(bc, bkt.Tx())
garbageBkt := getGarbageBucket(bc, bkt.Tx())
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue continue
} }
@ -251,7 +252,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
} }
expEpoch, hasExpEpoch := hasExpirationEpoch(&o) expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
if hasExpEpoch && expEpoch < currEpoch && !objectLocked(bkt.Tx(), cnt, obj) { if hasExpEpoch && expEpoch < currEpoch && !objectLockedWithCache(bc, bkt.Tx(), cnt, obj) {
continue continue
} }
@ -273,7 +274,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
a.SetContainer(cnt) a.SetContainer(cnt)
a.SetObject(obj) a.SetObject(obj)
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}) to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
count++
} }
return to, offset, cursor, nil return to, offset, cursor, nil

View file

@ -59,7 +59,7 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) {
for range b.N { for range b.N {
res, err := db.ListWithCursor(context.Background(), prm) res, err := db.ListWithCursor(context.Background(), prm)
if err != nil { if err != nil {
if errors.Is(err, meta.ErrEndOfListing) { if !errors.Is(err, meta.ErrEndOfListing) {
b.Fatalf("error: %v", err) b.Fatalf("error: %v", err)
} }
prm.SetCursor(nil) prm.SetCursor(nil)