metabase: Drop search-only indexes #1323

Merged
fyrchik merged 6 commits from dstepanov-yadro/frostfs-node:feat/drop_metabase_index into master 2024-08-22 08:21:43 +00:00
Showing only changes of commit a9e5654a04 - Show all commits

View file

@ -197,46 +197,17 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
return nil
}
func putUniqueIndexes(
tx *bbolt.Tx,
obj *objectSDK.Object,
si *objectSDK.SplitInfo,
id []byte,
) error {
func putUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, si *objectSDK.SplitInfo, id []byte) error {
isParent := si != nil
addr := objectCore.AddressOf(obj)
cnr := addr.Container()
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
// add value to primary unique bucket
if !isParent {
switch obj.Type() {
case objectSDK.TypeRegular:
bucketName = primaryBucketName(cnr, bucketName)
case objectSDK.TypeTombstone:
bucketName = tombstoneBucketName(cnr, bucketName)
case objectSDK.TypeLock:
bucketName = bucketNameLockers(cnr, bucketName)
default:
return ErrUnknownObjectType
}
rawObject, err := obj.CutPayload().Marshal()
if err != nil {
return fmt.Errorf("can't marshal object header: %w", err)
}
err = putUniqueIndexItem(tx, namedBucketItem{
name: bucketName,
key: objKey,
val: rawObject,
})
err := putRawObjectData(tx, obj, bucketName, addr, objKey)
if err != nil {
return err
}
// index storageID if it is present
if id != nil {
if err = setStorageID(tx, objectCore.AddressOf(obj), id, false); err != nil {
return err
@ -244,10 +215,40 @@ func putUniqueIndexes(
}
}
if err := putExpirationEpoch(tx, obj, addr, objKey); err != nil {
return err
}
return putSplitInfo(tx, obj, bucketName, addr, si, objKey)
}
func putRawObjectData(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, objKey []byte) error {
switch obj.Type() {
case objectSDK.TypeRegular:
bucketName = primaryBucketName(addr.Container(), bucketName)
case objectSDK.TypeTombstone:
bucketName = tombstoneBucketName(addr.Container(), bucketName)
case objectSDK.TypeLock:
bucketName = bucketNameLockers(addr.Container(), bucketName)
default:
return ErrUnknownObjectType
}
rawObject, err := obj.CutPayload().Marshal()
if err != nil {
return fmt.Errorf("can't marshal object header: %w", err)
}
return putUniqueIndexItem(tx, namedBucketItem{
name: bucketName,
key: objKey,
val: rawObject,
})
}
func putExpirationEpoch(tx *bbolt.Tx, obj *objectSDK.Object, addr oid.Address, objKey []byte) error {
if expEpoch, ok := hasExpirationEpoch(obj); ok {
err := putUniqueIndexItem(tx, namedBucketItem{
name: expEpochToObjectBucketName,
key: expirationEpochKey(expEpoch, cnr, addr.Object()),
key: expirationEpochKey(expEpoch, addr.Container(), addr.Object()),
val: zeroValue,
})
if err != nil {
@ -256,7 +257,7 @@ func putUniqueIndexes(
val := make([]byte, epochSize)
binary.LittleEndian.PutUint64(val, expEpoch)
err = putUniqueIndexItem(tx, namedBucketItem{
name: objectToExpirationEpochBucketName(cnr, make([]byte, bucketKeySize)),
name: objectToExpirationEpochBucketName(addr.Container(), make([]byte, bucketKeySize)),
key: objKey,
val: val,
})
@ -264,8 +265,10 @@ func putUniqueIndexes(
return err
}
}
return nil
}
// index root object
func putSplitInfo(tx *bbolt.Tx, obj *objectSDK.Object, bucketName []byte, addr oid.Address, si *objectSDK.SplitInfo, objKey []byte) error {
if obj.Type() == objectSDK.TypeRegular && !obj.HasParent() {
if ecHead := obj.ECHeader(); ecHead != nil {
parentID := ecHead.Parent()
@ -283,9 +286,8 @@ func putUniqueIndexes(
}
objKey = objectKey(parentID, objKey)
}
return updateSplitInfoIndex(tx, objKey, cnr, bucketName, si)
return updateSplitInfoIndex(tx, objKey, addr.Container(), bucketName, si)
}
return nil
}