[#131] metabase: Implement indexing by object properties

Process parent objects in Put method. Headers of parent object are stored as
regular leaf objects in metabase from now. Build indexes for ROOT, LEAF and
CHILDFREE properties.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
remotes/KirillovDenis/release/v0.21.1
Leonard Lyubich 2020-10-29 19:12:38 +03:00 committed by Alex Vanin
parent 97077294fc
commit 1db01725c9
4 changed files with 175 additions and 49 deletions

View File

@ -2,6 +2,7 @@ package meta
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
"go.etcd.io/bbolt"
)
@ -9,17 +10,27 @@ import (
type DB struct {
boltDB *bbolt.DB
matchers map[object.SearchMatchType]func(string, string) bool
matchers map[object.SearchMatchType]func(string, string, string) bool
}
// NewDB creates, initializes and returns DB instance.
func NewDB(boltDB *bbolt.DB) *DB {
return &DB{
boltDB: boltDB,
matchers: map[object.SearchMatchType]func(string, string) bool{
object.MatchStringEqual: func(s string, s2 string) bool {
return s == s2
},
matchers: map[object.SearchMatchType]func(string, string, string) bool{
object.MatchStringEqual: stringEqualMatcher,
},
}
}
func stringEqualMatcher(key, objVal, filterVal string) bool {
switch key {
default:
return objVal == filterVal
case
v2object.FilterPropertyRoot,
v2object.FilterPropertyChildfree,
v2object.FilterPropertyLeaf:
return (filterVal == v2object.BooleanPropertyValueTrue) == (objVal == v2object.BooleanPropertyValueTrue)
}
}

View File

@ -10,6 +10,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/util/test"
"github.com/stretchr/testify/require"
@ -153,3 +154,85 @@ func TestDB_Delete(t *testing.T) {
testSelect(t, db, fs)
}
func TestDB_SelectProperties(t *testing.T) {
path := "test.db"
bdb, err := bbolt.Open(path, 0600, nil)
require.NoError(t, err)
defer func() {
bdb.Close()
os.Remove(path)
}()
db := NewDB(bdb)
parent := object.NewRaw()
parent.SetContainerID(testCID())
parent.SetID(testOID())
child := object.NewRaw()
child.SetContainerID(testCID())
child.SetID(testOID())
child.SetParent(parent.Object().SDK())
parAddr := parent.Object().Address()
childAddr := child.Object().Address()
require.NoError(t, db.Put(child.Object()))
// root filter
fs := objectSDK.SearchFilters{}
fs.AddRootFilter()
testSelect(t, db, fs, parAddr)
// non-root filter
fs = fs[:0]
fs.AddNonRootFilter()
testSelect(t, db, fs, childAddr)
// root filter (with random false value)
fs = fs[:0]
fs.AddFilter(v2object.FilterPropertyRoot, "some false value", objectSDK.MatchStringEqual)
testSelect(t, db, fs, childAddr)
// leaf filter
fs = fs[:0]
fs.AddLeafFilter()
testSelect(t, db, fs, childAddr)
// non-leaf filter
fs = fs[:0]
fs.AddNonLeafFilter()
testSelect(t, db, fs, parAddr)
// leaf filter (with random false value)
fs = fs[:0]
fs.AddFilter(v2object.FilterPropertyLeaf, "some false value", objectSDK.MatchStringEqual)
testSelect(t, db, fs, parAddr)
lnk := object.NewRaw()
lnk.SetContainerID(testCID())
lnk.SetID(testOID())
lnk.SetChildren(testOID())
lnkAddr := lnk.Object().Address()
require.NoError(t, db.Put(lnk.Object()))
// childfree filter
fs = fs[:0]
fs.AddChildfreeFilter()
testSelect(t, db, fs, childAddr, parAddr)
// non-childfree filter
fs = fs[:0]
fs.AddNonChildfreeFilter()
testSelect(t, db, fs, lnkAddr)
// childfree filter (with random false value)
fs = fs[:0]
fs.AddFilter(v2object.FilterPropertyChildfree, "some false value", objectSDK.MatchStringEqual)
testSelect(t, db, fs, lnkAddr)
}

View File

@ -22,55 +22,60 @@ var (
// Object payload expected to be cut.
func (db *DB) Put(obj *object.Object) error {
return db.boltDB.Update(func(tx *bbolt.Tx) error {
// create primary bucket (addr: header)
primaryBucket, err := tx.CreateBucketIfNotExists(primaryBucket)
if err != nil {
return errors.Wrapf(err, "(%T) could not create primary bucket", db)
}
par := false
data, err := obj.ToV2().StableMarshal(nil)
if err != nil {
return errors.Wrapf(err, "(%T) could not marshal the object", db)
}
addrKey := addressKey(obj.Address())
// put header to primary bucket
if err := primaryBucket.Put(addrKey, data); err != nil {
return errors.Wrapf(err, "(%T) could not put item to primary bucket", db)
}
// create bucket for indices
indexBucket, err := tx.CreateBucketIfNotExists(indexBucket)
if err != nil {
return errors.Wrapf(err, "(%T) could not create index bucket", db)
}
// calculate indexed values for object
indices := objectIndices(obj)
for i := range indices {
// create index bucket
keyBucket, err := indexBucket.CreateBucketIfNotExists([]byte(indices[i].key))
for ; obj != nil; obj, par = obj.GetParent(), true {
// create primary bucket (addr: header)
primaryBucket, err := tx.CreateBucketIfNotExists(primaryBucket)
if err != nil {
return errors.Wrapf(err, "(%T) could not create bucket for header key", db)
return errors.Wrapf(err, "(%T) could not create primary bucket", db)
}
// FIXME: here we can get empty slice that could not be the key
// Possible solutions:
// 1. add prefix byte (0 if empty);
v := []byte(indices[i].val)
// create address bucket for the value
valBucket, err := keyBucket.CreateBucketIfNotExists(nonEmptyKeyBytes(v))
data, err := obj.ToV2().StableMarshal(nil)
if err != nil {
return errors.Wrapf(err, "(%T) could not create bucket for header value", db)
return errors.Wrapf(err, "(%T) could not marshal the object", db)
}
// put object address to value bucket
if err := valBucket.Put(addrKey, nil); err != nil {
return errors.Wrapf(err, "(%T) could not put item to header bucket", db)
addrKey := addressKey(obj.Address())
// put header to primary bucket
if err := primaryBucket.Put(addrKey, data); err != nil {
return errors.Wrapf(err, "(%T) could not put item to primary bucket", db)
}
// create bucket for indices
indexBucket, err := tx.CreateBucketIfNotExists(indexBucket)
if err != nil {
return errors.Wrapf(err, "(%T) could not create index bucket", db)
}
// calculate indexed values for object
indices := objectIndices(obj, par)
for i := range indices {
// create index bucket
keyBucket, err := indexBucket.CreateBucketIfNotExists([]byte(indices[i].key))
if err != nil {
return errors.Wrapf(err, "(%T) could not create bucket for header key", db)
}
// FIXME: here we can get empty slice that could not be the key
// Possible solutions:
// 1. add prefix byte (0 if empty);
v := []byte(indices[i].val)
// create address bucket for the value
valBucket, err := keyBucket.CreateBucketIfNotExists(nonEmptyKeyBytes(v))
if err != nil {
return errors.Wrapf(err, "(%T) could not create bucket for header value", db)
}
// put object address to value bucket
if err := valBucket.Put(addrKey, nil); err != nil {
return errors.Wrapf(err, "(%T) could not put item to header bucket", db)
}
}
}
return nil
@ -89,10 +94,25 @@ func addressKey(addr *objectSDK.Address) []byte {
return []byte(addr.String())
}
func objectIndices(obj *object.Object) []bucketItem {
func objectIndices(obj *object.Object, parent bool) []bucketItem {
as := obj.GetAttributes()
res := make([]bucketItem, 0, 3+len(as))
res := make([]bucketItem, 0, 5+len(as))
rootVal := v2object.BooleanPropertyValueTrue
if obj.HasParent() {
rootVal = ""
}
leafVal := v2object.BooleanPropertyValueTrue
if parent {
leafVal = ""
}
childfreeVal := v2object.BooleanPropertyValueTrue
if len(obj.GetChildren()) > 0 {
childfreeVal = ""
}
res = append(res,
bucketItem{
@ -107,6 +127,18 @@ func objectIndices(obj *object.Object) []bucketItem {
key: v2object.FilterHeaderOwnerID,
val: obj.GetOwnerID().String(),
},
bucketItem{
key: v2object.FilterPropertyRoot,
val: rootVal,
},
bucketItem{
key: v2object.FilterPropertyLeaf,
val: leafVal,
},
bucketItem{
key: v2object.FilterPropertyChildfree,
val: childfreeVal,
},
// TODO: add remaining fields after neofs-api#72
)

View File

@ -39,7 +39,7 @@ func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) {
// iterate over all existing values for the key
if err := keyBucket.ForEach(func(k, _ []byte) error {
if !matchFunc(string(cutKeyBytes(k)), fVal) {
if !matchFunc(string(key), string(cutKeyBytes(k)), fVal) {
// exclude all addresses with this value
return keyBucket.Bucket(k).ForEach(func(k, _ []byte) error {
mAddr[string(k)] = struct{}{}