forked from TrueCloudLab/frostfs-node
[#1323] metabase: Drop user attribute index
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
0f08a2efba
commit
7d0d781db1
4 changed files with 38 additions and 193 deletions
|
@ -341,11 +341,6 @@ func (db *DB) deleteObject(
|
|||
return fmt.Errorf("can't remove list indexes: %w", err)
|
||||
}
|
||||
|
||||
err = updateFKBTIndexes(tx, obj, delFKBTIndexItem)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
|
||||
}
|
||||
|
||||
if isParent {
|
||||
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
|
@ -386,21 +381,6 @@ func delUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) {
|
|||
}
|
||||
}
|
||||
|
||||
func delFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt := tx.Bucket(item.name)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
fkbtRoot := bkt.Bucket(item.key)
|
||||
if fkbtRoot == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
_ = fkbtRoot.Delete(item.val) // ignore error, best effort there
|
||||
return nil
|
||||
}
|
||||
|
||||
func delListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt := tx.Bucket(item.name)
|
||||
if bkt == nil {
|
||||
|
|
|
@ -175,11 +175,6 @@ func (db *DB) insertObject(tx *bbolt.Tx, obj *objectSDK.Object, id []byte, si *o
|
|||
return fmt.Errorf("can't put list indexes: %w", err)
|
||||
}
|
||||
|
||||
err = updateFKBTIndexes(tx, obj, putFKBTIndexItem)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't put fake bucket tree indexes: %w", err)
|
||||
}
|
||||
|
||||
// update container volume size estimation
|
||||
if obj.Type() == objectSDK.TypeRegular && !isParent {
|
||||
err = changeContainerSize(tx, cnr, obj.PayloadSize(), true)
|
||||
|
@ -404,36 +399,6 @@ func hasExpirationEpoch(obj *objectSDK.Object) (uint64, bool) {
|
|||
return 0, false
|
||||
}
|
||||
|
||||
func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFunc) error {
|
||||
id, _ := obj.ID()
|
||||
cnr, _ := obj.ContainerID()
|
||||
objKey := objectKey(id, make([]byte, objectKeySize))
|
||||
|
||||
key := make([]byte, bucketKeySize)
|
||||
var attrs []objectSDK.Attribute
|
||||
if obj.ECHeader() != nil {
|
||||
attrs = obj.ECHeader().ParentAttributes()
|
||||
objKey = objectKey(obj.ECHeader().Parent(), make([]byte, objectKeySize))
|
||||
} else {
|
||||
attrs = obj.Attributes()
|
||||
}
|
||||
|
||||
// user specified attributes
|
||||
for i := range attrs {
|
||||
key = attributeBucketName(cnr, attrs[i].Key(), key)
|
||||
err := f(tx, namedBucketItem{
|
||||
name: key,
|
||||
key: []byte(attrs[i].Value()),
|
||||
val: objKey,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type bucketContainer interface {
|
||||
Bucket([]byte) *bbolt.Bucket
|
||||
CreateBucket([]byte) (*bbolt.Bucket, error)
|
||||
|
@ -464,20 +429,6 @@ func putUniqueIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
|||
return updateUniqueIndexItem(tx, item, func(_, val []byte) ([]byte, error) { return val, nil })
|
||||
}
|
||||
|
||||
func putFKBTIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create index %v: %w", item.name, err)
|
||||
}
|
||||
|
||||
fkbtRoot, err := createBucketLikelyExists(bkt, item.key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't create fake bucket tree index %v: %w", item.key, err)
|
||||
}
|
||||
|
||||
return fkbtRoot.Put(item.val, zeroValue)
|
||||
}
|
||||
|
||||
func putListIndexItem(tx *bbolt.Tx, item namedBucketItem) error {
|
||||
bkt, err := createBucketLikelyExists(tx, item.name)
|
||||
if err != nil {
|
||||
|
|
|
@ -198,9 +198,6 @@ func (db *DB) selectFastFilter(
|
|||
switch f.Header() {
|
||||
case v2object.FilterHeaderObjectID:
|
||||
db.selectObjectID(tx, f, cnr, to, fNum, currEpoch)
|
||||
case v2object.FilterHeaderOwnerID,
|
||||
v2object.FilterHeaderPayloadHash:
|
||||
return // moved to slow filters
|
||||
case v2object.FilterHeaderObjectType:
|
||||
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
|
||||
selectAllFromBucket(tx, bucketName, to, fNum)
|
||||
|
@ -220,14 +217,7 @@ func (db *DB) selectFastFilter(
|
|||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
||||
default: // user attribute
|
||||
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
|
||||
|
||||
if f.Operation() == objectSDK.MatchNotPresent {
|
||||
selectOutsideFKBT(tx, allBucketNames(cnr), bucketName, to, fNum)
|
||||
} else {
|
||||
db.selectFromFKBT(tx, bucketName, f, to, fNum)
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -237,16 +227,6 @@ var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{
|
|||
v2object.TypeLock.String(): {bucketNameLockers},
|
||||
}
|
||||
|
||||
func allBucketNames(cnr cid.ID) (names [][]byte) {
|
||||
for _, fns := range mBucketNaming {
|
||||
for _, fn := range fns {
|
||||
names = append(names, fn(cnr, make([]byte, bucketKeySize)))
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (names [][]byte) {
|
||||
appendNames := func(key string) {
|
||||
fns, ok := mBucketNaming[key]
|
||||
|
@ -278,83 +258,6 @@ func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal str
|
|||
return
|
||||
}
|
||||
|
||||
// selectFromList looks into <fkbt> index to find list of addresses to add in
|
||||
// resulting cache.
|
||||
func (db *DB) selectFromFKBT(
|
||||
tx *bbolt.Tx,
|
||||
name []byte, // fkbt root bucket name
|
||||
f objectSDK.SearchFilter, // filter for operation and value
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) { //
|
||||
matchFunc, ok := db.matchers[f.Operation()]
|
||||
if !ok {
|
||||
db.log.Debug(logs.MetabaseMissingMatcher, zap.Uint32("operation", uint32(f.Operation())))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
fkbtRoot := tx.Bucket(name)
|
||||
if fkbtRoot == nil {
|
||||
return
|
||||
}
|
||||
|
||||
err := matchFunc.matchBucket(fkbtRoot, f.Header(), f.Value(), func(k, _ []byte) error {
|
||||
fkbtLeaf := fkbtRoot.Bucket(k)
|
||||
if fkbtLeaf == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return fkbtLeaf.ForEach(func(k, _ []byte) error {
|
||||
markAddressInCache(to, fNum, string(k))
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
db.log.Debug(logs.MetabaseErrorInFKBTSelection, zap.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
// selectOutsideFKBT looks into all incl buckets to find list of addresses outside <fkbt> to add in
|
||||
// resulting cache.
|
||||
func selectOutsideFKBT(
|
||||
tx *bbolt.Tx,
|
||||
incl [][]byte, // buckets
|
||||
name []byte, // fkbt root bucket name
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) {
|
||||
mExcl := make(map[string]struct{})
|
||||
|
||||
bktExcl := tx.Bucket(name)
|
||||
if bktExcl != nil {
|
||||
_ = bktExcl.ForEachBucket(func(k []byte) error {
|
||||
exclBktLeaf := bktExcl.Bucket(k)
|
||||
return exclBktLeaf.ForEach(func(k, _ []byte) error {
|
||||
mExcl[string(k)] = struct{}{}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
for i := range incl {
|
||||
bktIncl := tx.Bucket(incl[i])
|
||||
if bktIncl == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
_ = bktIncl.ForEach(func(k, _ []byte) error {
|
||||
if _, ok := mExcl[string(k)]; !ok {
|
||||
markAddressInCache(to, fNum, string(k))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// selectFromList looks into <list> index to find list of addresses to add in
|
||||
// resulting cache.
|
||||
func (db *DB) selectFromList(
|
||||
|
@ -491,13 +394,7 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
|||
}
|
||||
|
||||
for i := range f {
|
||||
matchFunc, ok := db.matchers[f[i].Operation()]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
var data []byte
|
||||
|
||||
switch f[i].Header() {
|
||||
case v2object.FilterHeaderVersion:
|
||||
data = []byte(obj.Version().String())
|
||||
|
@ -515,8 +412,18 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
|||
case v2object.FilterHeaderPayloadHash:
|
||||
cs, _ := obj.PayloadChecksum()
|
||||
data = cs.Value()
|
||||
default:
|
||||
continue // ignore unknown search attributes
|
||||
default: // user attribute
|
||||
v, ok := attributeValue(obj, f[i].Header())
|
||||
if ok {
|
||||
data = []byte(v)
|
||||
} else {
|
||||
return f[i].Operation() == objectSDK.MatchNotPresent
|
||||
}
|
||||
}
|
||||
|
||||
matchFunc, ok := db.matchers[f[i].Operation()]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) {
|
||||
|
@ -527,6 +434,19 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
|||
return true
|
||||
}
|
||||
|
||||
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
|
||||
objectAttributes := obj.Attributes()
|
||||
if ech := obj.ECHeader(); ech != nil {
|
||||
objectAttributes = ech.ParentAttributes()
|
||||
}
|
||||
for _, attr := range objectAttributes {
|
||||
if attr.Key() == attribute {
|
||||
return attr.Value(), true
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
|
||||
// groupFilters divides filters in two groups: fast and slow. Fast filters
|
||||
// processed by indexes and slow filters processed after by unmarshaling
|
||||
// object headers.
|
||||
|
@ -545,16 +465,17 @@ func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) {
|
|||
}
|
||||
|
||||
res.withCnrFilter = true
|
||||
case // slow filters
|
||||
v2object.FilterHeaderVersion,
|
||||
v2object.FilterHeaderCreationEpoch,
|
||||
v2object.FilterHeaderPayloadLength,
|
||||
v2object.FilterHeaderHomomorphicHash,
|
||||
v2object.FilterHeaderOwnerID,
|
||||
v2object.FilterHeaderPayloadHash:
|
||||
res.slowFilters = append(res.slowFilters, filters[i])
|
||||
default: // fast filters or user attributes if unknown
|
||||
case // fast filters
|
||||
v2object.FilterHeaderObjectID,
|
||||
v2object.FilterHeaderObjectType,
|
||||
v2object.FilterHeaderParent,
|
||||
v2object.FilterHeaderSplitID,
|
||||
v2object.FilterHeaderECParent,
|
||||
v2object.FilterPropertyRoot,
|
||||
v2object.FilterPropertyPhy:
|
||||
res.fastFilters = append(res.fastFilters, filters[i])
|
||||
default:
|
||||
res.slowFilters = append(res.slowFilters, filters[i])
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -95,10 +95,10 @@ const (
|
|||
// Key: owner ID
|
||||
// Value: bucket containing object IDs as keys
|
||||
_
|
||||
// userAttributePrefix is used for prefixing FKBT index buckets containing objects.
|
||||
// userAttributePrefix was used for prefixing FKBT index buckets containing objects.
|
||||
// Key: attribute value
|
||||
// Value: bucket containing object IDs as keys
|
||||
userAttributePrefix
|
||||
_
|
||||
|
||||
// ====================
|
||||
// List index buckets.
|
||||
|
@ -167,13 +167,6 @@ func smallBucketName(cnr cid.ID, key []byte) []byte {
|
|||
return bucketName(cnr, smallPrefix, key)
|
||||
}
|
||||
|
||||
// attributeBucketName returns <CID>_attr_<attributeKey>.
|
||||
func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte {
|
||||
key[0] = userAttributePrefix
|
||||
cnr.Encode(key[1:])
|
||||
return append(key[:bucketKeySize], attributeKey...)
|
||||
}
|
||||
|
||||
// rootBucketName returns <CID>_root.
|
||||
func rootBucketName(cnr cid.ID, key []byte) []byte {
|
||||
return bucketName(cnr, rootPrefix, key)
|
||||
|
|
Loading…
Reference in a new issue