forked from TrueCloudLab/frostfs-node
[#9999] metabase: Fix db engine to pebble in put.go
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
8bfa2fe106
commit
e5b9ad6465
1 changed files with 68 additions and 220 deletions
|
@ -1,11 +1,10 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
gio "io"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -19,18 +18,10 @@ import (
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"github.com/cockroachdb/pebble"
|
"github.com/cockroachdb/pebble"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
|
||||||
namedBucketItem struct {
|
|
||||||
name, key, val []byte
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
// PutPrm groups the parameters of Put operation.
|
// PutPrm groups the parameters of Put operation.
|
||||||
type PutPrm struct {
|
type PutPrm struct {
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
|
@ -90,10 +81,16 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
currEpoch := db.epochState.CurrentEpoch()
|
currEpoch := db.epochState.CurrentEpoch()
|
||||||
|
cnr, ok := prm.obj.ContainerID()
|
||||||
|
if !ok {
|
||||||
|
return PutRes{}, errors.New("missing container in object")
|
||||||
|
}
|
||||||
|
|
||||||
|
defer db.guard.LockContainerID(cnr)()
|
||||||
|
|
||||||
err = db.batch(func(b *pebble.Batch) error {
|
err = db.batch(func(b *pebble.Batch) error {
|
||||||
var e error
|
var e error
|
||||||
res, e = db.put(b, prm.obj, prm.id, nil, currEpoch)
|
res, e = db.put(ctx, b, prm.obj, prm.id, nil, currEpoch)
|
||||||
return e
|
return e
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -106,7 +103,9 @@ func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) put(batch *pebble.Batch,
|
func (db *DB) put(
|
||||||
|
ctx context.Context,
|
||||||
|
b *pebble.Batch,
|
||||||
obj *objectSDK.Object,
|
obj *objectSDK.Object,
|
||||||
id []byte,
|
id []byte,
|
||||||
si *objectSDK.SplitInfo,
|
si *objectSDK.SplitInfo,
|
||||||
|
@ -119,7 +118,7 @@ func (db *DB) put(batch *pebble.Batch,
|
||||||
|
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
|
|
||||||
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
exists, _, err := db.exists(ctx, b, objectCore.AddressOf(obj), oid.Address{}, currEpoch)
|
||||||
|
|
||||||
var splitInfoError *objectSDK.SplitInfoError
|
var splitInfoError *objectSDK.SplitInfoError
|
||||||
if errors.As(err, &splitInfoError) {
|
if errors.As(err, &splitInfoError) {
|
||||||
|
@ -129,70 +128,71 @@ func (db *DB) put(batch *pebble.Batch,
|
||||||
}
|
}
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
|
return PutRes{}, db.updateObj(b, obj, id, si, isParent)
|
||||||
}
|
}
|
||||||
|
|
||||||
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch)
|
return PutRes{Inserted: true}, db.insertObject(ctx, b, obj, id, si, isParent, cnr, currEpoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
func (db *DB) updateObj(b *pebble.Batch, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
||||||
|
addr := objectCore.AddressOf(obj)
|
||||||
// most right child and split header overlap parent so we have to
|
// most right child and split header overlap parent so we have to
|
||||||
// check if object exists to not overwrite it twice
|
// check if object exists to not overwrite it twice
|
||||||
|
|
||||||
// When storage engine moves objects between different sub-storages,
|
// When storage engine moves objects between different sub-storages,
|
||||||
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
||||||
if !isParent && id != nil {
|
if !isParent && id != nil {
|
||||||
return setStorageID(tx, objectCore.AddressOf(obj), id, true)
|
return setStorageID(b, addr, id, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// when storage already has last object in split hierarchy and there is
|
// when storage already has last object in split hierarchy and there is
|
||||||
// a linking object to put (or vice versa), we should update split info
|
// a linking object to put (or vice versa), we should update split info
|
||||||
// with object ids of these objects
|
// with object ids of these objects
|
||||||
if isParent {
|
if isParent {
|
||||||
return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
|
return updateSplitInfo(b, addr.Container(), addr.Object(), si)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64) error {
|
func (db *DB) insertObject(ctx context.Context, b *pebble.Batch, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64) error {
|
||||||
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
||||||
parentSI, err := splitInfoFromObject(obj)
|
parentSI, err := splitInfoFromObject(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.put(tx, par, id, parentSI, currEpoch)
|
_, err = db.put(ctx, b, par, id, parentSI, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := putUniqueIndexes(tx, obj, si, id)
|
err := putUniqueIndexes(b, obj, si, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't put unique indexes: %w", err)
|
return fmt.Errorf("can't put unique indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateListIndexes(tx, obj, putListIndexItem)
|
err = updateListIndexes(b, obj, putListIndexItem)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't put list indexes: %w", err)
|
return fmt.Errorf("can't put list indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
|
err = updateFKBTIndexes(b, obj, putListIndexItem)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
|
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// update container volume size estimation
|
// update container volume size estimation
|
||||||
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
||||||
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
err = changeContainerSize(b, cnr, int64(obj.PayloadSize()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !isParent {
|
if !isParent {
|
||||||
if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
|
if err = incCounters(b, cnr, IsUserObject(obj)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -201,26 +201,24 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
||||||
}
|
}
|
||||||
|
|
||||||
func putUniqueIndexes(
|
func putUniqueIndexes(
|
||||||
tx *bbolt.Tx,
|
b *pebble.Batch,
|
||||||
obj *objectSDK.Object,
|
obj *objectSDK.Object,
|
||||||
si *objectSDK.SplitInfo,
|
si *objectSDK.SplitInfo,
|
||||||
id []byte,
|
id []byte,
|
||||||
) error {
|
) error {
|
||||||
isParent := si != nil
|
isParent := si != nil
|
||||||
addr := objectCore.AddressOf(obj)
|
addr := objectCore.AddressOf(obj)
|
||||||
cnr := addr.Container()
|
|
||||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
|
||||||
|
|
||||||
bucketName := make([]byte, bucketKeySize)
|
|
||||||
// add value to primary unique bucket
|
// add value to primary unique bucket
|
||||||
if !isParent {
|
if !isParent {
|
||||||
|
var key []byte
|
||||||
switch obj.Type() {
|
switch obj.Type() {
|
||||||
case objectSDK.TypeRegular:
|
case objectSDK.TypeRegular:
|
||||||
bucketName = primaryBucketName(cnr, bucketName)
|
key = primaryKey(addr.Container(), addr.Object())
|
||||||
case objectSDK.TypeTombstone:
|
case objectSDK.TypeTombstone:
|
||||||
bucketName = tombstoneBucketName(cnr, bucketName)
|
key = tombstoneKey(addr.Container(), addr.Object())
|
||||||
case objectSDK.TypeLock:
|
case objectSDK.TypeLock:
|
||||||
bucketName = bucketNameLockers(cnr, bucketName)
|
key = lockersKey(addr.Container(), addr.Object())
|
||||||
default:
|
default:
|
||||||
return ErrUnknownObjectType
|
return ErrUnknownObjectType
|
||||||
}
|
}
|
||||||
|
@ -230,18 +228,14 @@ func putUniqueIndexes(
|
||||||
return fmt.Errorf("can't marshal object header: %w", err)
|
return fmt.Errorf("can't marshal object header: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = putUniqueIndexItem(tx, namedBucketItem{
|
err = b.Set(key, rawObject, pebble.Sync)
|
||||||
name: bucketName,
|
|
||||||
key: objKey,
|
|
||||||
val: rawObject,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// index storageID if it is present
|
// index storageID if it is present
|
||||||
if id != nil {
|
if id != nil {
|
||||||
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
|
if err = setStorageID(b, objectCore.AddressOf(obj), id, false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -249,6 +243,7 @@ func putUniqueIndexes(
|
||||||
|
|
||||||
// index root object
|
// index root object
|
||||||
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
||||||
|
objID := addr.Object()
|
||||||
if ecHead := obj.ECHeader(); ecHead != nil {
|
if ecHead := obj.ECHeader(); ecHead != nil {
|
||||||
parentID := ecHead.Parent()
|
parentID := ecHead.Parent()
|
||||||
if ecHead.ParentSplitID() != nil {
|
if ecHead.ParentSplitID() != nil {
|
||||||
|
@ -263,37 +258,14 @@ func putUniqueIndexes(
|
||||||
|
|
||||||
parentID = *parentSplitParentID
|
parentID = *parentSplitParentID
|
||||||
}
|
}
|
||||||
objKey = objectKey(parentID, objKey)
|
objID = parentID
|
||||||
}
|
}
|
||||||
return updateSplitInfoIndex(tx, objKey, cnr, bucketName, si)
|
return updateSplitInfo(b, addr.Container(), objID, si)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
|
|
||||||
return updateUniqueIndexItem(tx, namedBucketItem{
|
|
||||||
name: rootBucketName(cnr, bucketName),
|
|
||||||
key: objKey,
|
|
||||||
}, func(old, _ []byte) ([]byte, error) {
|
|
||||||
switch {
|
|
||||||
case si == nil && old == nil:
|
|
||||||
return []byte{}, nil
|
|
||||||
case si == nil:
|
|
||||||
return old, nil
|
|
||||||
case old == nil:
|
|
||||||
return si.Marshal()
|
|
||||||
default:
|
|
||||||
oldSI := objectSDK.NewSplitInfo()
|
|
||||||
if err := oldSI.Unmarshal(old); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
si = util.MergeSplitInfo(si, oldSI)
|
|
||||||
return si.Marshal()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
type updateIndexItemFunc = func(b *pebble.Batch, key []byte) error
|
type updateIndexItemFunc = func(b *pebble.Batch, key []byte) error
|
||||||
|
|
||||||
func updateListIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
func updateListIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
||||||
|
@ -354,12 +326,7 @@ func updateListIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItem
|
||||||
}
|
}
|
||||||
|
|
||||||
if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
|
if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
|
||||||
objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
|
err := f(b, parentKey(cnr, *parentSplitParentID, ech.Parent()))
|
||||||
err := f(tx, namedBucketItem{
|
|
||||||
name: parentBucketName(cnr, bucketName),
|
|
||||||
key: objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
|
|
||||||
val: objKey,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -396,161 +363,42 @@ func updateFKBTIndexes(b *pebble.Batch, obj *objectSDK.Object, f updateIndexItem
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type bucketContainer interface {
|
func putListIndexItem(b *pebble.Batch, key []byte) error {
|
||||||
Bucket([]byte) *bbolt.Bucket
|
return b.Set(key, zeroValue, pebble.Sync)
|
||||||
CreateBucket([]byte) (*bbolt.Bucket, error)
|
|
||||||
CreateBucketIfNotExists([]byte) (*bbolt.Bucket, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Bucket, error) {
|
|
||||||
if bkt := tx.Bucket(name); bkt != nil {
|
|
||||||
return bkt, nil
|
|
||||||
}
|
|
||||||
return tx.CreateBucket(name)
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
|
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := update(bkt.Get(item.key), item.val)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return bkt.Put(item.key, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
||||||
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
|
|
||||||
}
|
|
||||||
|
|
||||||
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return fkbtRoot.Put(item.val, zeroValue)
|
|
||||||
}
|
|
||||||
|
|
||||||
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
||||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lst, err := decodeList(bkt.Get(item.key))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lst = append(lst, item.val)
|
|
||||||
|
|
||||||
encodedLst, err := encodeList(lst)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return bkt.Put(item.key, encodedLst)
|
|
||||||
}
|
|
||||||
|
|
||||||
// encodeList decodes list of bytes into a single blog for list bucket indexes.
|
|
||||||
func encodeList(lst [][]byte) ([]byte, error) {
|
|
||||||
w := io.NewBufBinWriter()
|
|
||||||
w.WriteVarUint(uint64(len(lst)))
|
|
||||||
for i := range lst {
|
|
||||||
w.WriteVarBytes(lst[i])
|
|
||||||
}
|
|
||||||
if w.Err != nil {
|
|
||||||
return nil, w.Err
|
|
||||||
}
|
|
||||||
return w.Bytes(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// decodeList decodes blob into the list of bytes from list bucket index.
|
|
||||||
func decodeList(data []byte) (lst [][]byte, err error) {
|
|
||||||
if len(data) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var offset uint64
|
|
||||||
size, n, err := getVarUint(data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
offset += uint64(n)
|
|
||||||
lst = make([][]byte, size, size+1)
|
|
||||||
for i := range lst {
|
|
||||||
sz, n, err := getVarUint(data[offset:])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
offset += uint64(n)
|
|
||||||
|
|
||||||
next := offset + sz
|
|
||||||
if uint64(len(data)) < next {
|
|
||||||
return nil, gio.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
lst[i] = data[offset:next]
|
|
||||||
offset = next
|
|
||||||
}
|
|
||||||
return lst, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getVarUint(data []byte) (uint64, int, error) {
|
|
||||||
if len(data) == 0 {
|
|
||||||
return 0, 0, gio.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
|
|
||||||
switch b := data[0]; b {
|
|
||||||
case 0xfd:
|
|
||||||
if len(data) < 3 {
|
|
||||||
return 0, 1, gio.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return uint64(binary.LittleEndian.Uint16(data[1:])), 3, nil
|
|
||||||
case 0xfe:
|
|
||||||
if len(data) < 5 {
|
|
||||||
return 0, 1, gio.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return uint64(binary.LittleEndian.Uint32(data[1:])), 5, nil
|
|
||||||
case 0xff:
|
|
||||||
if len(data) < 9 {
|
|
||||||
return 0, 1, gio.ErrUnexpectedEOF
|
|
||||||
}
|
|
||||||
return binary.LittleEndian.Uint64(data[1:]), 9, nil
|
|
||||||
default:
|
|
||||||
return uint64(b), 1, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// setStorageID for existing objects if they were moved from one
|
|
||||||
// storage location to another.
|
|
||||||
func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
|
|
||||||
key := make([]byte, bucketKeySize)
|
|
||||||
bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
key = objectKey(addr.Object(), key)
|
|
||||||
if override || bkt.Get(key) == nil {
|
|
||||||
return bkt.Put(key, id)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateSpliInfo for existing objects if storage filled with extra information
|
// updateSpliInfo for existing objects if storage filled with extra information
|
||||||
// about last object in split hierarchy or linking object.
|
// about last object in split hierarchy or linking object.
|
||||||
func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
|
func updateSplitInfo(b *pebble.Batch, cnr cid.ID, obj oid.ID, si *objectSDK.SplitInfo) error {
|
||||||
objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
|
key := rootKey(cnr, obj)
|
||||||
return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
|
existed, err := valueSafe(b, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case si == nil && existed == nil:
|
||||||
|
return b.Set(key, zeroValue, pebble.Sync)
|
||||||
|
case si == nil:
|
||||||
|
return nil
|
||||||
|
case existed == nil || bytes.Equal(existed, zeroValue):
|
||||||
|
siBytes, err := si.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return b.Set(key, siBytes, pebble.Sync)
|
||||||
|
default:
|
||||||
|
existedSI := objectSDK.NewSplitInfo()
|
||||||
|
if err := existedSI.Unmarshal(existed); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
si = util.MergeSplitInfo(si, existedSI)
|
||||||
|
siBytes, err := si.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return b.Set(key, siBytes, pebble.Sync)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// splitInfoFromObject returns split info based on last or linkin object.
|
// splitInfoFromObject returns split info based on last or linkin object.
|
||||||
|
|
Loading…
Reference in a new issue