[#1347] metabase: Fix EC search
Some checks failed
DCO action / DCO (pull_request) Successful in 2m14s
Tests and linters / Run gofumpt (pull_request) Successful in 2m42s
Vulncheck / Vulncheck (pull_request) Successful in 2m49s
Tests and linters / Lint (pull_request) Failing after 2m57s
Build / Build Components (pull_request) Successful in 4m14s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m15s
Tests and linters / gopls check (pull_request) Successful in 4m39s
Tests and linters / Tests (pull_request) Failing after 5m3s
Tests and linters / Staticcheck (pull_request) Successful in 5m0s
Tests and linters / Tests with -race (pull_request) Failing after 5m7s

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-09-12 09:53:21 +03:00
parent ec8da40567
commit 77128f1e6b
2 changed files with 43 additions and 13 deletions

View file

@ -150,11 +150,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters
continue // ignore removed objects continue // ignore removed objects
} }
if !db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch) { if addr, match := db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch); !match {
continue // ignore objects with unmatched slow filters continue // ignore objects with unmatched slow filters
} else {
res = append(res, addr)
} }
res = append(res, addr)
} }
return res, nil return res, nil
@ -382,15 +382,18 @@ func (db *DB) selectObjectID(
} }
// matchSlowFilters return true if object header is matched by all slow filters. // matchSlowFilters return true if object header is matched by all slow filters.
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) bool { func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) (oid.Address, bool) {
if len(f) == 0 { result := addr
return true
}
buf := make([]byte, addressKeySize) buf := make([]byte, addressKeySize)
obj, err := db.get(tx, addr, buf, true, false, currEpoch) obj, err := db.get(tx, addr, buf, true, false, currEpoch)
if err != nil { if err != nil {
return false return result, false
}
if ech := obj.ECHeader(); ech != nil {
result.SetObject(ech.Parent())
}
if len(f) == 0 {
return result, true
} }
for i := range f { for i := range f {
@ -417,21 +420,21 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
if ok { if ok {
data = []byte(v) data = []byte(v)
} else { } else {
return f[i].Operation() == objectSDK.MatchNotPresent return result, f[i].Operation() == objectSDK.MatchNotPresent
} }
} }
matchFunc, ok := db.matchers[f[i].Operation()] matchFunc, ok := db.matchers[f[i].Operation()]
if !ok { if !ok {
return false return result, false
} }
if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) { if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) {
return false return result, false
} }
} }
return true return result, true
} }
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) { func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {

View file

@ -70,6 +70,22 @@ func TestDB_SelectUserAttributes(t *testing.T) {
err = putBig(db, raw6) err = putBig(db, raw6)
require.NoError(t, err) require.NoError(t, err)
raw7 := testutil.GenerateObjectWithCID(cnr)
var attr objectSDK.Attribute
attr.SetKey("path")
attr.SetValue("test/3/4")
attrs := raw7.Attributes()
attrs = append(attrs, attr)
ech := objectSDK.NewECHeader(objectSDK.ECParentInfo{
ID: oidtest.ID(),
Attributes: attrs,
}, 0, 3, []byte{}, 0)
raw7.SetECHeader(ech)
require.NoError(t, putBig(db, raw7))
var raw7Parent oid.Address
raw7Parent.SetContainer(cnr)
raw7Parent.SetObject(ech.Parent())
fs := objectSDK.SearchFilters{} fs := objectSDK.SearchFilters{}
fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual) fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual)
testSelect(t, db, cnr, fs, testSelect(t, db, cnr, fs,
@ -100,6 +116,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
object.AddressOf(raw4), object.AddressOf(raw4),
object.AddressOf(raw5), object.AddressOf(raw5),
object.AddressOf(raw6), object.AddressOf(raw6),
raw7Parent,
) )
fs = objectSDK.SearchFilters{} fs = objectSDK.SearchFilters{}
@ -110,6 +127,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
object.AddressOf(raw4), object.AddressOf(raw4),
object.AddressOf(raw5), object.AddressOf(raw5),
object.AddressOf(raw6), object.AddressOf(raw6),
raw7Parent,
) )
fs = objectSDK.SearchFilters{} fs = objectSDK.SearchFilters{}
@ -120,6 +138,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
object.AddressOf(raw4), object.AddressOf(raw4),
object.AddressOf(raw5), object.AddressOf(raw5),
object.AddressOf(raw6), object.AddressOf(raw6),
raw7Parent,
) )
fs = objectSDK.SearchFilters{} fs = objectSDK.SearchFilters{}
@ -131,6 +150,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
object.AddressOf(raw4), object.AddressOf(raw4),
object.AddressOf(raw5), object.AddressOf(raw5),
object.AddressOf(raw6), object.AddressOf(raw6),
raw7Parent,
) )
fs = objectSDK.SearchFilters{} fs = objectSDK.SearchFilters{}
@ -139,6 +159,7 @@ func TestDB_SelectUserAttributes(t *testing.T) {
object.AddressOf(raw4), object.AddressOf(raw4),
object.AddressOf(raw5), object.AddressOf(raw5),
object.AddressOf(raw6), object.AddressOf(raw6),
raw7Parent,
) )
fs = objectSDK.SearchFilters{} fs = objectSDK.SearchFilters{}
@ -147,6 +168,12 @@ func TestDB_SelectUserAttributes(t *testing.T) {
object.AddressOf(raw4), object.AddressOf(raw4),
object.AddressOf(raw5), object.AddressOf(raw5),
) )
fs = objectSDK.SearchFilters{}
fs.AddFilter("path", "test/3/4", objectSDK.MatchStringEqual)
testSelect(t, db, cnr, fs,
raw7Parent,
)
} }
func TestDB_SelectRootPhyParent(t *testing.T) { func TestDB_SelectRootPhyParent(t *testing.T) {