metabase: Do not store root info for small objects #1007
2 changed files with 33 additions and 13 deletions
|
@ -248,19 +248,11 @@ func putUniqueIndexes(
|
||||||
}
|
}
|
||||||
|
|
||||||
// index root object
|
// index root object
|
||||||
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() && isParent {
|
||||||
var (
|
splitInfo, err := si.Marshal()
|
||||||
err error
|
|
||||||
splitInfo []byte
|
|
||||||
)
|
|
||||||
|
|
||||||
if isParent {
|
|
||||||
splitInfo, err = si.Marshal()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't marshal split info: %w", err)
|
return fmt.Errorf("can't marshal split info: %w", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
err = putUniqueIndexItem(tx, namedBucketItem{
|
err = putUniqueIndexItem(tx, namedBucketItem{
|
||||||
name: rootBucketName(cnr, bucketName),
|
name: rootBucketName(cnr, bucketName),
|
||||||
key: objKey,
|
key: objKey,
|
||||||
|
|
|
@ -181,6 +181,34 @@ func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// selectAllFromBucket goes through all keys in bucket and adds them in a
|
||||||
|
// resulting cache. Keys should be stringed object ids.
|
||||||
|
func selectRootObjects(tx *bbolt.Tx, cnr cid.ID, to map[string]int, fNum int) {
|
||||||
|
rootBkt := tx.Bucket(rootBucketName(cnr, make([]byte, bucketKeySize)))
|
||||||
|
if rootBkt != nil {
|
||||||
|
_ = rootBkt.ForEach(func(k, _ []byte) error {
|
||||||
|
markAddressInCache(to, fNum, string(k))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
primaryBkt := tx.Bucket(primaryBucketName(cnr, make([]byte, bucketKeySize)))
|
||||||
|
if primaryBkt == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = primaryBkt.ForEach(func(objectID, rawObject []byte) error {
|
||||||
|
obj := objectSDK.New()
|
||||||
|
if obj.Unmarshal(rawObject) != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !obj.HasParent() {
|
||||||
|
markAddressInCache(to, fNum, string(objectID))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// selectFastFilter makes fast optimized checks for well known buckets or
|
// selectFastFilter makes fast optimized checks for well known buckets or
|
||||||
// looking through user attribute buckets otherwise.
|
// looking through user attribute buckets otherwise.
|
||||||
func (db *DB) selectFastFilter(
|
func (db *DB) selectFastFilter(
|
||||||
|
@ -212,7 +240,7 @@ func (db *DB) selectFastFilter(
|
||||||
bucketName := splitBucketName(cnr, bucketName)
|
bucketName := splitBucketName(cnr, bucketName)
|
||||||
db.selectFromList(tx, bucketName, f, to, fNum)
|
db.selectFromList(tx, bucketName, f, to, fNum)
|
||||||
case v2object.FilterPropertyRoot:
|
case v2object.FilterPropertyRoot:
|
||||||
selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum)
|
selectRootObjects(tx, cnr, to, fNum)
|
||||||
case v2object.FilterPropertyPhy:
|
case v2object.FilterPropertyPhy:
|
||||||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
||||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
||||||
|
|
Loading…
Reference in a new issue