From 200fdbd361f8eadae6004afd09ab621dc6802e5e Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Fri, 6 Nov 2020 12:41:59 +0300 Subject: [PATCH] [#149] metabase: Do not write virtual objects to the primary index In the previous implementation of the metabase, it was necessary to write virtual objects to the primary index to be able to select them. In this approach, virtual objects can be obtained directly using Head operation. This has a side effect in handling object operations that do not expect to receive a virtual object header in a single operation. With recent changes, it is no longer necessary to have records of virtual objects in the primary index, so this no longer happens for system integrity. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/db_test.go | 43 ++++ pkg/local_object_storage/metabase/put.go | 8 +- pkg/local_object_storage/metabase/select.go | 217 ++++++++++--------- 3 files changed, 159 insertions(+), 109 deletions(-) diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index 808d0f58..a4206263 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -13,6 +13,7 @@ import ( v2object "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/util/test" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" ) @@ -260,3 +261,45 @@ func TestSelectNonExistentAttributes(t *testing.T) { require.NoError(t, err) require.Empty(t, res) } + +func TestVirtualObject(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + // create object with parent + obj := generateObject(t, testPrm{ + withParent: true, + }) + + require.NoError(t, db.Put(obj)) + + childAddr := obj.Address() + parAddr := obj.GetParent().Address() + + // child object must be readable + _, err := db.Get(childAddr) + require.NoError(t, err) + + // parent object must not be readable + _, err = db.Get(parAddr) + require.True(t, errors.Is(err, errNotFound)) + + fs := objectSDK.SearchFilters{} + + // both objects should appear in selection + testSelect(t, db, fs, childAddr, parAddr) + + // filter leaves + fs.AddLeafFilter() + + // only child object should appear + testSelect(t, db, fs, childAddr) + + fs = fs[:0] + + // filter non-leaf objects + fs.AddNonLeafFilter() + + // only parent object should appear + testSelect(t, db, fs, parAddr) +} diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index a08d32b4..7aaa0892 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -41,9 +41,11 @@ func (db *DB) Put(obj *object.Object) error { addrKey := addressKey(obj.Address()) - // put header to primary bucket - if err := primaryBucket.Put(addrKey, data); err != nil { - return errors.Wrapf(err, "(%T) could not put item to primary bucket", db) + if !par { + // put header to primary bucket + if err := primaryBucket.Put(addrKey, data); err != nil { + return errors.Wrapf(err, "(%T) could not put item to primary bucket", db) + } } // create bucket for indices diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 741cba8a..a285ed53 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -7,112 +7,117 @@ import ( ) // Select returns list of addresses of objects that match search filters. -func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) { - res := make([]*object.Address, 0) - - err := db.boltDB.View(func(tx *bbolt.Tx) error { - // get indexed bucket - indexBucket := tx.Bucket(indexBucket) - if indexBucket == nil { - // empty storage - return nil - } - - if len(fs) == 0 { - // get primary bucket - primaryBucket := tx.Bucket(primaryBucket) - if primaryBucket == nil { - // empty storage - return nil - } - - // iterate over all stored addresses - return primaryBucket.ForEach(func(k, v []byte) error { - // check if object marked as deleted - if objectRemoved(tx, k) { - return nil - } - - addr := object.NewAddress() - if err := addr.Parse(string(k)); err != nil { - // TODO: storage was broken, so we need to handle it - return err - } - - res = append(res, addr) - - return nil - }) - } - - // keep processed addresses - // value equal to number (index+1) of latest matched filter - mAddr := make(map[string]int) - - for fNum := range fs { - matchFunc, ok := db.matchers[fs[fNum].Operation()] - if !ok { - return errors.Errorf("no function for matcher %v", fs[fNum].Operation()) - } - - key := fs[fNum].Header() - - // get bucket with values - keyBucket := indexBucket.Bucket([]byte(key)) - if keyBucket == nil { - // no object has this attribute => empty result - return nil - } - - fVal := fs[fNum].Value() - - // iterate over all existing values for the key - if err := keyBucket.ForEach(func(k, v []byte) error { - include := matchFunc(string(key), string(cutKeyBytes(k)), fVal) - - strs, err := decodeAddressList(v) - if err != nil { - return errors.Wrapf(err, "(%T) could not decode address list", db) - } - - for j := range strs { - if num := mAddr[strs[j]]; num != fNum { - // than object does not match some previous filter - continue - } else if include { - mAddr[strs[j]] = fNum + 1 - } - } - - return nil - }); err != nil { - return errors.Wrapf(err, "(%T) could not iterate bucket %s", db, key) - } - } - - fLen := len(fs) - - for a, ind := range mAddr { - if ind != fLen { - continue - } - - // check if object marked as deleted - if objectRemoved(tx, []byte(a)) { - continue - } - - addr := object.NewAddress() - if err := addr.Parse(a); err != nil { - // TODO: storage was broken, so we need to handle it - return err - } - - res = append(res, addr) - } - - return nil +func (db *DB) Select(fs object.SearchFilters) (res []*object.Address, err error) { + err = db.boltDB.View(func(tx *bbolt.Tx) error { + res, err = db.selectObjects(tx, fs) + return err }) - return res, err + return +} + +func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Address, error) { + if len(fs) == 0 { + return db.selectAll(tx) + } + + // get indexed bucket + indexBucket := tx.Bucket(indexBucket) + if indexBucket == nil { + // empty storage + return nil, nil + } + + // keep processed addresses + // value equal to number (index+1) of latest matched filter + mAddr := make(map[string]int) + + for fNum := range fs { + matchFunc, ok := db.matchers[fs[fNum].Operation()] + if !ok { + return nil, errors.Errorf("no function for matcher %v", fs[fNum].Operation()) + } + + key := fs[fNum].Header() + + // get bucket with values + keyBucket := indexBucket.Bucket([]byte(key)) + if keyBucket == nil { + // no object has this attribute => empty result + return nil, nil + } + + fVal := fs[fNum].Value() + + // iterate over all existing values for the key + if err := keyBucket.ForEach(func(k, v []byte) error { + include := matchFunc(string(key), string(cutKeyBytes(k)), fVal) + + strs, err := decodeAddressList(v) + if err != nil { + return errors.Wrapf(err, "(%T) could not decode address list", db) + } + + for j := range strs { + if num := mAddr[strs[j]]; num != fNum { + // than object does not match some previous filter + continue + } else if include { + mAddr[strs[j]] = fNum + 1 + } + } + + return nil + }); err != nil { + return nil, errors.Wrapf(err, "(%T) could not iterate bucket %s", db, key) + } + } + + fLen := len(fs) + res := make([]*object.Address, 0, len(mAddr)) + + for a, ind := range mAddr { + if ind != fLen { + continue + } + + // check if object marked as deleted + if objectRemoved(tx, []byte(a)) { + continue + } + + addr := object.NewAddress() + if err := addr.Parse(a); err != nil { + // TODO: storage was broken, so we need to handle it + return nil, err + } + + res = append(res, addr) + } + + return res, nil +} + +func (db *DB) selectAll(tx *bbolt.Tx) ([]*object.Address, error) { + fs := object.SearchFilters{} + + // to select all objects we can select by any property + // that splits objects into disjoint subsets + fs.AddRootFilter() + + list1, err := db.selectObjects(tx, fs) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not select root objects", db) + } + + fs = fs[:0] + + fs.AddNonRootFilter() + + list2, err := db.selectObjects(tx, fs) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not select non-root objects", db) + } + + return append(list1, list2...), nil }