From a8d76f2ebb0fe24385950976ed0e1fa6704e94aa Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Fri, 27 Nov 2020 12:01:44 +0300 Subject: [PATCH] [#199] Implement ObjectID selection filter in metabase Signed-off-by: Alex Vanin --- .../metabase/v2/select.go | 37 ++++++++++- .../metabase/v2/select_test.go | 64 ++++++++++++++++++- 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/pkg/local_object_storage/metabase/v2/select.go b/pkg/local_object_storage/metabase/v2/select.go index d549addb..ec074244 100644 --- a/pkg/local_object_storage/metabase/v2/select.go +++ b/pkg/local_object_storage/metabase/v2/select.go @@ -131,7 +131,7 @@ func (db *DB) selectFastFilter( switch f.Header() { case v2object.FilterHeaderObjectID: - // todo: implement me + db.selectObjectID(tx, f, prefix, to, fNum) case v2object.FilterHeaderOwnerID: bucketName := ownerBucketName(cid) db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum) @@ -252,6 +252,41 @@ func (db *DB) selectFromList( } } +// selectObjectID processes objectID filter with in-place optimizations. +func (db *DB) selectObjectID( + tx *bbolt.Tx, + f object.SearchFilter, + prefix string, + to map[string]int, // resulting cache + fNum int, // index of filter +) { + switch f.Operation() { + case object.MatchStringEqual: + default: + db.log.Debug("unknown operation", zap.Uint32("operation", uint32(f.Operation()))) + } + + // warning: it is in-place optimization and works only for MatchStringEQ, + // for NotEQ you should iterate over bkt and apply matchFunc + + addrStr := prefix + f.Value() + addr := object.NewAddress() + + err := addr.Parse(addrStr) + if err != nil { + db.log.Debug("can't decode object id address", + zap.String("addr", addrStr), + zap.String("error", err.Error())) + + return + } + + ok, err := db.exists(tx, addr) + if err == nil && ok { + markAddressInCache(to, fNum, addrStr) + } +} + // matchSlowFilters return true if object header is matched by all slow filters. func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr *object.Address, f object.SearchFilters) bool { if len(f) == 0 { diff --git a/pkg/local_object_storage/metabase/v2/select_test.go b/pkg/local_object_storage/metabase/v2/select_test.go index 6526d351..4d3b29a9 100644 --- a/pkg/local_object_storage/metabase/v2/select_test.go +++ b/pkg/local_object_storage/metabase/v2/select_test.go @@ -107,9 +107,6 @@ func TestDB_SelectRootPhyParent(t *testing.T) { err = db.Put(link.Object(), nil) require.NoError(t, err) - // printDB(meta.ExtractDB(db)) - // return - t.Run("root objects", func(t *testing.T) { fs := generateSearchFilter(cid) fs.AddRootFilter() @@ -300,3 +297,64 @@ func generateSearchFilter(cid *container.ID) objectSDK.SearchFilters { return fs } + +func TestDB_SelectObjectID(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + // prepare + + parent := generateRawObjectWithCID(t, cid) + + regular := generateRawObjectWithCID(t, cid) + regular.SetParentID(parent.ID()) + regular.SetParent(parent.Object().SDK()) + + err := db.Put(regular.Object(), nil) + require.NoError(t, err) + + ts := generateRawObjectWithCID(t, cid) + ts.SetType(objectSDK.TypeTombstone) + err = db.Put(ts.Object(), nil) + require.NoError(t, err) + + sg := generateRawObjectWithCID(t, cid) + sg.SetType(objectSDK.TypeStorageGroup) + err = db.Put(sg.Object(), nil) + require.NoError(t, err) + + t.Run("not found objects", func(t *testing.T) { + raw := generateRawObjectWithCID(t, cid) + + fs := generateSearchFilter(cid) + fs.AddObjectIDFilter(objectSDK.MatchStringEqual, raw.ID()) + + testSelect(t, db, fs) + }) + + t.Run("regular objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddObjectIDFilter(objectSDK.MatchStringEqual, regular.ID()) + testSelect(t, db, fs, regular.Object().Address()) + }) + + t.Run("tombstone objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddObjectIDFilter(objectSDK.MatchStringEqual, ts.ID()) + testSelect(t, db, fs, ts.Object().Address()) + }) + + t.Run("storage group objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddObjectIDFilter(objectSDK.MatchStringEqual, sg.ID()) + testSelect(t, db, fs, sg.Object().Address()) + }) + + t.Run("storage group objects", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddObjectIDFilter(objectSDK.MatchStringEqual, parent.ID()) + testSelect(t, db, fs, parent.Object().Address()) + }) +}