frostfs-node/pkg/local_object_storage/metabase/select.go
Leonard Lyubich 77e80f517f [#142] metabase: Store header value index in a tree leaf
In the previous implementation of the metabase, the unique value of the
header was assigned a bucket, the elements of which were leaves with a
key-address and an empty value. This approach was relatively efficient in
terms of write speed. However, a large number of buckets led to a rapid
increase in the database volume (~4GB for 100K objects with unique
attributes). An approach is presented with storing indexes on the value of
headers in the leaves of the tree, where the keys are the unique values of
the header, and the values are a serialized list of addresses (gob
encoding is temporarily used for serialization).

The new approach gave a good result in saving space (~350MB), however, it
significantly reduced the write speed with an increase in the number of
objects (~ 80x after 100K objects).

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2020-11-03 18:42:32 +03:00

110 lines
2.4 KiB
Go

package meta
import (
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
)
// Select returns the addresses of stored objects that match all search
// filters in fs.
//
// With an empty filter list every address found in the primary bucket is
// returned. Otherwise each filter is evaluated against the index bucket of
// its header, and an address is selected only if it matched every filter.
// If the index bucket is missing, or no index exists for the header of some
// filter, the result is empty. Addresses for which objectRemoved reports true
// are skipped.
//
// The order of addresses selected through filters is unspecified. The
// returned slice is never nil; on a non-nil error it may be partially
// filled and must not be relied upon.
func (db *DB) Select(fs object.SearchFilters) ([]*object.Address, error) {
	res := make([]*object.Address, 0)

	err := db.boltDB.View(func(tx *bbolt.Tx) error {
		// get indexed bucket
		indexBkt := tx.Bucket(indexBucket)
		if indexBkt == nil {
			// empty storage
			return nil
		}

		if len(fs) == 0 {
			// get primary bucket
			primaryBkt := tx.Bucket(primaryBucket)
			if primaryBkt == nil {
				// empty storage
				return nil
			}

			// iterate over all stored addresses
			return primaryBkt.ForEach(func(k, _ []byte) error {
				// skip objects marked as deleted
				if objectRemoved(tx, k) {
					return nil
				}

				addr := object.NewAddress()
				if err := addr.Parse(string(k)); err != nil {
					// TODO: storage was broken, so we need to handle it
					return err
				}

				res = append(res, addr)

				return nil
			})
		}

		// matched maps an address to the number of leading filters it has
		// satisfied so far. An address is promoted from fNum to fNum+1 only
		// while processing filter fNum, so addresses that failed (or never
		// appeared in) an earlier filter can not re-enter the result.
		matched := make(map[string]int)

		for fNum, f := range fs {
			matchFunc, ok := db.matchers[f.Operation()]
			if !ok {
				return errors.Errorf("no function for matcher %v", f.Operation())
			}

			key := f.Header()

			// get bucket with values
			keyBucket := indexBkt.Bucket([]byte(key))
			if keyBucket == nil {
				// no object has this attribute => empty result
				return nil
			}

			fVal := f.Value()

			// iterate over all existing values for the key
			if err := keyBucket.ForEach(func(k, v []byte) error {
				if !matchFunc(string(key), string(cutKeyBytes(k)), fVal) {
					// addresses under a non-matching value are simply not
					// promoted; no need to decode their list
					return nil
				}

				strs, err := decodeAddressList(v)
				if err != nil {
					return errors.Wrapf(err, "(%T) could not decode address list", db)
				}

				for i := range strs {
					if matched[strs[i]] != fNum {
						// either failed some previous filter or was
						// already counted for the current one
						continue
					}

					matched[strs[i]] = fNum + 1
				}

				return nil
			}); err != nil {
				return errors.Wrapf(err, "(%T) could not iterate bucket %s", db, key)
			}
		}

		for a, n := range matched {
			// keep only addresses that satisfied every filter
			if n != len(fs) {
				continue
			}

			// skip (rather than abort on) objects marked as deleted, so the
			// remaining matches are still collected
			if objectRemoved(tx, []byte(a)) {
				continue
			}

			addr := object.NewAddress()
			if err := addr.Parse(a); err != nil {
				// TODO: storage was broken, so we need to handle it
				return err
			}

			res = append(res, addr)
		}

		return nil
	})

	return res, err
}