diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index 808d0f58..a4206263 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -13,6 +13,7 @@ import ( v2object "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/util/test" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" ) @@ -260,3 +261,45 @@ func TestSelectNonExistentAttributes(t *testing.T) { require.NoError(t, err) require.Empty(t, res) } + +func TestVirtualObject(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + // create object with parent + obj := generateObject(t, testPrm{ + withParent: true, + }) + + require.NoError(t, db.Put(obj)) + + childAddr := obj.Address() + parAddr := obj.GetParent().Address() + + // child object must be readable + _, err := db.Get(childAddr) + require.NoError(t, err) + + // parent object must not be readable + _, err = db.Get(parAddr) + require.True(t, errors.Is(err, errNotFound)) + + fs := objectSDK.SearchFilters{} + + // both objects should appear in selection + testSelect(t, db, fs, childAddr, parAddr) + + // filter leaves + fs.AddLeafFilter() + + // only child object should appear + testSelect(t, db, fs, childAddr) + + fs = fs[:0] + + // filter non-leaf objects + fs.AddNonLeafFilter() + + // only parent object should appear + testSelect(t, db, fs, parAddr) +} diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index a08d32b4..7aaa0892 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -41,9 +41,11 @@ func (db *DB) Put(obj *object.Object) error { addrKey := addressKey(obj.Address()) - // put header to primary bucket - if err := primaryBucket.Put(addrKey, data); err != nil { - return errors.Wrapf(err, "(%T) could not put item to primary bucket", db) + if !par { + // put header to primary bucket + if err := primaryBucket.Put(addrKey, data); err != nil { + return errors.Wrapf(err, "(%T) could not put item to primary bucket", db) + } } // create bucket for indices diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 741cba8a..a285ed53 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -7,112 +7,117 @@ import ( ) // Select returns list of addresses of objects that match search filters. -func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) { - res := make([]*object.Address, 0) - - err := db.boltDB.View(func(tx *bbolt.Tx) error { - // get indexed bucket - indexBucket := tx.Bucket(indexBucket) - if indexBucket == nil { - // empty storage - return nil - } - - if len(fs) == 0 { - // get primary bucket - primaryBucket := tx.Bucket(primaryBucket) - if primaryBucket == nil { - // empty storage - return nil - } - - // iterate over all stored addresses - return primaryBucket.ForEach(func(k, v []byte) error { - // check if object marked as deleted - if objectRemoved(tx, k) { - return nil - } - - addr := object.NewAddress() - if err := addr.Parse(string(k)); err != nil { - // TODO: storage was broken, so we need to handle it - return err - } - - res = append(res, addr) - - return nil - }) - } - - // keep processed addresses - // value equal to number (index+1) of latest matched filter - mAddr := make(map[string]int) - - for fNum := range fs { - matchFunc, ok := db.matchers[fs[fNum].Operation()] - if !ok { - return errors.Errorf("no function for matcher %v", fs[fNum].Operation()) - } - - key := fs[fNum].Header() - - // get bucket with values - keyBucket := indexBucket.Bucket([]byte(key)) - if keyBucket == nil { - // no object has this attribute => empty result - return nil - } - - fVal := fs[fNum].Value() - - // iterate over all existing values for the key - if err := keyBucket.ForEach(func(k, v []byte) error { - include := matchFunc(string(key), string(cutKeyBytes(k)), fVal) - - strs, err := decodeAddressList(v) - if err != nil { - return errors.Wrapf(err, "(%T) could not decode address list", db) - } - - for j := range strs { - if num := mAddr[strs[j]]; num != fNum { - // than object does not match some previous filter - continue - } else if include { - mAddr[strs[j]] = fNum + 1 - } - } - - return nil - }); err != nil { - return errors.Wrapf(err, "(%T) could not iterate bucket %s", db, key) - } - } - - fLen := len(fs) - - for a, ind := range mAddr { - if ind != fLen { - continue - } - - // check if object marked as deleted - if objectRemoved(tx, []byte(a)) { - continue - } - - addr := object.NewAddress() - if err := addr.Parse(a); err != nil { - // TODO: storage was broken, so we need to handle it - return err - } - - res = append(res, addr) - } - - return nil +func (db *DB) Select(fs object.SearchFilters) (res []*object.Address, err error) { + err = db.boltDB.View(func(tx *bbolt.Tx) error { + res, err = db.selectObjects(tx, fs) + return err }) - return res, err + return +} + +func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Address, error) { + if len(fs) == 0 { + return db.selectAll(tx) + } + + // get indexed bucket + indexBucket := tx.Bucket(indexBucket) + if indexBucket == nil { + // empty storage + return nil, nil + } + + // keep processed addresses + // value equal to number (index+1) of latest matched filter + mAddr := make(map[string]int) + + for fNum := range fs { + matchFunc, ok := db.matchers[fs[fNum].Operation()] + if !ok { + return nil, errors.Errorf("no function for matcher %v", fs[fNum].Operation()) + } + + key := fs[fNum].Header() + + // get bucket with values + keyBucket := indexBucket.Bucket([]byte(key)) + if keyBucket == nil { + // no object has this attribute => empty result + return nil, nil + } + + fVal := fs[fNum].Value() + + // iterate over all existing values for the key + if err := keyBucket.ForEach(func(k, v []byte) error { + include := matchFunc(string(key), string(cutKeyBytes(k)), fVal) + + strs, err := decodeAddressList(v) + if err != nil { + return errors.Wrapf(err, "(%T) could not decode address list", db) + } + + for j := range strs { + if num := mAddr[strs[j]]; num != fNum { + // than object does not match some previous filter + continue + } else if include { + mAddr[strs[j]] = fNum + 1 + } + } + + return nil + }); err != nil { + return nil, errors.Wrapf(err, "(%T) could not iterate bucket %s", db, key) + } + } + + fLen := len(fs) + res := make([]*object.Address, 0, len(mAddr)) + + for a, ind := range mAddr { + if ind != fLen { + continue + } + + // check if object marked as deleted + if objectRemoved(tx, []byte(a)) { + continue + } + + addr := object.NewAddress() + if err := addr.Parse(a); err != nil { + // TODO: storage was broken, so we need to handle it + return nil, err + } + + res = append(res, addr) + } + + return res, nil +} + +func (db *DB) selectAll(tx *bbolt.Tx) ([]*object.Address, error) { + fs := object.SearchFilters{} + + // to select all objects we can select by any property + // that splits objects into disjoint subsets + fs.AddRootFilter() + + list1, err := db.selectObjects(tx, fs) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not select root objects", db) + } + + fs = fs[:0] + + fs.AddNonRootFilter() + + list2, err := db.selectObjects(tx, fs) + if err != nil { + return nil, errors.Wrapf(err, "(%T) could not select non-root objects", db) + } + + return append(list1, list2...), nil }