diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 46f73587..a7a6ac9a 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -44,6 +44,7 @@ func NewDB(opts ...Option) *DB { path: c.boltDB.Path(), cfg: c, matchers: map[object.SearchMatchType]func(string, string, string) bool{ + object.MatchUnknown: unknownMatcher, object.MatchStringEqual: stringEqualMatcher, }, } @@ -62,11 +63,19 @@ func stringEqualMatcher(key, objVal, filterVal string) bool { switch key { default: return objVal == filterVal - case - v2object.FilterPropertyRoot, - v2object.FilterPropertyChildfree, - v2object.FilterPropertyLeaf: + case v2object.FilterPropertyChildfree: return (filterVal == v2object.BooleanPropertyValueTrue) == (objVal == v2object.BooleanPropertyValueTrue) + case v2object.FilterPropertyPhy, v2object.FilterPropertyRoot: + return true + } +} + +func unknownMatcher(key, _, _ string) bool { + switch key { + default: + return false + case v2object.FilterPropertyPhy, v2object.FilterPropertyRoot: + return true } } diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index f34a8c20..586b6b20 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -164,31 +164,11 @@ func TestDB_SelectProperties(t *testing.T) { fs.AddRootFilter() testSelect(t, db, fs, parAddr) - // non-root filter + // phy filter fs = fs[:0] - fs.AddNonRootFilter() + fs.AddPhyFilter() testSelect(t, db, fs, childAddr) - // root filter (with random false value) - fs = fs[:0] - fs.AddFilter(v2object.FilterPropertyRoot, "some false value", objectSDK.MatchStringEqual) - testSelect(t, db, fs, childAddr) - - // leaf filter - fs = fs[:0] - fs.AddLeafFilter() - testSelect(t, db, fs, childAddr) - - // non-leaf filter - fs = fs[:0] - fs.AddNonLeafFilter() - testSelect(t, db, fs, parAddr) - - // leaf filter (with random false value) - fs = fs[:0] - fs.AddFilter(v2object.FilterPropertyLeaf, "some false value", objectSDK.MatchStringEqual) - testSelect(t, db, fs, parAddr) - lnk := object.NewRaw() lnk.SetContainerID(testCID()) lnk.SetID(testOID()) @@ -290,7 +270,7 @@ func TestVirtualObject(t *testing.T) { testSelect(t, db, fs, childAddr, parAddr) // filter leaves - fs.AddLeafFilter() + fs.AddPhyFilter() // only child object should appear testSelect(t, db, fs, childAddr) @@ -298,7 +278,7 @@ func TestVirtualObject(t *testing.T) { fs = fs[:0] // filter non-leaf objects - fs.AddNonLeafFilter() + fs.AddRootFilter() // only parent object should appear testSelect(t, db, fs, parAddr) diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index f958f178..49ff7065 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -96,17 +96,7 @@ func addressKey(addr *objectSDK.Address) []byte { func objectIndices(obj *object.Object, parent bool) []bucketItem { as := obj.GetAttributes() - res := make([]bucketItem, 0, 5+len(as)) - - rootVal := v2object.BooleanPropertyValueTrue - if obj.GetType() != objectSDK.TypeRegular || obj.HasParent() { - rootVal = "" - } - - leafVal := v2object.BooleanPropertyValueTrue - if parent { - leafVal = "" - } + res := make([]bucketItem, 0, 7+len(as)) // 7 predefined buckets and object attributes childfreeVal := v2object.BooleanPropertyValueTrue if len(obj.GetChildren()) > 0 { @@ -126,14 +116,6 @@ func objectIndices(obj *object.Object, parent bool) []bucketItem { key: v2object.FilterHeaderOwnerID, val: obj.GetOwnerID().String(), }, - bucketItem{ - key: v2object.FilterPropertyRoot, - val: rootVal, - }, - bucketItem{ - key: v2object.FilterPropertyLeaf, - val: leafVal, - }, bucketItem{ key: v2object.FilterPropertyChildfree, val: childfreeVal, @@ -145,6 +127,20 @@ func objectIndices(obj *object.Object, parent bool) []bucketItem { // TODO: add remaining fields after neofs-api#72 ) + if obj.GetType() == objectSDK.TypeRegular && !obj.HasParent() { + res = append(res, bucketItem{ + key: v2object.FilterPropertyRoot, + val: v2object.BooleanPropertyValueTrue, + }) + } + + if !parent { + res = append(res, bucketItem{ + key: v2object.FilterPropertyPhy, + val: v2object.BooleanPropertyValueTrue, + }) + } + for _, a := range as { res = append(res, bucketItem{ key: a.GetKey(), diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 3f18604e..6f20cbaf 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -2,6 +2,7 @@ package meta import ( "github.com/nspcc-dev/neofs-api-go/pkg/object" + v2object "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/pkg/errors" "go.etcd.io/bbolt" ) @@ -51,7 +52,7 @@ func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Ad // iterate over all existing values for the key if err := keyBucket.ForEach(func(k, v []byte) error { - include := matchFunc(string(key), string(cutKeyBytes(k)), fVal) + include := matchFunc(key, string(cutKeyBytes(k)), fVal) if include { return keyBucket.Bucket(k).ForEach(func(k, _ []byte) error { @@ -96,25 +97,53 @@ func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Ad } func (db *DB) selectAll(tx *bbolt.Tx) ([]*object.Address, error) { - fs := object.SearchFilters{} + result := map[string]struct{}{} - // to select all objects we can select by any property - // that splits objects into disjoint subsets - fs.AddRootFilter() + primaryBucket := tx.Bucket(primaryBucket) + indexBucket := tx.Bucket(indexBucket) - list1, err := db.selectObjects(tx, fs) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not select root objects", db) + if primaryBucket == nil || indexBucket == nil { + return nil, nil } - fs = fs[:0] + if err := primaryBucket.ForEach(func(k, _ []byte) error { + result[string(k)] = struct{}{} - fs.AddNonRootFilter() - - list2, err := db.selectObjects(tx, fs) - if err != nil { - return nil, errors.Wrapf(err, "(%T) could not select non-root objects", db) + return nil + }); err != nil { + return nil, errors.Wrapf(err, "(%T) could not iterate primary bucket", db) } - return append(list1, list2...), nil + rootBucket := indexBucket.Bucket([]byte(v2object.FilterPropertyRoot)) + if rootBucket != nil { + rootBucket = rootBucket.Bucket(nonEmptyKeyBytes([]byte(v2object.BooleanPropertyValueTrue))) + } + + if rootBucket != nil { + if err := rootBucket.ForEach(func(k, v []byte) error { + result[string(k)] = struct{}{} + + return nil + }); err != nil { + return nil, errors.Wrapf(err, "(%T) could not iterate root bucket", db) + } + } + + list := make([]*object.Address, 0, len(result)) + + for k := range result { + // check if object marked as deleted + if objectRemoved(tx, []byte(k)) { + continue + } + + addr := object.NewAddress() + if err := addr.Parse(k); err != nil { + return nil, err // TODO: storage was broken, so we need to handle it + } + + list = append(list, addr) + } + + return list, nil }