From 62bd22a379d4cd60a5a744bf7127a562326cc4ca Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 3 Nov 2020 17:24:23 +0300 Subject: [PATCH] [#142] metabase: Fix false-positive select in absence of filtered header Fix a bug in the selection when the object without some filtered header added to the final result. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/select.go | 34 ++++++++++--------- .../metabase/select_test.go | 32 +++++++++++++++++ 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index c6c05101..741cba8a 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -45,16 +45,17 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) { }) } - // keep processed addresses (false if address was added and excluded later) - mAddr := make(map[string]bool) + // keep processed addresses + // value equal to number (index+1) of latest matched filter + mAddr := make(map[string]int) - for _, f := range fs { - matchFunc, ok := db.matchers[f.Operation()] + for fNum := range fs { + matchFunc, ok := db.matchers[fs[fNum].Operation()] if !ok { - return errors.Errorf("no function for matcher %v", f.Operation()) + return errors.Errorf("no function for matcher %v", fs[fNum].Operation()) } - key := f.Header() + key := fs[fNum].Header() // get bucket with values keyBucket := indexBucket.Bucket([]byte(key)) @@ -63,7 +64,7 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) { return nil } - fVal := f.Value() + fVal := fs[fNum].Value() // iterate over all existing values for the key if err := keyBucket.ForEach(func(k, v []byte) error { @@ -74,13 +75,12 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) { return errors.Wrapf(err, "(%T) could not decode address list", db) } - for i := range strs { - if include { - if _, ok := mAddr[strs[i]]; !ok { - mAddr[strs[i]] = true - } - } else { - mAddr[strs[i]] = false + for j := range strs { + if num := mAddr[strs[j]]; num != fNum { + // than object does not match some previous filter + continue + } else if include { + mAddr[strs[j]] = fNum + 1 } } @@ -90,8 +90,10 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) { } } - for a, inc := range mAddr { - if !inc { + fLen := len(fs) + + for a, ind := range mAddr { + if ind != fLen { continue } diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index ef3e574e..6362c35d 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -152,3 +152,35 @@ func TestSelectRemoved(t *testing.T) { testSelect(t, db, fs, obj2.Address()) } + +func TestMissingObjectAttribute(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + // add object w/o attribute + obj1 := generateObject(t, testPrm{ + attrNum: 1, + }) + + // add object w/o attribute + obj2 := generateObject(t, testPrm{}) + + a1 := obj1.GetAttributes()[0] + + // add common attribute + aCommon := addCommonAttribute(obj1, obj2) + + // write to DB + require.NoError(t, db.Put(obj1)) + require.NoError(t, db.Put(obj2)) + + fs := objectSDK.SearchFilters{} + + // 1st filter by common attribute + fs.AddFilter(aCommon.GetKey(), aCommon.GetValue(), objectSDK.MatchStringEqual) + + // next filter by attribute from 1st object only + fs.AddFilter(a1.GetKey(), a1.GetValue(), objectSDK.MatchStringEqual) + + testSelect(t, db, fs, obj1.Address()) +}