diff --git a/pkg/local_object_storage/metabase/v2/db.go b/pkg/local_object_storage/metabase/v2/db.go index faa3a615e..b6589c47d 100644 --- a/pkg/local_object_storage/metabase/v2/db.go +++ b/pkg/local_object_storage/metabase/v2/db.go @@ -86,6 +86,15 @@ func bucketKeyHelper(hdr string, val string) []byte { } return v + case v2object.FilterHeaderSplitID: + s := object.NewSplitID() + + err := s.Parse(val) + if err != nil { + return nil + } + + return s.ToV2() default: return []byte(val) } diff --git a/pkg/local_object_storage/metabase/v2/put.go b/pkg/local_object_storage/metabase/v2/put.go index 13ac704bf..79911b568 100644 --- a/pkg/local_object_storage/metabase/v2/put.go +++ b/pkg/local_object_storage/metabase/v2/put.go @@ -105,7 +105,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *object.Object, id *blobovnicza.ID, isParent func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]namedBucketItem, error) { addr := obj.Address() objKey := objectKey(addr.ObjectID()) - result := make([]namedBucketItem, 0, 2) + result := make([]namedBucketItem, 0, 3) // add value to primary unique bucket if !isParent { @@ -157,7 +157,7 @@ func uniqueIndexes(obj *object.Object, isParent bool, id *blobovnicza.ID) ([]nam // builds list of indexes from the object. func listIndexes(obj *object.Object) ([]namedBucketItem, error) { - result := make([]namedBucketItem, 0, 1) + result := make([]namedBucketItem, 0, 3) addr := obj.Address() objKey := objectKey(addr.ObjectID()) @@ -168,6 +168,7 @@ func listIndexes(obj *object.Object) ([]namedBucketItem, error) { val: objKey, }) + // index parent ids if obj.ParentID() != nil { result = append(result, namedBucketItem{ name: parentBucketName(addr.ContainerID()), @@ -176,7 +177,14 @@ func listIndexes(obj *object.Object) ([]namedBucketItem, error) { }) } - // todo: index splitID + // index split ids + if obj.SplitID() != nil { + result = append(result, namedBucketItem{ + name: splitBucketName(addr.ContainerID()), + key: obj.SplitID().ToV2(), + val: objKey, + }) + } return result, nil } diff --git a/pkg/local_object_storage/metabase/v2/select.go b/pkg/local_object_storage/metabase/v2/select.go index 8d628b99f..7de17ba3d 100644 --- a/pkg/local_object_storage/metabase/v2/select.go +++ b/pkg/local_object_storage/metabase/v2/select.go @@ -160,6 +160,9 @@ func (db *DB) selectFastFilter( case v2object.FilterHeaderParent: bucketName := parentBucketName(cid) db.selectFromList(tx, bucketName, f, prefix, to, fNum) + case v2object.FilterHeaderSplitID: + bucketName := splitBucketName(cid) + db.selectFromList(tx, bucketName, f, prefix, to, fNum) case v2object.FilterPropertyRoot: selectAllFromBucket(tx, rootBucketName(cid), prefix, to, fNum) case v2object.FilterPropertyPhy: diff --git a/pkg/local_object_storage/metabase/v2/select_test.go b/pkg/local_object_storage/metabase/v2/select_test.go index b84d6562b..3abbafc74 100644 --- a/pkg/local_object_storage/metabase/v2/select_test.go +++ b/pkg/local_object_storage/metabase/v2/select_test.go @@ -358,3 +358,52 @@ func TestDB_SelectObjectID(t *testing.T) { testSelect(t, db, fs, parent.Object().Address()) }) } + +func TestDB_SelectSplitID(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + child1 := generateRawObjectWithCID(t, cid) + child2 := generateRawObjectWithCID(t, cid) + child3 := generateRawObjectWithCID(t, cid) + + split1 := objectSDK.NewSplitID() + split2 := objectSDK.NewSplitID() + + child1.SetSplitID(split1) + child2.SetSplitID(split1) + child3.SetSplitID(split2) + + require.NoError(t, db.Put(child1.Object(), nil)) + require.NoError(t, db.Put(child2.Object(), nil)) + require.NoError(t, db.Put(child3.Object(), nil)) + + t.Run("split id", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderSplitID, split1.String(), objectSDK.MatchStringEqual) + testSelect(t, db, fs, + child1.Object().Address(), + child2.Object().Address(), + ) + + fs = generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderSplitID, split2.String(), objectSDK.MatchStringEqual) + testSelect(t, db, fs, child3.Object().Address()) + }) + + t.Run("empty split", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchStringEqual) + testSelect(t, db, fs) + }) + + t.Run("unknown split id", func(t *testing.T) { + fs := generateSearchFilter(cid) + fs.AddFilter(v2object.FilterHeaderSplitID, + objectSDK.NewSplitID().String(), + objectSDK.MatchStringEqual) + testSelect(t, db, fs) + }) +} diff --git a/pkg/local_object_storage/metabase/v2/util.go b/pkg/local_object_storage/metabase/v2/util.go index 571696007..f24929e01 100644 --- a/pkg/local_object_storage/metabase/v2/util.go +++ b/pkg/local_object_storage/metabase/v2/util.go @@ -27,6 +27,7 @@ var ( payloadHashPostfix = "_payloadhash" rootPostfix = "_root" parentPostfix = "_parent" + splitPostfix = "_splitid" userAttributePostfix = "_attr_" ) @@ -76,11 +77,16 @@ func ownerBucketName(cid *container.ID) []byte { return []byte(cid.String() + ownerPostfix) } -// parentBucketNAme returns _parent. +// parentBucketName returns _parent. func parentBucketName(cid *container.ID) []byte { return []byte(cid.String() + parentPostfix) } +// splitBucketName returns _splitid. +func splitBucketName(cid *container.ID) []byte { + return []byte(cid.String() + splitPostfix) +} + // addressKey returns key for K-V tables when key is a whole address. func addressKey(addr *object.Address) []byte { return []byte(addr.String())