[#1158] metabase: Fix EC storage schema
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 3m5s
Build / Build Components (1.21) (pull_request) Successful in 4m0s
Build / Build Components (1.22) (pull_request) Successful in 3m56s
Tests and linters / Staticcheck (pull_request) Successful in 4m54s
Tests and linters / gopls check (pull_request) Successful in 4m55s
Tests and linters / Lint (pull_request) Successful in 5m54s
Pre-commit hooks / Pre-commit (pull_request) Successful in 6m29s
DCO action / DCO (pull_request) Successful in 3m34s
Tests and linters / Tests (1.22) (pull_request) Successful in 7m28s
Tests and linters / Tests (1.21) (pull_request) Successful in 7m50s
Tests and linters / Tests with -race (pull_request) Successful in 9m25s

Do not store EC info twice.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-06-04 13:30:30 +03:00
parent 643480d6fa
commit cc2449beaf
5 changed files with 7 additions and 92 deletions

View file

@ -473,46 +473,6 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
name: rootBucketName(cnr, bucketName),
key: objKey,
})
if obj.ECHeader() != nil {
err := delECInfo(tx, cnr, objKey, obj.ECHeader())
if err != nil {
return err
}
}
return nil
}
func delECInfo(tx *bbolt.Tx, cnr cid.ID, objKey []byte, ecHead *objectSDK.ECHeader) error {
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
if len(val) > 0 {
if bytes.Equal(val, objKey) {
delUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, bucketName),
key: parentID,
})
} else {
val = bytes.Clone(val)
offset := 0
for offset < len(val) {
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
val = append(val[:offset], val[offset+objectKeySize:]...)
break
}
offset += objectKeySize
}
err := putUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
key: parentID,
val: val,
})
if err != nil {
return err
}
}
}
return nil
}

View file

@ -192,10 +192,12 @@ func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
}
func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
offset := 0
keys, err := decodeList(data)
if err != nil {
return err
}
ecInfo := objectSDK.NewECInfo()
for offset < len(data) {
key := data[offset : offset+objectKeySize]
for _, key := range keys {
// check in primary index
ojbData := getFromBucket(tx, primaryBucketName(cnr, make([]byte, bucketKeySize)), key)
if len(ojbData) != 0 {
@ -210,7 +212,6 @@ func getECInfoError(tx *bbolt.Tx, cnr cid.ID, data []byte) error {
chunk.Total = obj.ECHeader().Total()
ecInfo.AddChunk(chunk)
}
offset += objectKeySize
}
return logicerr.Wrap(objectSDK.NewECInfoError(ecInfo))
}

View file

@ -1,7 +1,6 @@
package meta
import (
"bytes"
"context"
"encoding/binary"
"errors"
@ -259,9 +258,6 @@ func putUniqueIndexes(
isObjKeySet := true
if ecHead := obj.ECHeader(); ecHead != nil {
if err = putECInfo(tx, cnr, objKey, ecHead); err != nil {
return err
}
objKey, isObjKeySet = objectKeyByECHeader(ecHead)
}
if isObjKeySet {
@ -344,7 +340,7 @@ func updateListIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
if ech := obj.ECHeader(); ech != nil {
err := f(tx, namedBucketItem{
name: ecParentToChunksBucketName(cnr, bucketName),
name: ecInfoBucketName(cnr, bucketName),
key: objectKey(ech.Parent(), make([]byte, objectKeySize)),
val: objKey,
})
@ -615,34 +611,3 @@ func isLinkObject(obj *objectSDK.Object) bool {
func isLastObject(obj *objectSDK.Object) bool {
return len(obj.Children()) == 0 && obj.Parent() != nil
}
func putECInfo(tx *bbolt.Tx,
cnr cid.ID, objKey []byte,
ecHead *objectSDK.ECHeader,
) error {
parentID := objectKey(ecHead.Parent(), make([]byte, objectKeySize))
bucketName := make([]byte, bucketKeySize)
val := getFromBucket(tx, ecInfoBucketName(cnr, bucketName), parentID)
if len(val) == 0 {
val = objKey
} else {
offset := 0
found := false
for offset < len(val) {
if bytes.Equal(objKey, val[offset:offset+objectKeySize]) {
found = true
break
}
offset += objectKeySize
}
if !found {
val = append(val, objKey...)
}
}
return putUniqueIndexItem(tx, namedBucketItem{
name: ecInfoBucketName(cnr, make([]byte, bucketKeySize)),
key: parentID,
val: val,
})
}

View file

@ -212,7 +212,7 @@ func (db *DB) selectFastFilter(
bucketName := splitBucketName(cnr, bucketName)
db.selectFromList(tx, bucketName, f, to, fNum)
case v2object.FilterHeaderECParent:
bucketName := ecParentToChunksBucketName(cnr, bucketName)
bucketName := ecInfoBucketName(cnr, bucketName)
db.selectFromList(tx, bucketName, f, to, fNum)
case v2object.FilterPropertyRoot:
selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum)

View file

@ -124,12 +124,6 @@ const (
// Key: container ID + type
// Value: Object id
ecInfoPrefix
// ecParentToChunksPrefix is used to store a relation between EC parent ID and chunks,
// but unlike for ecInfoPrefix the list of chunk IDs is encoded with encodeList.
// Key: EC parent ID
// Value: list of EC chunk IDs
ecParentToChunksPrefix
)
const (
@ -206,11 +200,6 @@ func ecInfoBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, ecInfoPrefix, key)
}
// ecParentToChunksBucketName returns <CID>_ecParentToChunks.
func ecParentToChunksBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, ecParentToChunksPrefix, key)
}
// addressKey returns key for K-V tables when key is a whole address.
func addressKey(addr oid.Address, key []byte) []byte {
addr.Container().Encode(key)