Dmitrii Stepanov
15b4288d80
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 2m32s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m24s
Tests and linters / Run gofumpt (pull_request) Successful in 3m27s
DCO action / DCO (pull_request) Successful in 3m45s
Tests and linters / gopls check (pull_request) Successful in 4m3s
Build / Build Components (pull_request) Successful in 4m23s
Tests and linters / Staticcheck (pull_request) Successful in 4m52s
Tests and linters / Tests (pull_request) Successful in 5m29s
Tests and linters / Lint (pull_request) Successful in 5m44s
Tests and linters / Tests with -race (pull_request) Successful in 6m40s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
661 lines
17 KiB
Go
661 lines
17 KiB
Go
package meta
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
gio "io"
|
|
"strconv"
|
|
"time"
|
|
|
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
storagelog "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/log"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
type (
|
|
namedBucketItem struct {
|
|
name, key, val []byte
|
|
}
|
|
)
|
|
|
|
// PutPrm groups the parameters of Put operation.
|
|
type PutPrm struct {
|
|
obj *objectSDK.Object
|
|
|
|
id []byte
|
|
|
|
indexAttributes bool
|
|
}
|
|
|
|
// PutRes groups the resulting values of Put operation.
|
|
type PutRes struct {
|
|
Inserted bool
|
|
}
|
|
|
|
// SetObject is a Put option to set object to save.
|
|
func (p *PutPrm) SetObject(obj *objectSDK.Object) {
|
|
p.obj = obj
|
|
}
|
|
|
|
// SetStorageID is a Put option to set storage ID to save.
|
|
func (p *PutPrm) SetStorageID(id []byte) {
|
|
p.id = id
|
|
}
|
|
|
|
func (p *PutPrm) SetIndexAttributes(v bool) {
|
|
p.indexAttributes = v
|
|
}
|
|
|
|
var (
|
|
ErrUnknownObjectType = errors.New("unknown object type")
|
|
ErrIncorrectRootObject = errors.New("invalid root object")
|
|
)
|
|
|
|
// Put saves object header in metabase. Object payload expected to be cut.
|
|
//
|
|
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
|
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
|
func (db *DB) Put(ctx context.Context, prm PutPrm) (res PutRes, err error) {
|
|
var (
|
|
startedAt = time.Now()
|
|
success = false
|
|
)
|
|
defer func() {
|
|
db.metrics.AddMethodDuration("Put", time.Since(startedAt), success)
|
|
}()
|
|
|
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.Put",
|
|
trace.WithAttributes(
|
|
attribute.String("address", objectCore.AddressOf(prm.obj).EncodeToString()),
|
|
))
|
|
defer span.End()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return res, ErrDegradedMode
|
|
} else if db.mode.ReadOnly() {
|
|
return res, ErrReadOnlyMode
|
|
}
|
|
|
|
currEpoch := db.epochState.CurrentEpoch()
|
|
|
|
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
|
var e error
|
|
res, e = db.put(tx, prm.obj, prm.id, nil, currEpoch, prm.indexAttributes)
|
|
return e
|
|
})
|
|
if err == nil {
|
|
success = true
|
|
storagelog.Write(ctx, db.log,
|
|
storagelog.AddressField(objectCore.AddressOf(prm.obj)),
|
|
storagelog.OpField("metabase PUT"))
|
|
}
|
|
|
|
return res, metaerr.Wrap(err)
|
|
}
|
|
|
|
func (db *DB) put(tx *bbolt.Tx,
|
|
obj *objectSDK.Object,
|
|
id []byte,
|
|
si *objectSDK.SplitInfo,
|
|
currEpoch uint64,
|
|
indexAttributes bool,
|
|
) (PutRes, error) {
|
|
cnr, ok := obj.ContainerID()
|
|
if !ok {
|
|
return PutRes{}, errors.New("missing container in object")
|
|
}
|
|
|
|
var ecParentAddress oid.Address
|
|
if ecHeader := obj.ECHeader(); ecHeader != nil {
|
|
ecParentAddress.SetContainer(cnr)
|
|
ecParentAddress.SetObject(ecHeader.Parent())
|
|
}
|
|
|
|
isParent := si != nil
|
|
|
|
exists, _, err := db.exists(tx, objectCore.AddressOf(obj), ecParentAddress, currEpoch)
|
|
|
|
var splitInfoError *objectSDK.SplitInfoError
|
|
if errors.As(err, &splitInfoError) {
|
|
exists = true // object exists, however it is virtual
|
|
} else if err != nil {
|
|
return PutRes{}, err // return any error besides SplitInfoError
|
|
}
|
|
|
|
if exists {
|
|
return PutRes{}, db.updateObj(tx, obj, id, si, isParent)
|
|
}
|
|
|
|
return PutRes{Inserted: true}, db.insertObject(tx, obj, id, si, isParent, cnr, currEpoch, indexAttributes)
|
|
}
|
|
|
|
func (db *DB) updateObj(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool) error {
|
|
// most right child and split header overlap parent so we have to
|
|
// check if object exists to not overwrite it twice
|
|
|
|
// When storage engine moves objects between different sub-storages,
|
|
// it calls metabase.Put method with new storage ID, thus triggering this code.
|
|
if !isParent && id != nil {
|
|
return setStorageID(tx, objectCore.AddressOf(obj), id, true)
|
|
}
|
|
|
|
// when storage already has last object in split hierarchy and there is
|
|
// a linking object to put (or vice versa), we should update split info
|
|
// with object ids of these objects
|
|
if isParent {
|
|
return updateSplitInfo(tx, objectCore.AddressOf(obj), si)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *objectSDK.SplitInfo, isParent bool, cnr cid.ID, currEpoch uint64, indexAttributes bool) error {
|
|
if par := obj.Parent(); par != nil && !isParent { // limit depth by two
|
|
parentSI, err := splitInfoFromObject(obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = db.put(tx, par, id, parentSI, currEpoch, indexAttributes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err := putUniqueIndexes(tx, obj, si, id)
|
|
if err != nil {
|
|
return fmt.Errorf("can't put unique indexes: %w", err)
|
|
}
|
|
|
|
err = updateListIndexes(tx, obj, putListIndexItem)
|
|
if err != nil {
|
|
return fmt.Errorf("can't put list indexes: %w", err)
|
|
}
|
|
|
|
if indexAttributes {
|
|
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
|
|
if err != nil {
|
|
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
|
|
}
|
|
}
|
|
|
|
// update container volume size estimation
|
|
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
|
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !isParent {
|
|
if err = db.incCounters(tx, cnr, IsUserObject(obj)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func putUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
|
|
isParent := si != nil
|
|
addr := objectCore.AddressOf(obj)
|
|
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
|
|
|
bucketName := make([]byte, bucketKeySize)
|
|
if !isParent {
|
|
err := putRawObjectData(tx, obj, bucketName, addr, objKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if id != nil {
|
|
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
|
|
return err
|
|
}
|
|
|
|
return putSplitInfo(tx, obj, bucketName, addr, si, objKey)
|
|
}
|
|
|
|
func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
|
|
switch obj.Type() {
|
|
case objectSDK.TypeRegular:
|
|
bucketName = primaryBucketName(addr.Container(), bucketName)
|
|
case objectSDK.TypeTombstone:
|
|
bucketName = tombstoneBucketName(addr.Container(), bucketName)
|
|
case objectSDK.TypeLock:
|
|
bucketName = bucketNameLockers(addr.Container(), bucketName)
|
|
default:
|
|
return ErrUnknownObjectType
|
|
}
|
|
rawObject, err := obj.CutPayload().Marshal()
|
|
if err != nil {
|
|
return fmt.Errorf("can't marshal object header: %w", err)
|
|
}
|
|
return putUniqueIndexItem(tx, namedBucketItem{
|
|
name: bucketName,
|
|
key: objKey,
|
|
val: rawObject,
|
|
})
|
|
}
|
|
|
|
func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
|
|
if expEpoch, ok := hasExpirationEpoch(obj); ok {
|
|
err := putUniqueIndexItem(tx, namedBucketItem{
|
|
name: expEpochToObjectBucketName,
|
|
key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
|
|
val: zeroValue,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
val := make([]byte, epochSize)
|
|
binary.LittleEndian.PutUint64(val, expEpoch)
|
|
err = putUniqueIndexItem(tx, namedBucketItem{
|
|
name: objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)),
|
|
key: objKey,
|
|
val: val,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
|
|
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
|
|
if ecHead := obj.ECHeader(); ecHead != nil {
|
|
parentID := ecHead.Parent()
|
|
if ecHead.ParentSplitID() != nil {
|
|
parentSplitParentID := ecHead.ParentSplitParentID()
|
|
if parentSplitParentID == nil {
|
|
return nil
|
|
}
|
|
|
|
si = objectSDK.NewSplitInfo()
|
|
si.SetSplitID(ecHead.ParentSplitID())
|
|
si.SetLastPart(ecHead.Parent())
|
|
|
|
parentID = *parentSplitParentID
|
|
}
|
|
objKey = objectKey(parentID, objKey)
|
|
}
|
|
return updateSplitInfoIndex(tx, objKey, addr.Container(), bucketName, si)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func updateSplitInfoIndex(tx *bbolt.Tx, objKey []byte, cnr cid.ID, bucketName []byte, si *objectSDK.SplitInfo) error {
|
|
return updateUniqueIndexItem(tx, namedBucketItem{
|
|
name: rootBucketName(cnr, bucketName),
|
|
key: objKey,
|
|
}, func(old, _ []byte) ([]byte, error) {
|
|
switch {
|
|
case si == nil && old == nil:
|
|
return []byte{}, nil
|
|
case si == nil:
|
|
return old, nil
|
|
case old == nil:
|
|
return si.Marshal()
|
|
default:
|
|
oldSI := objectSDK.NewSplitInfo()
|
|
if err := oldSI.Unmarshal(bytes.Clone(old)); err != nil {
|
|
return nil, err
|
|
}
|
|
si = util.MergeSplitInfo(si, oldSI)
|
|
return si.Marshal()
|
|
}
|
|
})
|
|
}
|
|
|
|
type updateIndexItemFunc = func(tx *bbolt.Tx, item namedBucketItem) error
|
|
|
|
func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
|
idObj, _ := obj.ID()
|
|
cnr, _ := obj.ContainerID()
|
|
objKey := objectKey(idObj, make([]byte, objectKeySize))
|
|
bucketName := make([]byte, bucketKeySize)
|
|
|
|
idParent, ok := obj.ParentID()
|
|
|
|
// index parent ids
|
|
if ok {
|
|
err := f(tx, namedBucketItem{
|
|
name: parentBucketName(cnr, bucketName),
|
|
key: objectKey(idParent, make([]byte, objectKeySize)),
|
|
val: objKey,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// index split ids
|
|
if obj.SplitID() != nil {
|
|
err := f(tx, namedBucketItem{
|
|
name: splitBucketName(cnr, bucketName),
|
|
key: obj.SplitID().ToV2(),
|
|
val: objKey,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if ech := obj.ECHeader(); ech != nil {
|
|
err := f(tx, namedBucketItem{
|
|
name: ecInfoBucketName(cnr, bucketName),
|
|
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
|
|
val: objKey,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if ech.ParentSplitID() != nil {
|
|
objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
|
|
err := f(tx, namedBucketItem{
|
|
name: splitBucketName(cnr, bucketName),
|
|
key: ech.ParentSplitID().ToV2(),
|
|
val: objKey,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if parentSplitParentID := ech.ParentSplitParentID(); parentSplitParentID != nil {
|
|
objKey := objectKey(ech.Parent(), make([]byte, objectKeySize))
|
|
err := f(tx, namedBucketItem{
|
|
name: parentBucketName(cnr, bucketName),
|
|
key: objectKey(*parentSplitParentID, make([]byte, objectKeySize)),
|
|
val: objKey,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var indexedAttributes = map[string]struct{}{
|
|
"S3-Access-Box-CRDT-Name": {},
|
|
objectSDK.AttributeFilePath: {},
|
|
}
|
|
|
|
// IsAtrributeIndexed returns True if attribute is indexed by metabase.
|
|
func IsAtrributeIndexed(attr string) bool {
|
|
_, found := indexedAttributes[attr]
|
|
return found
|
|
}
|
|
|
|
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
|
id, _ := obj.ID()
|
|
cnr, _ := obj.ContainerID()
|
|
objKey := objectKey(id, make([]byte, objectKeySize))
|
|
|
|
key := make([]byte, bucketKeySize)
|
|
var attrs []objectSDK.Attribute
|
|
if obj.ECHeader() != nil {
|
|
attrs = obj.ECHeader().ParentAttributes()
|
|
objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize))
|
|
} else {
|
|
attrs = obj.Attributes()
|
|
}
|
|
|
|
// user specified attributes
|
|
for i := range attrs {
|
|
if !IsAtrributeIndexed(attrs[i].Key()) {
|
|
continue
|
|
}
|
|
key = attributeBucketName(cnr, attrs[i].Key(), key)
|
|
err := f(tx, namedBucketItem{
|
|
name: key,
|
|
key: []byte(attrs[i].Value()),
|
|
val: objKey,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
|
|
attributes := obj.Attributes()
|
|
if ech := obj.ECHeader(); ech != nil {
|
|
attributes = ech.ParentAttributes()
|
|
}
|
|
for _, attr := range attributes {
|
|
if attr.Key() == objectV2.SysAttributeExpEpoch {
|
|
expEpoch, err := strconv.ParseUint(attr.Value(), 10, 64)
|
|
return expEpoch, err == nil
|
|
}
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
type bucketContainer interface {
|
|
Bucket([]byte) *bbolt.Bucket
|
|
CreateBucket([]byte) (*bbolt.Bucket, error)
|
|
CreateBucketIfNotExists([]byte) (*bbolt.Bucket, error)
|
|
}
|
|
|
|
func createBucketLikelyExists[T bucketContainer](tx T, name []byte) (*bbolt.Bucket, error) {
|
|
if bkt := tx.Bucket(name); bkt != nil {
|
|
return bkt, nil
|
|
}
|
|
return tx.CreateBucket(name)
|
|
}
|
|
|
|
func updateUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem, update func(oldData, newData []byte) ([]byte, error)) error {
|
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
}
|
|
|
|
data, err := update(bkt.Get(item.key), item.val)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return bkt.Put(item.key, data)
|
|
}
|
|
|
|
func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
|
|
}
|
|
|
|
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
}
|
|
|
|
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
|
}
|
|
|
|
return fkbtRoot.Put(item.val, zeroValue)
|
|
}
|
|
|
|
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|
bkt, err := createBucketLikelyExists(tx, item.name)
|
|
if err != nil {
|
|
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
|
}
|
|
|
|
lst, err := decodeList(bkt.Get(item.key))
|
|
if err != nil {
|
|
return fmt.Errorf("can't decode leaf list %v: %w", item.key, err)
|
|
}
|
|
|
|
lst = append(lst, item.val)
|
|
|
|
encodedLst, err := encodeList(lst)
|
|
if err != nil {
|
|
return fmt.Errorf("can't encode leaf list %v: %w", item.key, err)
|
|
}
|
|
|
|
return bkt.Put(item.key, encodedLst)
|
|
}
|
|
|
|
// encodeList decodes list of bytes into a single blog for list bucket indexes.
|
|
func encodeList(lst [][]byte) ([]byte, error) {
|
|
w := io.NewBufBinWriter()
|
|
w.WriteVarUint(uint64(len(lst)))
|
|
for i := range lst {
|
|
w.WriteVarBytes(lst[i])
|
|
}
|
|
if w.Err != nil {
|
|
return nil, w.Err
|
|
}
|
|
return w.Bytes(), nil
|
|
}
|
|
|
|
// decodeList decodes blob into the list of bytes from list bucket index.
|
|
func decodeList(data []byte) (lst [][]byte, err error) {
|
|
if len(data) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
var offset uint64
|
|
size, n, err := getVarUint(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
offset += uint64(n)
|
|
lst = make([][]byte, size, size+1)
|
|
for i := range lst {
|
|
sz, n, err := getVarUint(data[offset:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
offset += uint64(n)
|
|
|
|
next := offset + sz
|
|
if uint64(len(data)) < next {
|
|
return nil, gio.ErrUnexpectedEOF
|
|
}
|
|
lst[i] = data[offset:next]
|
|
offset = next
|
|
}
|
|
return lst, nil
|
|
}
|
|
|
|
func getVarUint(data []byte) (uint64, int, error) {
|
|
if len(data) == 0 {
|
|
return 0, 0, gio.ErrUnexpectedEOF
|
|
}
|
|
|
|
switch b := data[0]; b {
|
|
case 0xfd:
|
|
if len(data) < 3 {
|
|
return 0, 1, gio.ErrUnexpectedEOF
|
|
}
|
|
return uint64(binary.LittleEndian.Uint16(data[1:])), 3, nil
|
|
case 0xfe:
|
|
if len(data) < 5 {
|
|
return 0, 1, gio.ErrUnexpectedEOF
|
|
}
|
|
return uint64(binary.LittleEndian.Uint32(data[1:])), 5, nil
|
|
case 0xff:
|
|
if len(data) < 9 {
|
|
return 0, 1, gio.ErrUnexpectedEOF
|
|
}
|
|
return binary.LittleEndian.Uint64(data[1:]), 9, nil
|
|
default:
|
|
return uint64(b), 1, nil
|
|
}
|
|
}
|
|
|
|
// setStorageID for existing objects if they were moved from one
|
|
// storage location to another.
|
|
func setStorageID(tx *bbolt.Tx, addr oid.Address, id []byte, override bool) error {
|
|
key := make([]byte, bucketKeySize)
|
|
bkt, err := createBucketLikelyExists(tx, smallBucketName(addr.Container(), key))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key = objectKey(addr.Object(), key)
|
|
if override || bkt.Get(key) == nil {
|
|
return bkt.Put(key, id)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateSpliInfo for existing objects if storage filled with extra information
|
|
// about last object in split hierarchy or linking object.
|
|
func updateSplitInfo(tx *bbolt.Tx, addr oid.Address, from *objectSDK.SplitInfo) error {
|
|
objKey := objectKey(addr.Object(), make([]byte, bucketKeySize))
|
|
return updateSplitInfoIndex(tx, objKey, addr.Container(), make([]byte, bucketKeySize), from)
|
|
}
|
|
|
|
// splitInfoFromObject returns split info based on last or linkin object.
|
|
// Otherwise returns nil, nil.
|
|
func splitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) {
|
|
if obj.Parent() == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
info := objectSDK.NewSplitInfo()
|
|
info.SetSplitID(obj.SplitID())
|
|
|
|
switch {
|
|
case isLinkObject(obj):
|
|
id, ok := obj.ID()
|
|
if !ok {
|
|
return nil, errors.New("missing object ID")
|
|
}
|
|
|
|
info.SetLink(id)
|
|
case isLastObject(obj):
|
|
id, ok := obj.ID()
|
|
if !ok {
|
|
return nil, errors.New("missing object ID")
|
|
}
|
|
|
|
info.SetLastPart(id)
|
|
default:
|
|
return nil, ErrIncorrectRootObject // should never happen
|
|
}
|
|
|
|
return info, nil
|
|
}
|
|
|
|
// isLinkObject returns true if object contains parent header and list
|
|
// of children.
|
|
func isLinkObject(obj *objectSDK.Object) bool {
|
|
return len(obj.Children()) > 0 && obj.Parent() != nil
|
|
}
|
|
|
|
// isLastObject returns true if object contains only parent header without list
|
|
// of children.
|
|
func isLastObject(obj *objectSDK.Object) bool {
|
|
return len(obj.Children()) == 0 && obj.Parent() != nil
|
|
}
|