metabase: Do not store root info for small objects #1007
2 changed files with 33 additions and 13 deletions
@@ -248,19 +248,11 @@ func putUniqueIndexes(
     }
 
     // index root object
-    if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
-        var (
-            err       error
-            splitInfo []byte
-        )
-
-        if isParent {
-            splitInfo, err = si.Marshal()
+    if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() && isParent {
+        splitInfo, err := si.Marshal()
         if err != nil {
             return fmt.Errorf("can't marshal split info: %w", err)
         }
-    }
-
         err = putUniqueIndexItem(tx, namedBucketItem{
             name: rootBucketName(cnr, bucketName),
             key:  objKey,
@@ -181,6 +181,34 @@ func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int)
     })
 }
 
+// selectRootObjects goes through all root objects (root bucket keys and
+// parent-less objects in the primary bucket) and adds them to a resulting cache.
+func selectRootObjects(tx *bbolt.Tx, cnr cid.ID, to map[string]int, fNum int) {
+    rootBkt := tx.Bucket(rootBucketName(cnr, make([]byte, bucketKeySize)))
+    if rootBkt != nil {
+        _ = rootBkt.ForEach(func(k, _ []byte) error {
+            markAddressInCache(to, fNum, string(k))
+            return nil
+        })
+    }
+
+    primaryBkt := tx.Bucket(primaryBucketName(cnr, make([]byte, bucketKeySize)))
+    if primaryBkt == nil {
+        return
+    }
+
+    _ = primaryBkt.ForEach(func(objectID, rawObject []byte) error {
+        obj := objectSDK.New()
+        if obj.Unmarshal(rawObject) != nil {
+            return nil
+        }
+        if !obj.HasParent() {
+            markAddressInCache(to, fNum, string(objectID))
+        }
+        return nil
+    })
+}
+
 // selectFastFilter makes fast optimized checks for well known buckets or
 // looking through user attribute buckets otherwise.
 func (db *DB) selectFastFilter(
@@ -212,7 +240,7 @@ func (db *DB) selectFastFilter(
         bucketName := splitBucketName(cnr, bucketName)
         db.selectFromList(tx, bucketName, f, to, fNum)
     case v2object.FilterPropertyRoot:
-        selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum)
+        selectRootObjects(tx, cnr, to, fNum)
     case v2object.FilterPropertyPhy:
         selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
         selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
Unmarshaling each object can be very costly, is a less expensive check currently possible? (ideally, checking whether some key is present).
In theory, we could check field presence from the byte-slice directly, though.
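A rough illustration of that idea, not part of this PR: the standard protowire package can scan a raw protobuf message for a top-level field tag without allocating or fully decoding the object. The helper names below are invented, and the field numbers (3 for the object header, 11 for the split header inside it) are assumptions about the NeoFS object encoding that would have to be verified against the API definitions.

    package sketch

    import "google.golang.org/protobuf/encoding/protowire"

    // fieldBytes returns the value of the first length-delimited top-level field
    // with the given number, or nil if it is absent or the message is malformed.
    func fieldBytes(msg []byte, target protowire.Number) []byte {
        for len(msg) > 0 {
            num, typ, n := protowire.ConsumeTag(msg)
            if n < 0 {
                return nil
            }
            msg = msg[n:]
            if num == target && typ == protowire.BytesType {
                v, n := protowire.ConsumeBytes(msg)
                if n < 0 {
                    return nil
                }
                return v
            }
            // skip the value of any other field
            n = protowire.ConsumeFieldValue(num, typ, msg)
            if n < 0 {
                return nil
            }
            msg = msg[n:]
        }
        return nil
    }

    // hasSplitHeader reports whether a raw object carries a split header, which is
    // roughly what obj.HasParent() checks after a full Unmarshal. Field numbers are
    // assumptions: 3 = Object.header, 11 = Header.split.
    func hasSplitHeader(rawObject []byte) bool {
        hdr := fieldBytes(rawObject, 3) // assumed header field number
        if hdr == nil {
            return false
        }
        return fieldBytes(hdr, 11) != nil // assumed split field number
    }

Such a check walks each stored value once, but it skips the per-object allocations and full decoding that objectSDK.New()/Unmarshal implies.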
Splitting the primary bucket into 2 is also an option.
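One way to read that suggestion, sketched with invented names: store parent-less objects and child objects under different primary-bucket prefixes at put time, so the Root filter can iterate a single bucket and never needs to unmarshal stored objects. The prefixes and helpers below are hypothetical and not part of the metabase.

    package sketch

    // Hypothetical prefixes for the two halves of today's primary bucket;
    // the real metabase defines its own bucket name prefixes.
    const (
        prefixPrimaryRoot  byte = 0xF0 // regular objects without a parent
        prefixPrimaryChild byte = 0xF1 // regular objects that have a parent
    )

    // primaryBucketNameFor picks a bucket name at put time. cnr stands in for
    // the binary container ID and hasParent for obj.HasParent().
    func primaryBucketNameFor(cnr []byte, hasParent bool) []byte {
        prefix := prefixPrimaryRoot
        if hasParent {
            prefix = prefixPrimaryChild
        }
        name := make([]byte, 0, 1+len(cnr))
        name = append(name, prefix)
        return append(name, cnr...)
    }

With such a layout, the Root property would combine the root bucket with the parent-less primary bucket, while the Phy property would iterate both primary buckets.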
I haven't found one.
As we have a general way to drop all the indexes, I think it is OK to have such a search implementation.