[#142] metabase: Fix false-positive select in absence of filtered header

Fix a bug in the selection when the object without some filtered header
added to the final result.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2020-11-03 17:24:23 +03:00 committed by Alex Vanin
parent 3c39c5a90c
commit 62bd22a379
2 changed files with 50 additions and 16 deletions

View file

@ -45,16 +45,17 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) {
}) })
} }
// keep processed addresses (false if address was added and excluded later) // keep processed addresses
mAddr := make(map[string]bool) // value equal to number (index+1) of latest matched filter
mAddr := make(map[string]int)
for _, f := range fs { for fNum := range fs {
matchFunc, ok := db.matchers[f.Operation()] matchFunc, ok := db.matchers[fs[fNum].Operation()]
if !ok { if !ok {
return errors.Errorf("no function for matcher %v", f.Operation()) return errors.Errorf("no function for matcher %v", fs[fNum].Operation())
} }
key := f.Header() key := fs[fNum].Header()
// get bucket with values // get bucket with values
keyBucket := indexBucket.Bucket([]byte(key)) keyBucket := indexBucket.Bucket([]byte(key))
@ -63,7 +64,7 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) {
return nil return nil
} }
fVal := f.Value() fVal := fs[fNum].Value()
// iterate over all existing values for the key // iterate over all existing values for the key
if err := keyBucket.ForEach(func(k, v []byte) error { if err := keyBucket.ForEach(func(k, v []byte) error {
@ -74,13 +75,12 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) {
return errors.Wrapf(err, "(%T) could not decode address list", db) return errors.Wrapf(err, "(%T) could not decode address list", db)
} }
for i := range strs { for j := range strs {
if include { if num := mAddr[strs[j]]; num != fNum {
if _, ok := mAddr[strs[i]]; !ok { // than object does not match some previous filter
mAddr[strs[i]] = true continue
} } else if include {
} else { mAddr[strs[j]] = fNum + 1
mAddr[strs[i]] = false
} }
} }
@ -90,8 +90,10 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) {
} }
} }
for a, inc := range mAddr { fLen := len(fs)
if !inc {
for a, ind := range mAddr {
if ind != fLen {
continue continue
} }

View file

@ -152,3 +152,35 @@ func TestSelectRemoved(t *testing.T) {
testSelect(t, db, fs, obj2.Address()) testSelect(t, db, fs, obj2.Address())
} }
func TestMissingObjectAttribute(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
// add object w/o attribute
obj1 := generateObject(t, testPrm{
attrNum: 1,
})
// add object w/o attribute
obj2 := generateObject(t, testPrm{})
a1 := obj1.GetAttributes()[0]
// add common attribute
aCommon := addCommonAttribute(obj1, obj2)
// write to DB
require.NoError(t, db.Put(obj1))
require.NoError(t, db.Put(obj2))
fs := objectSDK.SearchFilters{}
// 1st filter by common attribute
fs.AddFilter(aCommon.GetKey(), aCommon.GetValue(), objectSDK.MatchStringEqual)
// next filter by attribute from 1st object only
fs.AddFilter(a1.GetKey(), a1.GetValue(), objectSDK.MatchStringEqual)
testSelect(t, db, fs, obj1.Address())
}