diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index b8df8763d..6a2f3a638 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -22,8 +22,8 @@ func putBig(db *meta.DB, obj *object.Object) error { return meta.Put(db, obj, nil) } -func testSelect(t *testing.T, db *meta.DB, fs objectSDK.SearchFilters, exp ...*objectSDK.Address) { - res, err := meta.Select(db, fs) +func testSelect(t *testing.T, db *meta.DB, cid *container.ID, fs objectSDK.SearchFilters, exp ...*objectSDK.Address) { + res, err := meta.Select(db, cid, fs) require.NoError(t, err) require.Len(t, res, len(exp)) diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 3f2afedce..eaf13f62e 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -26,6 +26,7 @@ type ( // SelectPrm groups the parameters of Select operation. type SelectPrm struct { + cid *container.ID filters object.SearchFilters } @@ -34,6 +35,15 @@ type SelectRes struct { addrList []*object.Address } +// WithContainerID is a Select option to set the container id to search in. +func (p *SelectPrm) WithContainerID(cid *container.ID) *SelectPrm { + if p != nil { + p.cid = cid + } + + return p +} + // WithFilters is a Select option to set the object filters. func (p *SelectPrm) WithFilters(fs object.SearchFilters) *SelectPrm { if p != nil { @@ -48,11 +58,11 @@ func (r *SelectRes) AddressList() []*object.Address { return r.addrList } -var ErrContainerNotInQuery = errors.New("search query does not contain container id filter") +var ErrMissingContainerID = errors.New("missing container id field") // Select selects the objects from DB with filtering. -func Select(db *DB, fs object.SearchFilters) ([]*object.Address, error) { - r, err := db.Select(new(SelectPrm).WithFilters(fs)) +func Select(db *DB, cid *container.ID, fs object.SearchFilters) ([]*object.Address, error) { + r, err := db.Select(new(SelectPrm).WithFilters(fs).WithContainerID(cid)) if err != nil { return nil, err } @@ -65,7 +75,7 @@ func (db *DB) Select(prm *SelectPrm) (res *SelectRes, err error) { res = new(SelectRes) err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.addrList, err = db.selectObjects(tx, prm.filters) + res.addrList, err = db.selectObjects(tx, prm.cid, prm.filters) return err }) @@ -73,14 +83,20 @@ func (db *DB) Select(prm *SelectPrm) (res *SelectRes, err error) { return res, err } -func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Address, error) { +func (db *DB) selectObjects(tx *bbolt.Tx, cid *container.ID, fs object.SearchFilters) ([]*object.Address, error) { + if cid == nil { + return nil, ErrMissingContainerID + } + group, err := groupFilters(fs) if err != nil { return nil, err } - if group.cid == nil { - return nil, ErrContainerNotInQuery + // if there are conflicts in query and cid then it means that there is no + // objects to match this query. + if group.cid != nil && !cid.Equal(group.cid) { + return nil, nil } // keep matched addresses in this cache @@ -92,10 +108,10 @@ func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Ad if len(group.fastFilters) == 0 { expLen = 1 - db.selectAll(tx, group.cid, mAddr) + db.selectAll(tx, cid, mAddr) } else { for i := range group.fastFilters { - db.selectFastFilter(tx, group.cid, group.fastFilters[i], mAddr, i) + db.selectFastFilter(tx, cid, group.fastFilters[i], mAddr, i) } } @@ -383,7 +399,7 @@ func groupFilters(filters object.SearchFilters) (*filterGroup, error) { for i := range filters { switch filters[i].Header() { - case v2object.FilterHeaderContainerID: + case v2object.FilterHeaderContainerID: // support deprecated field res.cid = container.NewID() err := res.cid.Parse(filters[i].Value()) diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index 30effff9f..5e3e5c1b0 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/nspcc-dev/neofs-api-go/pkg" - "github.com/nspcc-dev/neofs-api-go/pkg/container" objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" v2object "github.com/nspcc-dev/neofs-api-go/v2/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" @@ -38,27 +37,27 @@ func TestDB_SelectUserAttributes(t *testing.T) { err = putBig(db, raw3.Object()) require.NoError(t, err) - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual) - testSelect(t, db, fs, + testSelect(t, db, cid, fs, raw1.Object().Address(), raw2.Object().Address(), ) - fs = generateSearchFilter(cid) + fs = objectSDK.SearchFilters{} fs.AddFilter("x", "y", objectSDK.MatchStringEqual) - testSelect(t, db, fs, raw1.Object().Address()) + testSelect(t, db, cid, fs, raw1.Object().Address()) - fs = generateSearchFilter(cid) + fs = objectSDK.SearchFilters{} fs.AddFilter("a", "b", objectSDK.MatchStringEqual) - testSelect(t, db, fs, raw3.Object().Address()) + testSelect(t, db, cid, fs, raw3.Object().Address()) - fs = generateSearchFilter(cid) + fs = objectSDK.SearchFilters{} fs.AddFilter("c", "d", objectSDK.MatchStringEqual) - testSelect(t, db, fs) + testSelect(t, db, cid, fs) - fs = generateSearchFilter(cid) - testSelect(t, db, fs, + fs = objectSDK.SearchFilters{} + testSelect(t, db, cid, fs, raw1.Object().Address(), raw2.Object().Address(), raw3.Object().Address(), @@ -109,18 +108,18 @@ func TestDB_SelectRootPhyParent(t *testing.T) { require.NoError(t, err) t.Run("root objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddRootFilter() - testSelect(t, db, fs, + testSelect(t, db, cid, fs, small.Object().Address(), parent.Object().Address(), ) }) t.Run("phy objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddPhyFilter() - testSelect(t, db, fs, + testSelect(t, db, cid, fs, small.Object().Address(), ts.Object().Address(), sg.Object().Address(), @@ -131,9 +130,9 @@ func TestDB_SelectRootPhyParent(t *testing.T) { }) t.Run("regular objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderObjectType, "Regular", objectSDK.MatchStringEqual) - testSelect(t, db, fs, + testSelect(t, db, cid, fs, small.Object().Address(), leftChild.Object().Address(), rightChild.Object().Address(), @@ -143,32 +142,32 @@ func TestDB_SelectRootPhyParent(t *testing.T) { }) t.Run("tombstone objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderObjectType, "Tombstone", objectSDK.MatchStringEqual) - testSelect(t, db, fs, ts.Object().Address()) + testSelect(t, db, cid, fs, ts.Object().Address()) }) t.Run("storage group objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderObjectType, "StorageGroup", objectSDK.MatchStringEqual) - testSelect(t, db, fs, sg.Object().Address()) + testSelect(t, db, cid, fs, sg.Object().Address()) }) t.Run("objects with parent", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderParent, parent.ID().String(), objectSDK.MatchStringEqual) - testSelect(t, db, fs, + testSelect(t, db, cid, fs, rightChild.Object().Address(), link.Object().Address(), ) }) t.Run("all objects", func(t *testing.T) { - fs := generateSearchFilter(cid) - testSelect(t, db, fs, + fs := objectSDK.SearchFilters{} + testSelect(t, db, cid, fs, small.Object().Address(), ts.Object().Address(), sg.Object().Address(), @@ -194,8 +193,8 @@ func TestDB_SelectInhume(t *testing.T) { err = putBig(db, raw2.Object()) require.NoError(t, err) - fs := generateSearchFilter(cid) - testSelect(t, db, fs, + fs := objectSDK.SearchFilters{} + testSelect(t, db, cid, fs, raw1.Object().Address(), raw2.Object().Address(), ) @@ -207,8 +206,8 @@ func TestDB_SelectInhume(t *testing.T) { err = meta.Inhume(db, raw2.Object().Address(), tombstone) require.NoError(t, err) - fs = generateSearchFilter(cid) - testSelect(t, db, fs, + fs = objectSDK.SearchFilters{} + testSelect(t, db, cid, fs, raw1.Object().Address(), ) } @@ -227,12 +226,12 @@ func TestDB_SelectPayloadHash(t *testing.T) { err = putBig(db, raw2.Object()) require.NoError(t, err) - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderPayloadHash, hex.EncodeToString(raw1.PayloadChecksum().Sum()), objectSDK.MatchStringEqual) - testSelect(t, db, fs, raw1.Object().Address()) + testSelect(t, db, cid, fs, raw1.Object().Address()) } func TestDB_SelectWithSlowFilters(t *testing.T) { @@ -263,42 +262,35 @@ func TestDB_SelectWithSlowFilters(t *testing.T) { require.NoError(t, err) t.Run("object with TZHash", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderHomomorphicHash, hex.EncodeToString(raw1.PayloadHomomorphicHash().Sum()), objectSDK.MatchStringEqual) - testSelect(t, db, fs, raw1.Object().Address()) + testSelect(t, db, cid, fs, raw1.Object().Address()) }) t.Run("object with payload length", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderPayloadLength, "20", objectSDK.MatchStringEqual) - testSelect(t, db, fs, raw2.Object().Address()) + testSelect(t, db, cid, fs, raw2.Object().Address()) }) t.Run("object with creation epoch", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderCreationEpoch, "11", objectSDK.MatchStringEqual) - testSelect(t, db, fs, raw1.Object().Address()) + testSelect(t, db, cid, fs, raw1.Object().Address()) }) t.Run("object with version", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddObjectVersionFilter(objectSDK.MatchStringEqual, v21) - testSelect(t, db, fs, raw2.Object().Address()) + testSelect(t, db, cid, fs, raw2.Object().Address()) }) } -func generateSearchFilter(cid *container.ID) objectSDK.SearchFilters { - fs := objectSDK.SearchFilters{} - fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid) - - return fs -} - func TestDB_SelectObjectID(t *testing.T) { db := newDB(t) defer releaseDB(db) @@ -329,34 +321,34 @@ func TestDB_SelectObjectID(t *testing.T) { t.Run("not found objects", func(t *testing.T) { raw := generateRawObjectWithCID(t, cid) - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, raw.ID()) - testSelect(t, db, fs) + testSelect(t, db, cid, fs) }) t.Run("regular objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, regular.ID()) - testSelect(t, db, fs, regular.Object().Address()) + testSelect(t, db, cid, fs, regular.Object().Address()) }) t.Run("tombstone objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, ts.ID()) - testSelect(t, db, fs, ts.Object().Address()) + testSelect(t, db, cid, fs, ts.Object().Address()) }) t.Run("storage group objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, sg.ID()) - testSelect(t, db, fs, sg.Object().Address()) + testSelect(t, db, cid, fs, sg.Object().Address()) }) t.Run("storage group objects", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, parent.ID()) - testSelect(t, db, fs, parent.Object().Address()) + testSelect(t, db, cid, fs, parent.Object().Address()) }) } @@ -382,29 +374,63 @@ func TestDB_SelectSplitID(t *testing.T) { require.NoError(t, putBig(db, child3.Object())) t.Run("split id", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderSplitID, split1.String(), objectSDK.MatchStringEqual) - testSelect(t, db, fs, + testSelect(t, db, cid, fs, child1.Object().Address(), child2.Object().Address(), ) - fs = generateSearchFilter(cid) + fs = objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderSplitID, split2.String(), objectSDK.MatchStringEqual) - testSelect(t, db, fs, child3.Object().Address()) + testSelect(t, db, cid, fs, child3.Object().Address()) }) t.Run("empty split", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchStringEqual) - testSelect(t, db, fs) + testSelect(t, db, cid, fs) }) t.Run("unknown split id", func(t *testing.T) { - fs := generateSearchFilter(cid) + fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderSplitID, objectSDK.NewSplitID().String(), objectSDK.MatchStringEqual) - testSelect(t, db, fs) + testSelect(t, db, cid, fs) + }) +} + +func TestDB_SelectContainerID(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + cid := testCID() + + obj1 := generateRawObjectWithCID(t, cid) + err := putBig(db, obj1.Object()) + require.NoError(t, err) + + obj2 := generateRawObjectWithCID(t, cid) + err = putBig(db, obj2.Object()) + require.NoError(t, err) + + t.Run("same cid", func(t *testing.T) { + fs := objectSDK.SearchFilters{} + fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid) + + testSelect(t, db, cid, fs, + obj1.Object().Address(), + obj2.Object().Address(), + ) + }) + + t.Run("not same cid", func(t *testing.T) { + newCID := testCID() + + fs := objectSDK.SearchFilters{} + fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, newCID) + + testSelect(t, db, cid, fs) }) }