[#1412] metabase: Add search by indexed attributes

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-02 13:04:29 +03:00
parent be744ae3e6
commit 1efa64ee72
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
3 changed files with 169 additions and 39 deletions

View file

@ -37,8 +37,9 @@ type (
// SelectPrm groups the parameters of Select operation.
type SelectPrm struct {
cnr cid.ID
filters objectSDK.SearchFilters
cnr cid.ID
filters objectSDK.SearchFilters
useAttributeIndex bool
}
// SelectRes groups the resulting values of Select operation.
@ -56,6 +57,10 @@ func (p *SelectPrm) SetFilters(fs objectSDK.SearchFilters) {
p.filters = fs
}
func (p *SelectPrm) SetUseAttributeIndex(v bool) {
p.useAttributeIndex = v
}
// AddressList returns list of addresses of the selected objects.
func (r SelectRes) AddressList() []oid.Address {
return r.addrList
@ -92,14 +97,14 @@ func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err err
currEpoch := db.epochState.CurrentEpoch()
return res, metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch)
res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch, prm.useAttributeIndex)
success = err == nil
return err
}))
}
func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters, currEpoch uint64) ([]oid.Address, error) {
group, err := groupFilters(fs)
func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters, currEpoch uint64, useAttributeIndex bool) ([]oid.Address, error) {
group, err := groupFilters(fs, useAttributeIndex)
if err != nil {
return nil, err
}
@ -218,7 +223,13 @@ func (db *DB) selectFastFilter(
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
default:
default: // user attribute
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
if f.Operation() == objectSDK.MatchNotPresent {
selectOutsideFKBT(tx, allBucketNames(cnr), bucketName, to, fNum)
} else {
db.selectFromFKBT(tx, bucketName, f, to, fNum)
}
}
}
@ -228,6 +239,15 @@ var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{
v2object.TypeLock.String(): {bucketNameLockers},
}
func allBucketNames(cnr cid.ID) (names [][]byte) {
for _, fns := range mBucketNaming {
for _, fn := range fns {
names = append(names, fn(cnr, make([]byte, bucketKeySize)))
}
}
return
}
func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (names [][]byte) {
appendNames := func(key string) {
fns, ok := mBucketNaming[key]
@ -259,6 +279,81 @@ func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal str
return
}
func (db *DB) selectFromFKBT(
tx *bbolt.Tx,
name []byte, // fkbt root bucket name
f objectSDK.SearchFilter, // filter for operation and value
to map[string]int, // resulting cache
fNum int, // index of filter
) { //
matchFunc, ok := db.matchers[f.Operation()]
if !ok {
db.log.Debug(logs.MetabaseMissingMatcher, zap.Stringer("operation", f.Operation()))
return
}
fkbtRoot := tx.Bucket(name)
if fkbtRoot == nil {
return
}
err := matchFunc.matchBucket(fkbtRoot, f.Header(), f.Value(), func(k, _ []byte) error {
fkbtLeaf := fkbtRoot.Bucket(k)
if fkbtLeaf == nil {
return nil
}
return fkbtLeaf.ForEach(func(k, _ []byte) error {
markAddressInCache(to, fNum, string(k))
return nil
})
})
if err != nil {
db.log.Debug(logs.MetabaseErrorInFKBTSelection, zap.String("error", err.Error()))
}
}
// selectOutsideFKBT looks into all incl buckets to find list of addresses outside <fkbt> to add in
// resulting cache.
func selectOutsideFKBT(
tx *bbolt.Tx,
incl [][]byte, // buckets
name []byte, // fkbt root bucket name
to map[string]int, // resulting cache
fNum int, // index of filter
) {
mExcl := make(map[string]struct{})
bktExcl := tx.Bucket(name)
if bktExcl != nil {
_ = bktExcl.ForEachBucket(func(k []byte) error {
exclBktLeaf := bktExcl.Bucket(k)
return exclBktLeaf.ForEach(func(k, _ []byte) error {
mExcl[string(k)] = struct{}{}
return nil
})
})
}
for i := range incl {
bktIncl := tx.Bucket(incl[i])
if bktIncl == nil {
continue
}
_ = bktIncl.ForEach(func(k, _ []byte) error {
if _, ok := mExcl[string(k)]; !ok {
markAddressInCache(to, fNum, string(k))
}
return nil
})
}
}
// selectFromList looks into <list> index to find list of addresses to add in
// resulting cache.
func (db *DB) selectFromList(
@ -486,7 +581,7 @@ func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
// groupFilters divides filters in two groups: fast and slow. Fast filters
// processed by indexes and slow filters processed after by unmarshaling
// object headers.
func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) {
func groupFilters(filters objectSDK.SearchFilters, useAttributeIndex bool) (filterGroup, error) {
res := filterGroup{
fastFilters: make(objectSDK.SearchFilters, 0, len(filters)),
slowFilters: make(objectSDK.SearchFilters, 0, len(filters)),
@ -511,7 +606,11 @@ func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) {
v2object.FilterPropertyPhy:
res.fastFilters = append(res.fastFilters, filters[i])
default:
res.slowFilters = append(res.slowFilters, filters[i])
if useAttributeIndex && IsAtrributeIndexed(filters[i].Header()) {
res.fastFilters = append(res.fastFilters, filters[i])
} else {
res.slowFilters = append(res.slowFilters, filters[i])
}
}
}