[#1323] metabase: Drop user attribute index

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-08-20 12:59:59 +03:00
parent a9e5654a04
commit 1bb45d4ac3
4 changed files with 38 additions and 193 deletions

View file

@ -341,11 +341,6 @@ func (db *DB) deleteObject(
return fmt.Errorf("can't remove list indexes: %w", err)
}
err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
if err != nil {
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
}
if isParent {
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
garbageBKT := tx.Bucket(garbageBucketName)
@ -386,21 +381,6 @@ func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
}
}
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt := tx.Bucket(item.name)
if bkt == nil {
return nil
}
fkbtRoot := bkt.Bucket(item.key)
if fkbtRoot == nil {
return nil
}
_ = fkbtRoot.Delete(item.val) // ignore error, best effort there
return nil
}
func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt := tx.Bucket(item.name)
if bkt == nil {

View file

@ -175,11 +175,6 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
return fmt.Errorf("can't put list indexes: %w", err)
}
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
if err != nil {
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
}
// update container volume size estimation
if obj.Type() == objectSDK.TypeRegular && !isParent {
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
@ -404,36 +399,6 @@ func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
return 0, false
}
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
id, _ := obj.ID()
cnr, _ := obj.ContainerID()
objKey := objectKey(id, make([]byte, objectKeySize))
key := make([]byte, bucketKeySize)
var attrs []objectSDK.Attribute
if obj.ECHeader() != nil {
attrs = obj.ECHeader().ParentAttributes()
objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize))
} else {
attrs = obj.Attributes()
}
// user specified attributes
for i := range attrs {
key = attributeBucketName(cnr, attrs[i].Key(), key)
err := f(tx, namedBucketItem{
name: key,
key: []byte(attrs[i].Value()),
val: objKey,
})
if err != nil {
return err
}
}
return nil
}
type bucketContainer interface {
Bucket([]byte) *bbolt.Bucket
CreateBucket([]byte) (*bbolt.Bucket, error)
@ -464,20 +429,6 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
}
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt, err := createBucketLikelyExists(tx, item.name)
if err != nil {
return fmt.Errorf("can't create index %v: %w", item.name, err)
}
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
if err != nil {
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
}
return fkbtRoot.Put(item.val, zeroValue)
}
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
bkt, err := createBucketLikelyExists(tx, item.name)
if err != nil {

View file

@ -198,9 +198,6 @@ func (db *DB) selectFastFilter(
switch f.Header() {
case v2object.FilterHeaderObjectID:
db.selectObjectID(tx, f, cnr, to, fNum, currEpoch)
case v2object.FilterHeaderOwnerID,
v2object.FilterHeaderPayloadHash:
return // moved to slow filters
case v2object.FilterHeaderObjectType:
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
selectAllFromBucket(tx, bucketName, to, fNum)
@ -220,14 +217,7 @@ func (db *DB) selectFastFilter(
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
default: // user attribute
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
if f.Operation() == objectSDK.MatchNotPresent {
selectOutsideFKBT(tx, allBucketNames(cnr), bucketName, to, fNum)
} else {
db.selectFromFKBT(tx, bucketName, f, to, fNum)
}
default:
}
}
@ -237,16 +227,6 @@ var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{
v2object.TypeLock.String(): {bucketNameLockers},
}
func allBucketNames(cnr cid.ID) (names [][]byte) {
for _, fns := range mBucketNaming {
for _, fn := range fns {
names = append(names, fn(cnr, make([]byte, bucketKeySize)))
}
}
return
}
func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (names [][]byte) {
appendNames := func(key string) {
fns, ok := mBucketNaming[key]
@ -278,83 +258,6 @@ func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal str
return
}
// selectFromList looks into <fkbt> index to find list of addresses to add in
// resulting cache.
func (db *DB) selectFromFKBT(
tx *bbolt.Tx,
name []byte, // fkbt root bucket name
f objectSDK.SearchFilter, // filter for operation and value
to map[string]int, // resulting cache
fNum int, // index of filter
) { //
matchFunc, ok := db.matchers[f.Operation()]
if !ok {
db.log.Debug(logs.MetabaseMissingMatcher, zap.Uint32("operation", uint32(f.Operation())))
return
}
fkbtRoot := tx.Bucket(name)
if fkbtRoot == nil {
return
}
err := matchFunc.matchBucket(fkbtRoot, f.Header(), f.Value(), func(k, _ []byte) error {
fkbtLeaf := fkbtRoot.Bucket(k)
if fkbtLeaf == nil {
return nil
}
return fkbtLeaf.ForEach(func(k, _ []byte) error {
markAddressInCache(to, fNum, string(k))
return nil
})
})
if err != nil {
db.log.Debug(logs.MetabaseErrorInFKBTSelection, zap.String("error", err.Error()))
}
}
// selectOutsideFKBT looks into all incl buckets to find list of addresses outside <fkbt> to add in
// resulting cache.
func selectOutsideFKBT(
tx *bbolt.Tx,
incl [][]byte, // buckets
name []byte, // fkbt root bucket name
to map[string]int, // resulting cache
fNum int, // index of filter
) {
mExcl := make(map[string]struct{})
bktExcl := tx.Bucket(name)
if bktExcl != nil {
_ = bktExcl.ForEachBucket(func(k []byte) error {
exclBktLeaf := bktExcl.Bucket(k)
return exclBktLeaf.ForEach(func(k, _ []byte) error {
mExcl[string(k)] = struct{}{}
return nil
})
})
}
for i := range incl {
bktIncl := tx.Bucket(incl[i])
if bktIncl == nil {
continue
}
_ = bktIncl.ForEach(func(k, _ []byte) error {
if _, ok := mExcl[string(k)]; !ok {
markAddressInCache(to, fNum, string(k))
}
return nil
})
}
}
// selectFromList looks into <list> index to find list of addresses to add in
// resulting cache.
func (db *DB) selectFromList(
@ -491,13 +394,7 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
}
for i := range f {
matchFunc, ok := db.matchers[f[i].Operation()]
if !ok {
return false
}
var data []byte
switch f[i].Header() {
case v2object.FilterHeaderVersion:
data = []byte(obj.Version().String())
@ -515,8 +412,18 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
case v2object.FilterHeaderPayloadHash:
cs, _ := obj.PayloadChecksum()
data = cs.Value()
default:
continue // ignore unknown search attributes
default: // user attribute
v, ok := attributeValue(obj, f[i].Header())
if ok {
data = []byte(v)
} else {
return f[i].Operation() == objectSDK.MatchNotPresent
}
}
matchFunc, ok := db.matchers[f[i].Operation()]
if !ok {
return false
}
if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) {
@ -527,6 +434,19 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
return true
}
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
objectAttributes := obj.Attributes()
if ech := obj.ECHeader(); ech != nil {
objectAttributes = ech.ParentAttributes()
}
for _, attr := range objectAttributes {
if attr.Key() == attribute {
return attr.Value(), true
}
}
return "", false
}
// groupFilters divides filters in two groups: fast and slow. Fast filters
// processed by indexes and slow filters processed after by unmarshaling
// object headers.
@ -545,16 +465,17 @@ func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) {
}
res.withCnrFilter = true
case // slow filters
v2object.FilterHeaderVersion,
v2object.FilterHeaderCreationEpoch,
v2object.FilterHeaderPayloadLength,
v2object.FilterHeaderHomomorphicHash,
v2object.FilterHeaderOwnerID,
v2object.FilterHeaderPayloadHash:
res.slowFilters = append(res.slowFilters, filters[i])
default: // fast filters or user attributes if unknown
case // fast filters
v2object.FilterHeaderObjectID,
v2object.FilterHeaderObjectType,
v2object.FilterHeaderParent,
v2object.FilterHeaderSplitID,
v2object.FilterHeaderECParent,
v2object.FilterPropertyRoot,
v2object.FilterPropertyPhy:
res.fastFilters = append(res.fastFilters, filters[i])
default:
res.slowFilters = append(res.slowFilters, filters[i])
}
}

View file

@ -95,10 +95,10 @@ const (
// Key: owner ID
// Value: bucket containing object IDs as keys
_
// userAttributePrefix is used for prefixing FKBT index buckets containing objects.
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
// Key: attribute value
// Value: bucket containing object IDs as keys
userAttributePrefix
_
// ====================
// List index buckets.
@ -167,13 +167,6 @@ func smallBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, smallPrefix, key)
}
// attributeBucketName returns <CID>_attr_<attributeKey>.
func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
key[0] = userAttributePrefix
cnr.Encode(key[1:])
return append(key[:bucketKeySize], attributeKey...)
}
// rootBucketName returns <CID>_root.
func rootBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, rootPrefix, key)