forked from TrueCloudLab/frostfs-node
4e7d49791b
With exist check we should index parent first, because as soon as child will be added to metabase, exist on parent will return true even if it was not indexed yet. Also this commit makes one db.Update instead of two for parent and child. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
270 lines
6.4 KiB
Go
270 lines
6.4 KiB
Go
package meta
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/gob"
|
|
"errors"
|
|
"fmt"
|
|
|
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
type (
|
|
namedBucketItem struct {
|
|
name, key, val []byte
|
|
}
|
|
)
|
|
|
|
var ErrUnknownObjectType = errors.New("unknown object type")
|
|
|
|
// Put saves object header in metabase. Object payload expected to be cut.
|
|
// Big objects have nil blobovniczaID.
|
|
func (db *DB) Put(obj *object.Object, id *blobovnicza.ID) error {
|
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
return db.put(tx, obj, id, false)
|
|
})
|
|
}
|
|
|
|
func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent bool) error {
|
|
exists, err := db.exists(tx, obj.Address())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// most right child and split header overlap parent so we have to
|
|
// check if object exists to not overwrite it twice
|
|
if exists {
|
|
return nil
|
|
}
|
|
|
|
if obj.GetParent() != nil && !isParent { // limit depth by two
|
|
err = db.put(tx, obj.GetParent(), id, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
uniqueIndexes, err := uniqueIndexes(obj, isParent, id)
|
|
if err != nil {
|
|
return fmt.Errorf("can' build unique indexes: %w", err)
|
|
}
|
|
|
|
// put unique indexes
|
|
for i := range uniqueIndexes {
|
|
err := putUniqueIndexItem(tx, uniqueIndexes[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// build list indexes
|
|
listIndexes, err := listIndexes(obj)
|
|
if err != nil {
|
|
return fmt.Errorf("can' build list indexes: %w", err)
|
|
}
|
|
|
|
// put list indexes
|
|
for i := range listIndexes {
|
|
err := putListIndexItem(tx, listIndexes[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// build fake bucket tree indexes
|
|
fkbtIndexes, err := fkbtIndexes(obj)
|
|
if err != nil {
|
|
return fmt.Errorf("can' build fake bucket tree indexes: %w", err)
|
|
}
|
|
|
|
// put fake bucket tree indexes
|
|
for i := range fkbtIndexes {
|
|
err := putFKBTIndexItem(tx, fkbtIndexes[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// builds list of <unique> indexes from the object.
|
|
func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]namedBucketItem, error) {
|
|
addr := obj.Address()
|
|
objKey := objectKey(addr.ObjectID())
|
|
result := make([]namedBucketItem, 0, 2)
|
|
|
|
// add value to primary unique bucket
|
|
if !isParent {
|
|
var bucketName []byte
|
|
|
|
switch obj.Type() {
|
|
case objectSDK.TypeRegular:
|
|
bucketName = primaryBucketName(addr.ContainerID())
|
|
case objectSDK.TypeTombstone:
|
|
bucketName = tombstoneBucketName(addr.ContainerID())
|
|
case objectSDK.TypeStorageGroup:
|
|
bucketName = storageGroupBucketName(addr.ContainerID())
|
|
default:
|
|
return nil, ErrUnknownObjectType
|
|
}
|
|
|
|
rawObject, err := obj.Marshal()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("can't marshal object header: %w", err)
|
|
}
|
|
|
|
result = append(result, namedBucketItem{
|
|
name: bucketName,
|
|
key: objKey,
|
|
val: rawObject,
|
|
})
|
|
|
|
// index blobovniczaID if it is present
|
|
if id != nil {
|
|
result = append(result, namedBucketItem{
|
|
name: smallBucketName(addr.ContainerID()),
|
|
key: objKey,
|
|
val: *id,
|
|
})
|
|
}
|
|
}
|
|
|
|
// index root object
|
|
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
|
result = append(result, namedBucketItem{
|
|
name: rootBucketName(addr.ContainerID()),
|
|
key: objKey,
|
|
val: zeroValue, // todo: store split.Info when it will be ready
|
|
})
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// builds list of <list> indexes from the object.
|
|
func listIndexes(obj *object.Object) ([]namedBucketItem, error) {
|
|
result := make([]namedBucketItem, 0, 1)
|
|
addr := obj.Address()
|
|
objKey := objectKey(addr.ObjectID())
|
|
|
|
// index payload hashes
|
|
result = append(result, namedBucketItem{
|
|
name: payloadHashBucketName(addr.ContainerID()),
|
|
key: obj.PayloadChecksum().Sum(),
|
|
val: objKey,
|
|
})
|
|
|
|
if obj.ParentID() != nil {
|
|
result = append(result, namedBucketItem{
|
|
name: parentBucketName(addr.ContainerID()),
|
|
key: objectKey(obj.ParentID()),
|
|
val: objKey,
|
|
})
|
|
}
|
|
|
|
// todo: index splitID
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// builds list of <fake bucket tree> indexes from the object.
|
|
func fkbtIndexes(obj *object.Object) ([]namedBucketItem, error) {
|
|
addr := obj.Address()
|
|
objKey := []byte(addr.ObjectID().String())
|
|
|
|
attrs := obj.Attributes()
|
|
result := make([]namedBucketItem, 0, 1+len(attrs))
|
|
|
|
// owner
|
|
result = append(result, namedBucketItem{
|
|
name: ownerBucketName(addr.ContainerID()),
|
|
key: []byte(obj.OwnerID().String()),
|
|
val: objKey,
|
|
})
|
|
|
|
// user specified attributes
|
|
for i := range attrs {
|
|
result = append(result, namedBucketItem{
|
|
name: attributeBucketName(addr.ContainerID(), attrs[i].Key()),
|
|
key: []byte(attrs[i].Value()),
|
|
val: objKey,
|
|
})
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
bkt, err := tx.CreateBucketIfNotExists(item.name)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
}
|
|
|
|
return bkt.Put(item.key, item.val)
|
|
}
|
|
|
|
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
bkt, err := tx.CreateBucketIfNotExists(item.name)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
}
|
|
|
|
fkbtRoot, err := bkt.CreateBucketIfNotExists(item.key)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
|
}
|
|
|
|
return fkbtRoot.Put(item.val, zeroValue)
|
|
}
|
|
|
|
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
bkt, err := tx.CreateBucketIfNotExists(item.name)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
}
|
|
|
|
lst, err := decodeList(bkt.Get(item.key))
|
|
if err != nil {
|
|
return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
|
|
}
|
|
|
|
lst = append(lst, item.val)
|
|
|
|
encodedLst, err := encodeList(lst)
|
|
if err != nil {
|
|
return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
|
|
}
|
|
|
|
return bkt.Put(item.key, encodedLst)
|
|
}
|
|
|
|
// encodeList decodes list of bytes into a single blog for list bucket indexes.
|
|
func encodeList(lst [][]byte) ([]byte, error) {
|
|
buf := bytes.NewBuffer(nil)
|
|
encoder := gob.NewEncoder(buf)
|
|
|
|
// consider using protobuf encoding instead of glob
|
|
if err := encoder.Encode(lst); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return buf.Bytes(), nil
|
|
}
|
|
|
|
// decodeList decodes blob into the list of bytes from list bucket index.
|
|
func decodeList(data []byte) (lst [][]byte, err error) {
|
|
if len(data) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
decoder := gob.NewDecoder(bytes.NewReader(data))
|
|
if err := decoder.Decode(&lst); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return lst, nil
|
|
}
|