forked from TrueCloudLab/frostfs-node
200fdbd361
In the previous implementation of the metabase, it was necessary to write virtual objects to the primary index to be able to select them. In this approach, virtual objects can be obtained directly using Head operation. This has a side effect in handling object operations that do not expect to receive a virtual object header in a single operation. With recent changes, it is no longer necessary to have records of virtual objects in the primary index, so this no longer happens for system integrity. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
189 lines
4.1 KiB
Go
189 lines
4.1 KiB
Go
package meta
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/gob"
|
|
|
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
"github.com/pkg/errors"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
type bucketItem struct {
|
|
key, val string
|
|
}
|
|
|
|
var (
|
|
primaryBucket = []byte("objects")
|
|
indexBucket = []byte("index")
|
|
)
|
|
|
|
// Put saves object in DB.
|
|
//
|
|
// Object payload expected to be cut.
|
|
func (db *DB) Put(obj *object.Object) error {
|
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
par := false
|
|
|
|
for ; obj != nil; obj, par = obj.GetParent(), true {
|
|
// create primary bucket (addr: header)
|
|
primaryBucket, err := tx.CreateBucketIfNotExists(primaryBucket)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not create primary bucket", db)
|
|
}
|
|
|
|
data, err := obj.ToV2().StableMarshal(nil)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not marshal the object", db)
|
|
}
|
|
|
|
addrKey := addressKey(obj.Address())
|
|
|
|
if !par {
|
|
// put header to primary bucket
|
|
if err := primaryBucket.Put(addrKey, data); err != nil {
|
|
return errors.Wrapf(err, "(%T) could not put item to primary bucket", db)
|
|
}
|
|
}
|
|
|
|
// create bucket for indices
|
|
indexBucket, err := tx.CreateBucketIfNotExists(indexBucket)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not create index bucket", db)
|
|
}
|
|
|
|
// calculate indexed values for object
|
|
indices := objectIndices(obj, par)
|
|
|
|
for i := range indices {
|
|
// create index bucket
|
|
keyBucket, err := indexBucket.CreateBucketIfNotExists([]byte(indices[i].key))
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not create bucket for header key", db)
|
|
}
|
|
|
|
v := nonEmptyKeyBytes([]byte(indices[i].val))
|
|
|
|
strs, err := decodeAddressList(keyBucket.Get(v))
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not decode address list", db)
|
|
}
|
|
|
|
data, err := encodeAddressList(append(strs, string(addrKey)))
|
|
if err != nil {
|
|
return errors.Wrapf(err, "(%T) could not encode address list", db)
|
|
}
|
|
|
|
if err := keyBucket.Put(v, data); err != nil {
|
|
return errors.Wrapf(err, "(%T) could not put item to header bucket", db)
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func nonEmptyKeyBytes(key []byte) []byte {
|
|
return append([]byte{0}, key...)
|
|
}
|
|
|
|
func cutKeyBytes(key []byte) []byte {
|
|
return key[1:]
|
|
}
|
|
|
|
func addressKey(addr *objectSDK.Address) []byte {
|
|
return []byte(addr.String())
|
|
}
|
|
|
|
func objectIndices(obj *object.Object, parent bool) []bucketItem {
|
|
as := obj.GetAttributes()
|
|
|
|
res := make([]bucketItem, 0, 5+len(as))
|
|
|
|
rootVal := v2object.BooleanPropertyValueTrue
|
|
if obj.HasParent() {
|
|
rootVal = ""
|
|
}
|
|
|
|
leafVal := v2object.BooleanPropertyValueTrue
|
|
if parent {
|
|
leafVal = ""
|
|
}
|
|
|
|
childfreeVal := v2object.BooleanPropertyValueTrue
|
|
if len(obj.GetChildren()) > 0 {
|
|
childfreeVal = ""
|
|
}
|
|
|
|
res = append(res,
|
|
bucketItem{
|
|
key: v2object.FilterHeaderVersion,
|
|
val: obj.GetVersion().String(),
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterHeaderContainerID,
|
|
val: obj.GetContainerID().String(),
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterHeaderOwnerID,
|
|
val: obj.GetOwnerID().String(),
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterPropertyRoot,
|
|
val: rootVal,
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterPropertyLeaf,
|
|
val: leafVal,
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterPropertyChildfree,
|
|
val: childfreeVal,
|
|
},
|
|
bucketItem{
|
|
key: v2object.FilterHeaderParent,
|
|
val: obj.GetParentID().String(),
|
|
},
|
|
// TODO: add remaining fields after neofs-api#72
|
|
)
|
|
|
|
for _, a := range as {
|
|
res = append(res, bucketItem{
|
|
key: a.GetKey(),
|
|
val: a.GetValue(),
|
|
})
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
// FIXME: gob is a temporary solution, use protobuf.
|
|
func decodeAddressList(data []byte) ([]string, error) {
|
|
if len(data) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var strs []string
|
|
|
|
decoder := gob.NewDecoder(bytes.NewReader(data))
|
|
if err := decoder.Decode(&strs); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return strs, nil
|
|
}
|
|
|
|
func encodeAddressList(l []string) ([]byte, error) {
|
|
buf := bytes.NewBuffer(nil)
|
|
encoder := gob.NewEncoder(buf)
|
|
|
|
if err := encoder.Encode(l); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return buf.Bytes(), nil
|
|
}
|