[#199] Add SplitID index in metabase

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-11-30 11:46:18 +03:00
parent 74d44beb99
commit 337e2f6a57
5 changed files with 79 additions and 4 deletions

View file

@ -86,6 +86,15 @@ func bucketKeyHelper(hdr string, val string) []byte {
} }
return v return v
case v2object.FilterHeaderSplitID:
s := object.NewSplitID()
err := s.Parse(val)
if err != nil {
return nil
}
return s.ToV2()
default: default:
return []byte(val) return []byte(val)
} }

View file

@ -105,7 +105,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent
func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]namedBucketItem, error) { func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]namedBucketItem, error) {
addr := obj.Address() addr := obj.Address()
objKey := objectKey(addr.ObjectID()) objKey := objectKey(addr.ObjectID())
result := make([]namedBucketItem, 0, 2) result := make([]namedBucketItem, 0, 3)
// add value to primary unique bucket // add value to primary unique bucket
if !isParent { if !isParent {
@ -157,7 +157,7 @@ func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]nam
// builds list of <list> indexes from the object. // builds list of <list> indexes from the object.
func listIndexes(obj *object.Object) ([]namedBucketItem, error) { func listIndexes(obj *object.Object) ([]namedBucketItem, error) {
result := make([]namedBucketItem, 0, 1) result := make([]namedBucketItem, 0, 3)
addr := obj.Address() addr := obj.Address()
objKey := objectKey(addr.ObjectID()) objKey := objectKey(addr.ObjectID())
@ -168,6 +168,7 @@ func listIndexes(obj *object.Object) ([]namedBucketItem, error) {
val: objKey, val: objKey,
}) })
// index parent ids
if obj.ParentID() != nil { if obj.ParentID() != nil {
result = append(result, namedBucketItem{ result = append(result, namedBucketItem{
name: parentBucketName(addr.ContainerID()), name: parentBucketName(addr.ContainerID()),
@ -176,7 +177,14 @@ func listIndexes(obj *object.Object) ([]namedBucketItem, error) {
}) })
} }
// todo: index splitID // index split ids
if obj.SplitID() != nil {
result = append(result, namedBucketItem{
name: splitBucketName(addr.ContainerID()),
key: obj.SplitID().ToV2(),
val: objKey,
})
}
return result, nil return result, nil
} }

View file

@ -160,6 +160,9 @@ func (db *DB) selectFastFilter(
case v2object.FilterHeaderParent: case v2object.FilterHeaderParent:
bucketName := parentBucketName(cid) bucketName := parentBucketName(cid)
db.selectFromList(tx, bucketName, f, prefix, to, fNum) db.selectFromList(tx, bucketName, f, prefix, to, fNum)
case v2object.FilterHeaderSplitID:
bucketName := splitBucketName(cid)
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
case v2object.FilterPropertyRoot: case v2object.FilterPropertyRoot:
selectAllFromBucket(tx, rootBucketName(cid), prefix, to, fNum) selectAllFromBucket(tx, rootBucketName(cid), prefix, to, fNum)
case v2object.FilterPropertyPhy: case v2object.FilterPropertyPhy:

View file

@ -358,3 +358,52 @@ func TestDB_SelectObjectID(t *testing.T) {
testSelect(t, db, fs, parent.Object().Address()) testSelect(t, db, fs, parent.Object().Address())
}) })
} }
func TestDB_SelectSplitID(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := testCID()
child1 := generateRawObjectWithCID(t, cid)
child2 := generateRawObjectWithCID(t, cid)
child3 := generateRawObjectWithCID(t, cid)
split1 := objectSDK.NewSplitID()
split2 := objectSDK.NewSplitID()
child1.SetSplitID(split1)
child2.SetSplitID(split1)
child3.SetSplitID(split2)
require.NoError(t, db.Put(child1.Object(), nil))
require.NoError(t, db.Put(child2.Object(), nil))
require.NoError(t, db.Put(child3.Object(), nil))
t.Run("split id", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs.AddFilter(v2object.FilterHeaderSplitID, split1.String(), objectSDK.MatchStringEqual)
testSelect(t, db, fs,
child1.Object().Address(),
child2.Object().Address(),
)
fs = generateSearchFilter(cid)
fs.AddFilter(v2object.FilterHeaderSplitID, split2.String(), objectSDK.MatchStringEqual)
testSelect(t, db, fs, child3.Object().Address())
})
t.Run("empty split", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchStringEqual)
testSelect(t, db, fs)
})
t.Run("unknown split id", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs.AddFilter(v2object.FilterHeaderSplitID,
objectSDK.NewSplitID().String(),
objectSDK.MatchStringEqual)
testSelect(t, db, fs)
})
}

View file

@ -27,6 +27,7 @@ var (
payloadHashPostfix = "_payloadhash" payloadHashPostfix = "_payloadhash"
rootPostfix = "_root" rootPostfix = "_root"
parentPostfix = "_parent" parentPostfix = "_parent"
splitPostfix = "_splitid"
userAttributePostfix = "_attr_" userAttributePostfix = "_attr_"
) )
@ -76,11 +77,16 @@ func ownerBucketName(cid *container.ID) []byte {
return []byte(cid.String() + ownerPostfix) return []byte(cid.String() + ownerPostfix)
} }
// parentBucketNAme returns <CID>_parent. // parentBucketName returns <CID>_parent.
func parentBucketName(cid *container.ID) []byte { func parentBucketName(cid *container.ID) []byte {
return []byte(cid.String() + parentPostfix) return []byte(cid.String() + parentPostfix)
} }
// splitBucketName returns <CID>_splitid.
func splitBucketName(cid *container.ID) []byte {
return []byte(cid.String() + splitPostfix)
}
// addressKey returns key for K-V tables when key is a whole address. // addressKey returns key for K-V tables when key is a whole address.
func addressKey(addr *object.Address) []byte { func addressKey(addr *object.Address) []byte {
return []byte(addr.String()) return []byte(addr.String())