[] metabase: Make ContainerID a mandatory parameter for Select

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-12-10 16:17:45 +03:00
parent 0e1f05ff45
commit b97f818323
3 changed files with 118 additions and 76 deletions
pkg/local_object_storage/metabase

View file

@ -22,8 +22,8 @@ func putBig(db *meta.DB, obj *object.Object) error {
return meta.Put(db, obj, nil)
}
func testSelect(t *testing.T, db *meta.DB, fs objectSDK.SearchFilters, exp ...*objectSDK.Address) {
res, err := meta.Select(db, fs)
func testSelect(t *testing.T, db *meta.DB, cid *container.ID, fs objectSDK.SearchFilters, exp ...*objectSDK.Address) {
res, err := meta.Select(db, cid, fs)
require.NoError(t, err)
require.Len(t, res, len(exp))

View file

@ -26,6 +26,7 @@ type (
// SelectPrm groups the parameters of Select operation.
type SelectPrm struct {
cid *container.ID
filters object.SearchFilters
}
@ -34,6 +35,15 @@ type SelectRes struct {
addrList []*object.Address
}
// WithContainerID is a Select option to set the container id to search in.
func (p *SelectPrm) WithContainerID(cid *container.ID) *SelectPrm {
if p != nil {
p.cid = cid
}
return p
}
// WithFilters is a Select option to set the object filters.
func (p *SelectPrm) WithFilters(fs object.SearchFilters) *SelectPrm {
if p != nil {
@ -48,11 +58,11 @@ func (r *SelectRes) AddressList() []*object.Address {
return r.addrList
}
var ErrContainerNotInQuery = errors.New("search query does not contain container id filter")
var ErrMissingContainerID = errors.New("missing container id field")
// Select selects the objects from DB with filtering.
func Select(db *DB, fs object.SearchFilters) ([]*object.Address, error) {
r, err := db.Select(new(SelectPrm).WithFilters(fs))
func Select(db *DB, cid *container.ID, fs object.SearchFilters) ([]*object.Address, error) {
r, err := db.Select(new(SelectPrm).WithFilters(fs).WithContainerID(cid))
if err != nil {
return nil, err
}
@ -65,7 +75,7 @@ func (db *DB) Select(prm *SelectPrm) (res *SelectRes, err error) {
res = new(SelectRes)
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, err = db.selectObjects(tx, prm.filters)
res.addrList, err = db.selectObjects(tx, prm.cid, prm.filters)
return err
})
@ -73,14 +83,20 @@ func (db *DB) Select(prm *SelectPrm) (res *SelectRes, err error) {
return res, err
}
func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Address, error) {
func (db *DB) selectObjects(tx *bbolt.Tx, cid *container.ID, fs object.SearchFilters) ([]*object.Address, error) {
if cid == nil {
return nil, ErrMissingContainerID
}
group, err := groupFilters(fs)
if err != nil {
return nil, err
}
if group.cid == nil {
return nil, ErrContainerNotInQuery
// if there are conflicts in query and cid then it means that there is no
// objects to match this query.
if group.cid != nil && !cid.Equal(group.cid) {
return nil, nil
}
// keep matched addresses in this cache
@ -92,10 +108,10 @@ func (db *DB) selectObjects(tx *bbolt.Tx, fs object.SearchFilters) ([]*object.Ad
if len(group.fastFilters) == 0 {
expLen = 1
db.selectAll(tx, group.cid, mAddr)
db.selectAll(tx, cid, mAddr)
} else {
for i := range group.fastFilters {
db.selectFastFilter(tx, group.cid, group.fastFilters[i], mAddr, i)
db.selectFastFilter(tx, cid, group.fastFilters[i], mAddr, i)
}
}
@ -383,7 +399,7 @@ func groupFilters(filters object.SearchFilters) (*filterGroup, error) {
for i := range filters {
switch filters[i].Header() {
case v2object.FilterHeaderContainerID:
case v2object.FilterHeaderContainerID: // support deprecated field
res.cid = container.NewID()
err := res.cid.Parse(filters[i].Value())

View file

@ -5,7 +5,6 @@ import (
"testing"
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
@ -38,27 +37,27 @@ func TestDB_SelectUserAttributes(t *testing.T) {
err = putBig(db, raw3.Object())
require.NoError(t, err)
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter("foo", "bar", objectSDK.MatchStringEqual)
testSelect(t, db, fs,
testSelect(t, db, cid, fs,
raw1.Object().Address(),
raw2.Object().Address(),
)
fs = generateSearchFilter(cid)
fs = objectSDK.SearchFilters{}
fs.AddFilter("x", "y", objectSDK.MatchStringEqual)
testSelect(t, db, fs, raw1.Object().Address())
testSelect(t, db, cid, fs, raw1.Object().Address())
fs = generateSearchFilter(cid)
fs = objectSDK.SearchFilters{}
fs.AddFilter("a", "b", objectSDK.MatchStringEqual)
testSelect(t, db, fs, raw3.Object().Address())
testSelect(t, db, cid, fs, raw3.Object().Address())
fs = generateSearchFilter(cid)
fs = objectSDK.SearchFilters{}
fs.AddFilter("c", "d", objectSDK.MatchStringEqual)
testSelect(t, db, fs)
testSelect(t, db, cid, fs)
fs = generateSearchFilter(cid)
testSelect(t, db, fs,
fs = objectSDK.SearchFilters{}
testSelect(t, db, cid, fs,
raw1.Object().Address(),
raw2.Object().Address(),
raw3.Object().Address(),
@ -109,18 +108,18 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
require.NoError(t, err)
t.Run("root objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddRootFilter()
testSelect(t, db, fs,
testSelect(t, db, cid, fs,
small.Object().Address(),
parent.Object().Address(),
)
})
t.Run("phy objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddPhyFilter()
testSelect(t, db, fs,
testSelect(t, db, cid, fs,
small.Object().Address(),
ts.Object().Address(),
sg.Object().Address(),
@ -131,9 +130,9 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
})
t.Run("regular objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, "Regular", objectSDK.MatchStringEqual)
testSelect(t, db, fs,
testSelect(t, db, cid, fs,
small.Object().Address(),
leftChild.Object().Address(),
rightChild.Object().Address(),
@ -143,32 +142,32 @@ func TestDB_SelectRootPhyParent(t *testing.T) {
})
t.Run("tombstone objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, "Tombstone", objectSDK.MatchStringEqual)
testSelect(t, db, fs, ts.Object().Address())
testSelect(t, db, cid, fs, ts.Object().Address())
})
t.Run("storage group objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderObjectType, "StorageGroup", objectSDK.MatchStringEqual)
testSelect(t, db, fs, sg.Object().Address())
testSelect(t, db, cid, fs, sg.Object().Address())
})
t.Run("objects with parent", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderParent,
parent.ID().String(),
objectSDK.MatchStringEqual)
testSelect(t, db, fs,
testSelect(t, db, cid, fs,
rightChild.Object().Address(),
link.Object().Address(),
)
})
t.Run("all objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
testSelect(t, db, fs,
fs := objectSDK.SearchFilters{}
testSelect(t, db, cid, fs,
small.Object().Address(),
ts.Object().Address(),
sg.Object().Address(),
@ -194,8 +193,8 @@ func TestDB_SelectInhume(t *testing.T) {
err = putBig(db, raw2.Object())
require.NoError(t, err)
fs := generateSearchFilter(cid)
testSelect(t, db, fs,
fs := objectSDK.SearchFilters{}
testSelect(t, db, cid, fs,
raw1.Object().Address(),
raw2.Object().Address(),
)
@ -207,8 +206,8 @@ func TestDB_SelectInhume(t *testing.T) {
err = meta.Inhume(db, raw2.Object().Address(), tombstone)
require.NoError(t, err)
fs = generateSearchFilter(cid)
testSelect(t, db, fs,
fs = objectSDK.SearchFilters{}
testSelect(t, db, cid, fs,
raw1.Object().Address(),
)
}
@ -227,12 +226,12 @@ func TestDB_SelectPayloadHash(t *testing.T) {
err = putBig(db, raw2.Object())
require.NoError(t, err)
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderPayloadHash,
hex.EncodeToString(raw1.PayloadChecksum().Sum()),
objectSDK.MatchStringEqual)
testSelect(t, db, fs, raw1.Object().Address())
testSelect(t, db, cid, fs, raw1.Object().Address())
}
func TestDB_SelectWithSlowFilters(t *testing.T) {
@ -263,42 +262,35 @@ func TestDB_SelectWithSlowFilters(t *testing.T) {
require.NoError(t, err)
t.Run("object with TZHash", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderHomomorphicHash,
hex.EncodeToString(raw1.PayloadHomomorphicHash().Sum()),
objectSDK.MatchStringEqual)
testSelect(t, db, fs, raw1.Object().Address())
testSelect(t, db, cid, fs, raw1.Object().Address())
})
t.Run("object with payload length", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderPayloadLength, "20", objectSDK.MatchStringEqual)
testSelect(t, db, fs, raw2.Object().Address())
testSelect(t, db, cid, fs, raw2.Object().Address())
})
t.Run("object with creation epoch", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderCreationEpoch, "11", objectSDK.MatchStringEqual)
testSelect(t, db, fs, raw1.Object().Address())
testSelect(t, db, cid, fs, raw1.Object().Address())
})
t.Run("object with version", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddObjectVersionFilter(objectSDK.MatchStringEqual, v21)
testSelect(t, db, fs, raw2.Object().Address())
testSelect(t, db, cid, fs, raw2.Object().Address())
})
}
func generateSearchFilter(cid *container.ID) objectSDK.SearchFilters {
fs := objectSDK.SearchFilters{}
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid)
return fs
}
func TestDB_SelectObjectID(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
@ -329,34 +321,34 @@ func TestDB_SelectObjectID(t *testing.T) {
t.Run("not found objects", func(t *testing.T) {
raw := generateRawObjectWithCID(t, cid)
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, raw.ID())
testSelect(t, db, fs)
testSelect(t, db, cid, fs)
})
t.Run("regular objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, regular.ID())
testSelect(t, db, fs, regular.Object().Address())
testSelect(t, db, cid, fs, regular.Object().Address())
})
t.Run("tombstone objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, ts.ID())
testSelect(t, db, fs, ts.Object().Address())
testSelect(t, db, cid, fs, ts.Object().Address())
})
t.Run("storage group objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, sg.ID())
testSelect(t, db, fs, sg.Object().Address())
testSelect(t, db, cid, fs, sg.Object().Address())
})
t.Run("storage group objects", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddObjectIDFilter(objectSDK.MatchStringEqual, parent.ID())
testSelect(t, db, fs, parent.Object().Address())
testSelect(t, db, cid, fs, parent.Object().Address())
})
}
@ -382,29 +374,63 @@ func TestDB_SelectSplitID(t *testing.T) {
require.NoError(t, putBig(db, child3.Object()))
t.Run("split id", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, split1.String(), objectSDK.MatchStringEqual)
testSelect(t, db, fs,
testSelect(t, db, cid, fs,
child1.Object().Address(),
child2.Object().Address(),
)
fs = generateSearchFilter(cid)
fs = objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, split2.String(), objectSDK.MatchStringEqual)
testSelect(t, db, fs, child3.Object().Address())
testSelect(t, db, cid, fs, child3.Object().Address())
})
t.Run("empty split", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchStringEqual)
testSelect(t, db, fs)
testSelect(t, db, cid, fs)
})
t.Run("unknown split id", func(t *testing.T) {
fs := generateSearchFilter(cid)
fs := objectSDK.SearchFilters{}
fs.AddFilter(v2object.FilterHeaderSplitID,
objectSDK.NewSplitID().String(),
objectSDK.MatchStringEqual)
testSelect(t, db, fs)
testSelect(t, db, cid, fs)
})
}
func TestDB_SelectContainerID(t *testing.T) {
db := newDB(t)
defer releaseDB(db)
cid := testCID()
obj1 := generateRawObjectWithCID(t, cid)
err := putBig(db, obj1.Object())
require.NoError(t, err)
obj2 := generateRawObjectWithCID(t, cid)
err = putBig(db, obj2.Object())
require.NoError(t, err)
t.Run("same cid", func(t *testing.T) {
fs := objectSDK.SearchFilters{}
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, cid)
testSelect(t, db, cid, fs,
obj1.Object().Address(),
obj2.Object().Address(),
)
})
t.Run("not same cid", func(t *testing.T) {
newCID := testCID()
fs := objectSDK.SearchFilters{}
fs.AddObjectContainerIDFilter(objectSDK.MatchStringEqual, newCID)
testSelect(t, db, cid, fs)
})
}