diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 1863fc25e..4e6ed6083 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -150,11 +150,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters continue // ignore removed objects } - if !db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch) { + if addr, match := db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch); !match { continue // ignore objects with unmatched slow filters + } else { + res = append(res, addr) } - - res = append(res, addr) } return res, nil @@ -382,15 +382,18 @@ func (db *DB) selectObjectID( } // matchSlowFilters return true if object header is matched by all slow filters. -func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) bool { - if len(f) == 0 { - return true - } - +func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) (oid.Address, bool) { + result := addr buf := make([]byte, addressKeySize) obj, err := db.get(tx, addr, buf, true, false, currEpoch) if err != nil { - return false + return result, false + } + if ech := obj.ECHeader(); ech != nil { + result.SetObject(ech.Parent()) + } + if len(f) == 0 { + return result, true } for i := range f { @@ -417,21 +420,21 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc if ok { data = []byte(v) } else { - return f[i].Operation() == objectSDK.MatchNotPresent + return result, f[i].Operation() == objectSDK.MatchNotPresent } } matchFunc, ok := db.matchers[f[i].Operation()] if !ok { - return false + return result, false } if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) { - return false + return result, false } } - return true + return result, true } func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) { diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index 0fab3a108..fca2b110e 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -70,6 +70,22 @@ func TestDB_SelectUserAttributes(t *testing.T) { err = putBig(db, raw6) require.NoError(t, err) + raw7 := testutil.GenerateObjectWithCID(cnr) + var attr objectSDK.Attribute + attr.SetKey("path") + attr.SetValue("test/3/4") + attrs := raw7.Attributes() + attrs = append(attrs, attr) + ech := objectSDK.NewECHeader(objectSDK.ECParentInfo{ + ID: oidtest.ID(), + Attributes: attrs, + }, 0, 3, []byte{}, 0) + raw7.SetECHeader(ech) + require.NoError(t, putBig(db, raw7)) + var raw7Parent oid.Address + raw7Parent.SetContainer(cnr) + raw7Parent.SetObject(ech.Parent()) + fs := objectSDK.SearchFilters{} fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual) testSelect(t, db, cnr, fs, @@ -100,6 +116,7 @@ func TestDB_SelectUserAttributes(t *testing.T) { object.AddressOf(raw4), object.AddressOf(raw5), object.AddressOf(raw6), + raw7Parent, ) fs = objectSDK.SearchFilters{} @@ -110,6 +127,7 @@ func TestDB_SelectUserAttributes(t *testing.T) { object.AddressOf(raw4), object.AddressOf(raw5), object.AddressOf(raw6), + raw7Parent, ) fs = objectSDK.SearchFilters{} @@ -120,6 +138,7 @@ func TestDB_SelectUserAttributes(t *testing.T) { object.AddressOf(raw4), object.AddressOf(raw5), object.AddressOf(raw6), + raw7Parent, ) fs = objectSDK.SearchFilters{} @@ -131,6 +150,7 @@ func TestDB_SelectUserAttributes(t *testing.T) { object.AddressOf(raw4), object.AddressOf(raw5), object.AddressOf(raw6), + raw7Parent, ) fs = objectSDK.SearchFilters{} @@ -139,6 +159,7 @@ func TestDB_SelectUserAttributes(t *testing.T) { object.AddressOf(raw4), object.AddressOf(raw5), object.AddressOf(raw6), + raw7Parent, ) fs = objectSDK.SearchFilters{} @@ -147,6 +168,12 @@ func TestDB_SelectUserAttributes(t *testing.T) { object.AddressOf(raw4), object.AddressOf(raw5), ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter("path", "test/3/4", objectSDK.MatchStringEqual) + testSelect(t, db, cnr, fs, + raw7Parent, + ) } func TestDB_SelectRootPhyParent(t *testing.T) {