[#1323] metabase: Drop ownerID index

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-08-19 17:13:39 +03:00 committed by Evgenii Stratonikov
parent 7bca428db0
commit 15dae8685e
4 changed files with 113 additions and 19 deletions

View file

@ -379,15 +379,6 @@ func updateFKBTIndexes(tx *bbolt.Tx, obj *objectSDK.Object, f updateIndexItemFun
objKey := objectKey(id, make([]byte, objectKeySize))
key := make([]byte, bucketKeySize)
err := f(tx, namedBucketItem{
name: ownerBucketName(cnr, key),
key: []byte(obj.OwnerID().EncodeToString()),
val: objKey,
})
if err != nil {
return err
}
var attrs []objectSDK.Attribute
if obj.ECHeader() != nil {
attrs = obj.ECHeader().ParentAttributes()

View file

@ -196,8 +196,7 @@ func (db *DB) selectFastFilter(
case v2object.FilterHeaderObjectID:
db.selectObjectID(tx, f, cnr, to, fNum, currEpoch)
case v2object.FilterHeaderOwnerID:
bucketName := ownerBucketName(cnr, bucketName)
db.selectFromFKBT(tx, bucketName, f, to, fNum)
return // moved to slow filters
case v2object.FilterHeaderPayloadHash:
bucketName := payloadHashBucketName(cnr, bucketName)
db.selectFromList(tx, bucketName, f, to, fNum)
@ -510,6 +509,8 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
case v2object.FilterHeaderPayloadLength:
data = make([]byte, 8)
binary.LittleEndian.PutUint64(data, obj.PayloadSize())
case v2object.FilterHeaderOwnerID:
data = []byte(obj.OwnerID().EncodeToString())
default:
continue // ignore unknown search attributes
}
@ -544,7 +545,8 @@ func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) {
v2object.FilterHeaderVersion,
v2object.FilterHeaderCreationEpoch,
v2object.FilterHeaderPayloadLength,
v2object.FilterHeaderHomomorphicHash:
v2object.FilterHeaderHomomorphicHash,
v2object.FilterHeaderOwnerID:
res.slowFilters = append(res.slowFilters, filters[i])
default: // fast filters or user attributes if unknown
res.fastFilters = append(res.fastFilters, filters[i])

View file

@ -633,6 +633,112 @@ func TestDB_SelectObjectID(t *testing.T) {
})
}
func TestDB_SelectOwnerID(t *testing.T) {
t.Parallel()
db := newDB(t)
defer func() { require.NoError(t, db.Close()) }()
cnr := cidtest.ID()
// prepare
parent := testutil.GenerateObjectWithCID(cnr)
regular := testutil.GenerateObjectWithCID(cnr)
idParent, _ := parent.ID()
regular.SetParentID(idParent)
regular.SetParent(parent)
err := putBig(db, regular)
require.NoError(t, err)
ts := testutil.GenerateObjectWithCID(cnr)
ts.SetType(objectSDK.TypeTombstone)
err = putBig(db, ts)
require.NoError(t, err)
lock := testutil.GenerateObjectWithCID(cnr)
lock.SetType(objectSDK.TypeLock)
err = putBig(db, lock)
require.NoError(t, err)
t.Run("not found objects", func(t *testing.T) {
raw := testutil.GenerateObjectWithCID(cnr)
fs := objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, raw.OwnerID())
testSelect(t, db, cnr, fs)
fs = objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, raw.OwnerID())
testSelect(t, db, cnr, fs,
object.AddressOf(regular),
object.AddressOf(parent),
object.AddressOf(ts),
object.AddressOf(lock),
)
})
t.Run("regular objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, regular.OwnerID())
testSelect(t, db, cnr, fs, object.AddressOf(regular))
fs = objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, regular.OwnerID())
testSelect(t, db, cnr, fs,
object.AddressOf(parent),
object.AddressOf(ts),
object.AddressOf(lock),
)
})
t.Run("tombstone objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, ts.OwnerID())
testSelect(t, db, cnr, fs, object.AddressOf(ts))
fs = objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, ts.OwnerID())
testSelect(t, db, cnr, fs,
object.AddressOf(regular),
object.AddressOf(parent),
object.AddressOf(lock),
)
})
t.Run("parent objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, parent.OwnerID())
testSelect(t, db, cnr, fs, object.AddressOf(parent))
fs = objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, parent.OwnerID())
testSelect(t, db, cnr, fs,
object.AddressOf(regular),
object.AddressOf(ts),
object.AddressOf(lock),
)
})
t.Run("lock objects", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringEqual, lock.OwnerID())
testSelect(t, db, cnr, fs, object.AddressOf(lock))
fs = objectSDK.SearchFilters{}
fs.AddObjectOwnerIDFilter(objectSDK.MatchStringNotEqual, lock.OwnerID())
testSelect(t, db, cnr, fs,
object.AddressOf(regular),
object.AddressOf(parent),
object.AddressOf(ts),
)
})
}
type testTarget struct {
objects []*objectSDK.Object
}

View file

@ -89,10 +89,10 @@ const (
// FKBT index buckets.
// ====================
// ownerPrefix is used for prefixing FKBT index buckets mapping owner to object IDs.
// ownerPrefix was used for prefixing FKBT index buckets mapping owner to object IDs.
// Key: owner ID
// Value: bucket containing object IDs as keys
ownerPrefix
_
// userAttributePrefix is used for prefixing FKBT index buckets containing objects.
// Key: attribute value
// Value: bucket containing object IDs as keys
@ -180,11 +180,6 @@ func rootBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, rootPrefix, key)
}
// ownerBucketName returns <CID>_ownerid.
func ownerBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, ownerPrefix, key)
}
// parentBucketName returns <CID>_parent.
func parentBucketName(cnr cid.ID, key []byte) []byte {
return bucketName(cnr, parentPrefix, key)