forked from TrueCloudLab/frostfs-node
77e80f517f
In the previous implementation of the metabase, the unique value of the header was assigned a bucket, the elements of which were leaves with a key-address and an empty value. This approach was relatively efficient in terms of write speed. However, a large number of buckets led to a rapid increase in the database volume (~4GB for 100K objects with unique attributes). An approach is presented with storing indexes on the value of headers in the leaves of the tree, where the keys are the unique values of the header, and the values are a serialized list of addresses (gob encoding is temporarily used for serialization). The new approach gave a good result in saving space (~350MB), however, it significantly reduced the write speed with an increase in the number of objects (~ 80x after 100K objects). Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
183 lines
4 KiB
Go
183 lines
4 KiB
Go
package meta
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/gob"
|
|
|
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
"github.com/pkg/errors"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
type bucketItem struct {
|
|
key, val string
|
|
}
|
|
|
|
var (
|
|
primaryBucket = []byte("objects")
|
|
indexBucket = []byte("index")
|
|
)
|
|
|
|
// Put saves object in DB.
|
|
//
|
|
// Object payload expected to be cut.
|
|
func (db *DB) Put(obj *object.Object) error {
|
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
par := false
|
|
|
|
for ; obj != nil; obj, par = obj.GetParent(), true {
|
|
// create primary bucket (addr: header)
|
|
primaryBucket, err := tx.CreateBucketIfNotExists(primaryBucket)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not create primary bucket", db)
|
|
}
|
|
|
|
data, err := obj.ToV2().StableMarshal(nil)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not marshal the object", db)
|
|
}
|
|
|
|
addrKey := addressKey(obj.Address())
|
|
|
|
// put header to primary bucket
|
|
if err := primaryBucket.Put(addrKey, data); err != nil {
|
|
return errors.Wrapf(err, "(%T) could not put item to primary bucket", db)
|
|
}
|
|
|
|
// create bucket for indices
|
|
indexBucket, err := tx.CreateBucketIfNotExists(indexBucket)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not create index bucket", db)
|
|
}
|
|
|
|
// calculate indexed values for object
|
|
indices := objectIndices(obj, par)
|
|
|
|
for i := range indices {
|
|
// create index bucket
|
|
keyBucket, err := indexBucket.CreateBucketIfNotExists([]byte(indices[i].key))
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not create bucket for header key", db)
|
|
}
|
|
|
|
v := nonEmptyKeyBytes([]byte(indices[i].val))
|
|
|
|
strs, err := decodeAddressList(keyBucket.Get(v))
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not decode address list", db)
|
|
}
|
|
|
|
data, err := encodeAddressList(append(strs, string(addrKey)))
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not encode address list", db)
|
|
}
|
|
|
|
if err := keyBucket.Put(v, data); err != nil {
|
|
return errors.Wrapf(err, "(%T) could not put item to header bucket", db)
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func nonEmptyKeyBytes(key []byte) []byte {
|
|
return append([]byte{0}, key...)
|
|
}
|
|
|
|
func cutKeyBytes(key []byte) []byte {
|
|
return key[1:]
|
|
}
|
|
|
|
func addressKey(addr *objectSDK.Address) []byte {
|
|
return []byte(addr.String())
|
|
}
|
|
|
|
func objectIndices(obj *object.Object, parent bool) []bucketItem {
|
|
as := obj.GetAttributes()
|
|
|
|
res := make([]bucketItem, 0, 5+len(as))
|
|
|
|
rootVal := v2object.BooleanPropertyValueTrue
|
|
if obj.HasParent() {
|
|
rootVal = ""
|
|
}
|
|
|
|
leafVal := v2object.BooleanPropertyValueTrue
|
|
if parent {
|
|
leafVal = ""
|
|
}
|
|
|
|
childfreeVal := v2object.BooleanPropertyValueTrue
|
|
if len(obj.GetChildren()) > 0 {
|
|
childfreeVal = ""
|
|
}
|
|
|
|
res = append(res,
|
|
bucketItem{
|
|
key: v2object.FilterHeaderVersion,
|
|
val: obj.GetVersion().String(),
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterHeaderContainerID,
|
|
val: obj.GetContainerID().String(),
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterHeaderOwnerID,
|
|
val: obj.GetOwnerID().String(),
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterPropertyRoot,
|
|
val: rootVal,
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterPropertyLeaf,
|
|
val: leafVal,
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterPropertyChildfree,
|
|
val: childfreeVal,
|
|
},
|
|
// TODO: add remaining fields after neofs-api#72
|
|
)
|
|
|
|
for _, a := range as {
|
|
res = append(res, bucketItem{
|
|
key: a.GetKey(),
|
|
val: a.GetValue(),
|
|
})
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
// FIXME: gob is a temporary solution, use protobuf.
|
|
func decodeAddressList(data []byte) ([]string, error) {
|
|
if len(data) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var strs []string
|
|
|
|
decoder := gob.NewDecoder(bytes.NewReader(data))
|
|
if err := decoder.Decode(&strs); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return strs, nil
|
|
}
|
|
|
|
func encodeAddressList(l []string) ([]byte, error) {
|
|
buf := bytes.NewBuffer(nil)
|
|
encoder := gob.NewEncoder(buf)
|
|
|
|
if err := encoder.Encode(l); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return buf.Bytes(), nil
|
|
}
|