metabase: Cleanup ListWithCursor()
a bit #1692
3 changed files with 20 additions and 19 deletions
|
@ -14,10 +14,7 @@ type bucketCache struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBucketCache() *bucketCache {
|
func newBucketCache() *bucketCache {
|
||||||
return &bucketCache{
|
return &bucketCache{}
|
||||||
expired: make(map[cid.ID]*bbolt.Bucket),
|
|
||||||
primary: make(map[cid.ID]*bbolt.Bucket),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLockedBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
|
func getLockedBucket(bc *bucketCache, tx *bbolt.Tx) *bbolt.Bucket {
|
||||||
|
@ -56,7 +53,7 @@ func getExpiredBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
bucketName = objectToExpirationEpochBucketName(cnr, bucketName)
|
bucketName = objectToExpirationEpochBucketName(cnr, bucketName)
|
||||||
return tx.Bucket(bucketName)
|
return tx.Bucket(bucketName)
|
||||||
}
|
}
|
||||||
return getMappedBucket(bc.expired, tx, objectToExpirationEpochBucketName, cnr)
|
return getMappedBucket(&bc.expired, tx, objectToExpirationEpochBucketName, cnr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
|
@ -65,17 +62,21 @@ func getPrimaryBucket(bc *bucketCache, tx *bbolt.Tx, cnr cid.ID) *bbolt.Bucket {
|
||||||
bucketName = primaryBucketName(cnr, bucketName)
|
bucketName = primaryBucketName(cnr, bucketName)
|
||||||
return tx.Bucket(bucketName)
|
return tx.Bucket(bucketName)
|
||||||
}
|
}
|
||||||
return getMappedBucket(bc.primary, tx, primaryBucketName, cnr)
|
return getMappedBucket(&bc.primary, tx, primaryBucketName, cnr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMappedBucket(m map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket {
|
func getMappedBucket(m *map[cid.ID]*bbolt.Bucket, tx *bbolt.Tx, nameFunc func(cid.ID, []byte) []byte, cnr cid.ID) *bbolt.Bucket {
|
||||||
value, ok := m[cnr]
|
value, ok := (*m)[cnr]
|
||||||
if ok {
|
if ok {
|
||||||
return value
|
return value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if *m == nil {
|
||||||
|
*m = make(map[cid.ID]*bbolt.Bucket, 1)
|
||||||
|
}
|
||||||
|
|
||||||
bucketName := make([]byte, bucketKeySize)
|
bucketName := make([]byte, bucketKeySize)
|
||||||
bucketName = nameFunc(cnr, bucketName)
|
bucketName = nameFunc(cnr, bucketName)
|
||||||
m[cnr] = getBucket(&value, tx, bucketName)
|
(*m)[cnr] = getBucket(&value, tx, bucketName)
|
||||||
return value
|
return value
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,8 +139,7 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int,
|
||||||
|
|
||||||
var containerID cid.ID
|
var containerID cid.ID
|
||||||
var offset []byte
|
var offset []byte
|
||||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
bc := newBucketCache()
|
||||||
garbageBkt := tx.Bucket(garbageBucketName)
|
|
||||||
|
|
||||||
rawAddr := make([]byte, cidSize, addressKeySize)
|
rawAddr := make([]byte, cidSize, addressKeySize)
|
||||||
|
|
||||||
|
@ -169,7 +168,7 @@ loop:
|
||||||
bkt := tx.Bucket(name)
|
bkt := tx.Bucket(name)
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
copy(rawAddr, cidRaw)
|
copy(rawAddr, cidRaw)
|
||||||
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
result, offset, cursor, err = selectNFromBucket(bc, bkt, objType, rawAddr, containerID,
|
||||||
result, count, cursor, threshold, currEpoch)
|
result, count, cursor, threshold, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -204,9 +203,10 @@ loop:
|
||||||
|
|
||||||
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
||||||
// object to start selecting from. Ignores inhumed objects.
|
// object to start selecting from. Ignores inhumed objects.
|
||||||
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
func selectNFromBucket(
|
||||||
|
bc *bucketCache,
|
||||||
|
bkt *bbolt.Bucket, // main bucket
|
||||||
objType objectSDK.Type, // type of the objects stored in the main bucket
|
objType objectSDK.Type, // type of the objects stored in the main bucket
|
||||||
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
|
||||||
cidRaw []byte, // container ID prefix, optimization
|
cidRaw []byte, // container ID prefix, optimization
|
||||||
cnt cid.ID, // container ID
|
cnt cid.ID, // container ID
|
||||||
to []objectcore.Info, // listing result
|
to []objectcore.Info, // listing result
|
||||||
|
@ -219,7 +219,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
cursor = new(Cursor)
|
cursor = new(Cursor)
|
||||||
}
|
}
|
||||||
|
|
||||||
count := len(to)
|
|
||||||
c := bkt.Cursor()
|
c := bkt.Cursor()
|
||||||
k, v := c.First()
|
k, v := c.First()
|
||||||
|
|
||||||
|
@ -231,7 +230,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
for ; k != nil; k, v = c.Next() {
|
for ; k != nil; k, v = c.Next() {
|
||||||
if count >= limit {
|
if len(to) >= limit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -241,6 +240,8 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
offset = k
|
offset = k
|
||||||
|
graveyardBkt := getGraveyardBucket(bc, bkt.Tx())
|
||||||
|
garbageBkt := getGarbageBucket(bc, bkt.Tx())
|
||||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -251,7 +252,7 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
}
|
}
|
||||||
|
|
||||||
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
|
expEpoch, hasExpEpoch := hasExpirationEpoch(&o)
|
||||||
if hasExpEpoch && expEpoch < currEpoch && !objectLocked(bkt.Tx(), cnt, obj) {
|
if hasExpEpoch && expEpoch < currEpoch && !objectLockedWithCache(bc, bkt.Tx(), cnt, obj) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,7 +274,6 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
||||||
a.SetContainer(cnt)
|
a.SetContainer(cnt)
|
||||||
a.SetObject(obj)
|
a.SetObject(obj)
|
||||||
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
|
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
|
||||||
count++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return to, offset, cursor, nil
|
return to, offset, cursor, nil
|
||||||
|
|
|
@ -59,7 +59,7 @@ func benchmarkListWithCursor(b *testing.B, db *meta.DB, batchSize int) {
|
||||||
for range b.N {
|
for range b.N {
|
||||||
res, err := db.ListWithCursor(context.Background(), prm)
|
res, err := db.ListWithCursor(context.Background(), prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, meta.ErrEndOfListing) {
|
if !errors.Is(err, meta.ErrEndOfListing) {
|
||||||
b.Fatalf("error: %v", err)
|
b.Fatalf("error: %v", err)
|
||||||
}
|
}
|
||||||
prm.SetCursor(nil)
|
prm.SetCursor(nil)
|
||||||
|
|
Loading…
Add table
Reference in a new issue