[#199] Implement ObjectID selection filter in metabase

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-11-27 12:01:44 +03:00
parent 700bd7de01
commit a8d76f2ebb
2 changed files with 97 additions and 4 deletions

View file

@ -131,7 +131,7 @@ func (db *DB) selectFastFilter(
switch f.Header() {
case v2object.FilterHeaderObjectID:
// todo: implement me
db.selectObjectID(tx, f, prefix, to, fNum)
case v2object.FilterHeaderOwnerID:
bucketName := ownerBucketName(cid)
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
@ -252,6 +252,41 @@ func (db *DB) selectFromList(
}
}
// selectObjectID processes objectID filter with in-place optimizations.
func (db *DB) selectObjectID(
tx *bbolt.Tx,
f object.SearchFilter,
prefix string,
to map[string]int, // resulting cache
fNum int, // index of filter
) {
switch f.Operation() {
case object.MatchStringEqual:
default:
db.log.Debug("unknown operation", zap.Uint32("operation", uint32(f.Operation())))
}
// warning: it is in-place optimization and works only for MatchStringEQ,
// for NotEQ you should iterate over bkt and apply matchFunc
addrStr := prefix + f.Value()
addr := object.NewAddress()
err := addr.Parse(addrStr)
if err != nil {
db.log.Debug("can't decode object id address",
zap.String("addr", addrStr),
zap.String("error", err.Error()))
return
}
ok, err := db.exists(tx, addr)
if err == nil && ok {
markAddressInCache(to, fNum, addrStr)
}
}
// matchSlowFilters return true if object header is matched by all slow filters.
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr *object.Address, f object.SearchFilters) bool {
if len(f) == 0 {

View file

@ -107,9 +107,6 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
err = db.Put(link.Object(), nil)
require.NoError(t, err)
// printDB(meta.ExtractDB(db))
// return
t.Run("root objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs.AddRootFilter()
@ -300,3 +297,64 @@ func generateSearchFilter(cid *container.ID) objectSDK.SearchFilters {
return fs
}
func TestDB_SelectObjectID(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := testCID()
// prepare
parent := generateRawObjectWithCID(t, cid)
regular := generateRawObjectWithCID(t, cid)
regular.SetParentID(parent.ID())
regular.SetParent(parent.Object().SDK())
err := db.Put(regular.Object(), nil)
require.NoError(t, err)
ts := generateRawObjectWithCID(t, cid)
ts.SetType(objectSDK.TypeTombstone)
err = db.Put(ts.Object(), nil)
require.NoError(t, err)
sg := generateRawObjectWithCID(t, cid)
sg.SetType(objectSDK.TypeStorageGroup)
err = db.Put(sg.Object(), nil)
require.NoError(t, err)
t.Run("not found objects", func(t *testing.T) {
raw := generateRawObjectWithCID(t, cid)
fs := generateSearchFilter(cid)
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, raw.ID())
testSelect(t, db, fs)
})
t.Run("regular objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, regular.ID())
testSelect(t, db, fs, regular.Object().Address())
})
t.Run("tombstone objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, ts.ID())
testSelect(t, db, fs, ts.Object().Address())
})
t.Run("storage group objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, sg.ID())
testSelect(t, db, fs, sg.Object().Address())
})
t.Run("storage group objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, parent.ID())
testSelect(t, db, fs, parent.Object().Address())
})
}