frostfs-node/pkg/local_object_storage/metabase/select.go
Leonard Lyubich 200fdbd361 [#149] metabase: Do not write virtual objects to the primary index
In the previous implementation of the metabase, it was necessary to write
virtual objects to the primary index to be able to select them. In this
approach, virtual objects can be obtained directly using Head operation.
This has a side effect in handling object operations that do not expect to
receive a virtual object header in a single operation. With recent changes,
it is no longer necessary to have records of virtual objects in the primary
index, so this no longer happens for system integrity.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-11-06 14:01:01 +03:00

123 lines
2.8 KiB
Go

package meta
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
)
// Select returns list of addresses of objects that match search filters.
//
// The selection runs inside a single read-only bolt transaction. The error
// of the transaction (including any error produced by the selection itself)
// is returned as is.
func (db *DB) Select(fs object.SearchFilters) (res []*object.Address, err error) {
	err = db.boltDB.View(func(tx *bbolt.Tx) error {
		var selErr error

		res, selErr = db.selectObjects(tx, fs)

		return selErr
	})

	return res, err
}
// selectObjects collects addresses of objects that satisfy every filter of fs,
// reading the index through the tx transaction.
//
// An empty filter set is delegated to selectAll. If the index bucket, or the
// bucket of some filter's header, does not exist, the result is empty
// (nil, nil). An error is returned if a filter's operation has no registered
// matcher, a stored address list can't be decoded, or a selected address
// can't be parsed. Addresses for which objectRemoved reports true are
// left out of the result.
func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Address, error) {
	total := len(fs)
	if total == 0 {
		return db.selectAll(tx)
	}

	// root bucket of the index
	idx := tx.Bucket(indexBucket)
	if idx == nil {
		// empty storage
		return nil, nil
	}

	// passed maps an address string to the number of leading filters it has
	// matched so far; an address is still a candidate for filter i only
	// while its counter equals i
	passed := make(map[string]int)

	for i := 0; i < total; i++ {
		op := fs[i].Operation()

		matchFunc, ok := db.matchers[op]
		if !ok {
			return nil, errors.Errorf("no function for matcher %v", op)
		}

		key := fs[i].Header()

		// bucket holding all stored values of the header
		valBucket := idx.Bucket([]byte(key))
		if valBucket == nil {
			// no object has this attribute => empty result
			return nil, nil
		}

		want := fs[i].Value()

		// visit handles one stored value of the header together with the
		// list of addresses recorded for it
		visit := func(k, v []byte) error {
			matched := matchFunc(string(key), string(cutKeyBytes(k)), want)

			addrs, err := decodeAddressList(v)
			if err != nil {
				return errors.Wrapf(err, "(%T) could not decode address list", db)
			}

			if !matched {
				// value does not satisfy the filter, nothing to promote
				return nil
			}

			for _, a := range addrs {
				// promote only the objects that matched all previous filters
				if passed[a] == i {
					passed[a] = i + 1
				}
			}

			return nil
		}

		// iterate over all existing values for the key
		if err := valBucket.ForEach(visit); err != nil {
			return nil, errors.Wrapf(err, "(%T) could not iterate bucket %s", db, key)
		}
	}

	res := make([]*object.Address, 0, len(passed))

	for a, n := range passed {
		// skip objects that missed some filter or are marked as deleted
		if n != total || objectRemoved(tx, []byte(a)) {
			continue
		}

		addr := object.NewAddress()
		if err := addr.Parse(a); err != nil {
			// TODO: storage was broken, so we need to handle it
			return nil, err
		}

		res = append(res, addr)
	}

	return res, nil
}
// selectAll returns addresses of all objects visible through selectObjects.
//
// To select all objects we can select by any property that splits objects
// into disjoint subsets: here the root filter and the non-root filter are
// applied one after another and both results are concatenated.
func (db *DB) selectAll(tx *bbolt.Tx) ([]*object.Address, error) {
	rootFilters := object.SearchFilters{}
	rootFilters.AddRootFilter()

	roots, err := db.selectObjects(tx, rootFilters)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not select root objects", db)
	}

	nonRootFilters := object.SearchFilters{}
	nonRootFilters.AddNonRootFilter()

	nonRoots, err := db.selectObjects(tx, nonRootFilters)
	if err != nil {
		return nil, errors.Wrapf(err, "(%T) could not select non-root objects", db)
	}

	return append(roots, nonRoots...), nil
}