From ffbf6b922f081e3ee3456dc315218bc209e948b8 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 2 Feb 2021 00:00:40 +0300 Subject: [PATCH] [#361] metabase: Support new match types Support STRING_NOT_EQUAL and NOT_PRESENT match types. Signed-off-by: Leonard Lyubich --- go.mod | 2 +- go.sum | Bin 59137 -> 59137 bytes pkg/local_object_storage/metabase/db.go | 21 +- pkg/local_object_storage/metabase/select.go | 266 ++++++++++++++---- .../metabase/select_test.go | 204 +++++++++++++- 5 files changed, 430 insertions(+), 63 deletions(-) diff --git a/go.mod b/go.mod index 0eeb93a0..b385b76e 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/multiformats/go-multihash v0.0.13 // indirect github.com/nspcc-dev/hrw v1.0.9 github.com/nspcc-dev/neo-go v0.92.0 - github.com/nspcc-dev/neofs-api-go v1.22.3-0.20210127171042-f654094edb2e + github.com/nspcc-dev/neofs-api-go v1.22.3-0.20210202125050-01c7e17b081d github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/nspcc-dev/tzhash v1.4.0 github.com/panjf2000/ants/v2 v2.3.0 diff --git a/go.sum b/go.sum index 01e4f9f63d899a89cd909cef59b6b1438e2c4523..244dd631f893c0332047b0c50ec30a8b36477e43 100644 GIT binary patch delta 114 zcmZoX$J}_1dBf&x7b62BLnBiIQv+QC!({VRL-Qm93&RwJ3_~l^szUeRfP6nEPsa?W y%B(0?SC^vVykN&PpU`w?KPSJi%J4Av;-sQ1qsf7p5|jCJWW=x=w%IY~KotNF)gyNR delta 115 zcmZoX$J}_1dBf&xS3@InLvup|6C>R;GgA`-OOw=;B%@S?3`48DM9-9bN3-PcVi${? zl7Reh|H8r`f6L%f3$uVgkC5C%LyN4OLh}+2*Zj#Bv!o`^%9a$vZrWzYoC8$=bAcsH diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index c55c70f9..0b219fc5 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -54,23 +54,32 @@ func New(opts ...Option) *DB { return &DB{ cfg: c, matchers: map[object.SearchMatchType]func(string, []byte, string) bool{ - object.MatchUnknown: unknownMatcher, - object.MatchStringEqual: stringEqualMatcher, + object.MatchUnknown: unknownMatcher, + object.MatchStringEqual: stringEqualMatcher, + object.MatchStringNotEqual: stringNotEqualMatcher, }, } } -func stringEqualMatcher(key string, objVal []byte, filterVal string) bool { +func stringifyValue(key string, objVal []byte) string { switch key { default: - return string(objVal) == filterVal + return string(objVal) case v2object.FilterHeaderPayloadHash, v2object.FilterHeaderHomomorphicHash: - return hex.EncodeToString(objVal) == filterVal + return hex.EncodeToString(objVal) case v2object.FilterHeaderCreationEpoch, v2object.FilterHeaderPayloadLength: - return strconv.FormatUint(binary.LittleEndian.Uint64(objVal), 10) == filterVal + return strconv.FormatUint(binary.LittleEndian.Uint64(objVal), 10) } } +func stringEqualMatcher(key string, objVal []byte, filterVal string) bool { + return stringifyValue(key, objVal) == filterVal +} + +func stringNotEqualMatcher(key string, objVal []byte, filterVal string) bool { + return stringifyValue(key, objVal) != filterVal +} + func unknownMatcher(_ string, _ []byte, _ string) bool { return false } diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index eaf13f62..21d13eba 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -3,6 +3,7 @@ package meta import ( "encoding/binary" "fmt" + "strings" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" @@ -88,6 +89,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cid *container.ID, fs object.SearchFil return nil, ErrMissingContainerID } + // TODO: consider the option of moving this check to a level higher than the metabase + if blindlyProcess(fs) { + return nil, nil + } + group, err := groupFilters(fs) if err != nil { return nil, err @@ -181,7 +187,7 @@ func (db *DB) selectFastFilter( switch f.Header() { case v2object.FilterHeaderObjectID: - db.selectObjectID(tx, f, prefix, to, fNum) + db.selectObjectID(tx, f, cid, to, fNum) case v2object.FilterHeaderOwnerID: bucketName := ownerBucketName(cid) db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum) @@ -189,26 +195,9 @@ func (db *DB) selectFastFilter( bucketName := payloadHashBucketName(cid) db.selectFromList(tx, bucketName, f, prefix, to, fNum) case v2object.FilterHeaderObjectType: - var bucketName []byte - - switch f.Value() { // do it better after https://github.com/nspcc-dev/neofs-api/issues/84 - case "Regular": - bucketName = primaryBucketName(cid) - + for _, bucketName := range bucketNamesForType(cid, f.Operation(), f.Value()) { selectAllFromBucket(tx, bucketName, prefix, to, fNum) - - bucketName = parentBucketName(cid) - case "Tombstone": - bucketName = tombstoneBucketName(cid) - case "StorageGroup": - bucketName = storageGroupBucketName(cid) - default: - db.log.Debug("unknown object type", zap.String("type", f.Value())) - - return } - - selectAllFromBucket(tx, bucketName, prefix, to, fNum) case v2object.FilterHeaderParent: bucketName := parentBucketName(cid) db.selectFromList(tx, bucketName, f, prefix, to, fNum) @@ -223,10 +212,57 @@ func (db *DB) selectFastFilter( selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, fNum) default: // user attribute bucketName := attributeBucketName(cid, f.Header()) - db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum) + + if f.Operation() == object.MatchNotPresent { + selectOutsideFKBT(tx, allBucketNames(cid), bucketName, f, prefix, to, fNum) + } else { + db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum) + } } } +// TODO: move to DB struct +var mBucketNaming = map[string][]func(*container.ID) []byte{ + v2object.TypeRegular.String(): {primaryBucketName, parentBucketName}, + v2object.TypeTombstone.String(): {tombstoneBucketName}, + v2object.TypeStorageGroup.String(): {storageGroupBucketName}, +} + +func allBucketNames(cid *container.ID) (names [][]byte) { + for _, fns := range mBucketNaming { + for _, fn := range fns { + names = append(names, fn(cid)) + } + } + + return +} + +func bucketNamesForType(cid *container.ID, mType object.SearchMatchType, typeVal string) (names [][]byte) { + appendNames := func(key string) { + fns, ok := mBucketNaming[key] + if ok { + for _, fn := range fns { + names = append(names, fn(cid)) + } + } + } + + switch mType { + default: + case object.MatchStringNotEqual: + for key := range mBucketNaming { + if key != typeVal { + appendNames(key) + } + } + case object.MatchStringEqual: + appendNames(typeVal) + } + + return +} + // selectFromList looks into index to find list of addresses to add in // resulting cache. func (db *DB) selectFromFKBT( @@ -271,6 +307,54 @@ func (db *DB) selectFromFKBT( } } +// selectOutsideFKBT looks into all incl buckets to find list of addresses outside to add in +// resulting cache. +func selectOutsideFKBT( + tx *bbolt.Tx, + incl [][]byte, // buckets + name []byte, // fkbt root bucket name + f object.SearchFilter, // filter for operation and value + prefix string, // prefix to create addr from oid in index + to map[string]int, // resulting cache + fNum int, // index of filter +) { + mExcl := make(map[string]struct{}) + + bktExcl := tx.Bucket(name) + if bktExcl != nil { + _ = bktExcl.ForEach(func(k, _ []byte) error { + exclBktLeaf := bktExcl.Bucket(k) + if exclBktLeaf == nil { + return nil + } + + return exclBktLeaf.ForEach(func(k, _ []byte) error { + addr := prefix + string(k) + mExcl[addr] = struct{}{} + + return nil + }) + }) + } + + for i := range incl { + bktIncl := tx.Bucket(incl[i]) + if bktIncl == nil { + continue + } + + _ = bktIncl.ForEach(func(k, _ []byte) error { + addr := prefix + string(k) + + if _, ok := mExcl[addr]; !ok { + markAddressInCache(to, fNum, addr) + } + + return nil + }) + } +} + // selectFromList looks into index to find list of addresses to add in // resulting cache. func (db *DB) selectFromList( @@ -286,21 +370,50 @@ func (db *DB) selectFromList( return } - switch f.Operation() { + var ( + lst [][]byte + err error + ) + + switch op := f.Operation(); op { case object.MatchStringEqual: + lst, err = decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value()))) + if err != nil { + db.log.Debug("can't decode list bucket leaf", zap.String("error", err.Error())) + return + } default: - db.log.Debug("unknown operation", zap.Uint32("operation", uint32(f.Operation()))) + fMatch, ok := db.matchers[op] + if !ok { + db.log.Debug("unknown operation", zap.Uint32("operation", uint32(op))) - return - } + return + } - // warning: it works only for MatchStringEQ, for NotEQ you should iterate over - // bkt and apply matchFunc, don't forget to implement this when it will be - // needed. Right now it is not efficient to iterate over bucket - // when there is only MatchStringEQ. - lst, err := decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value()))) - if err != nil { - db.log.Debug("can't decode list bucket leaf", zap.String("error", err.Error())) + if err = bkt.ForEach(func(key, val []byte) error { + if !fMatch(f.Header(), key, f.Value()) { + return nil + } + + l, err := decodeList(val) + if err != nil { + db.log.Debug("can't decode list bucket leaf", + zap.String("error", err.Error()), + ) + + return err + } + + lst = append(lst, l...) + + return nil + }); err != nil { + db.log.Debug("can't iterate over the bucket", + zap.String("error", err.Error()), + ) + + return + } } for i := range lst { @@ -313,36 +426,65 @@ func (db *DB) selectFromList( func (db *DB) selectObjectID( tx *bbolt.Tx, f object.SearchFilter, - prefix string, + cid *container.ID, to map[string]int, // resulting cache fNum int, // index of filter ) { - switch f.Operation() { + prefix := cid.String() + "/" + + appendOID := func(oid string) { + addrStr := prefix + string(oid) + + addr := object.NewAddress() + + err := addr.Parse(addrStr) + if err != nil { + db.log.Debug("can't decode object id address", + zap.String("addr", addrStr), + zap.String("error", err.Error())) + + return + } + + ok, err := db.exists(tx, addr) + if (err == nil && ok) || errors.As(err, &splitInfoError) { + markAddressInCache(to, fNum, addrStr) + } + } + + switch op := f.Operation(); op { case object.MatchStringEqual: + appendOID(f.Value()) default: - db.log.Debug("unknown operation", zap.Uint32("operation", uint32(f.Operation()))) + fMatch, ok := db.matchers[op] + if !ok { + db.log.Debug("unknown operation", + zap.Uint32("operation", uint32(f.Operation())), + ) - return - } + return + } - // warning: it is in-place optimization and works only for MatchStringEQ, - // for NotEQ you should iterate over bkt and apply matchFunc + for _, bucketName := range bucketNamesForType(cid, object.MatchStringNotEqual, "") { + // copy-paste from DB.selectAllFrom + bkt := tx.Bucket(bucketName) + if bkt == nil { + return + } - addrStr := prefix + f.Value() - addr := object.NewAddress() + err := bkt.ForEach(func(k, v []byte) error { + if oid := string(k); fMatch(f.Header(), k, f.Value()) { + appendOID(oid) + } - err := addr.Parse(addrStr) - if err != nil { - db.log.Debug("can't decode object id address", - zap.String("addr", addrStr), - zap.String("error", err.Error())) - - return - } - - ok, err := db.exists(tx, addr) - if (err == nil && ok) || errors.As(err, &splitInfoError) { - markAddressInCache(to, fNum, addrStr) + return nil + }) + if err != nil { + db.log.Debug("could not iterate over the buckets", + zap.String("error", err.Error()), + ) + } + } } } @@ -425,3 +567,23 @@ func markAddressInCache(cache map[string]int, fNum int, addr string) { cache[addr] = num + 1 } } + +// returns true if query leads to a deliberately empty result. +func blindlyProcess(fs object.SearchFilters) bool { + for i := range fs { + if fs[i].Operation() == object.MatchNotPresent && isSystemKey(fs[i].Header()) { + return true + } + + // TODO: check other cases + // e.g. (a == b) && (a != b) + } + + return false +} + +// returns true if string key is a reserved system filter key. +func isSystemKey(key string) bool { + // FIXME: version-dependent approach + return strings.HasPrefix(key, v2object.ReservedFilterPrefix) +} diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index 5e3e5c1b..6d955a01 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -48,6 +48,10 @@ func TestDB_SelectUserAttributes(t *testing.T) { fs.AddFilter("x", "y", objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, raw1.Object().Address()) + fs = objectSDK.SearchFilters{} + fs.AddFilter("x", "y", objectSDK.MatchStringNotEqual) + testSelect(t, db, cid, fs, raw2.Object().Address()) + fs = objectSDK.SearchFilters{} fs.AddFilter("a", "b", objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, raw3.Object().Address()) @@ -57,6 +61,22 @@ func TestDB_SelectUserAttributes(t *testing.T) { testSelect(t, db, cid, fs) fs = objectSDK.SearchFilters{} + fs.AddFilter("foo", "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs, raw3.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter("a", "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs, raw1.Object().Address(), raw2.Object().Address()) + + fs = objectSDK.SearchFilters{} + testSelect(t, db, cid, fs, + raw1.Object().Address(), + raw2.Object().Address(), + raw3.Object().Address(), + ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter("key", "", objectSDK.MatchNotPresent) testSelect(t, db, cid, fs, raw1.Object().Address(), raw2.Object().Address(), @@ -114,6 +134,10 @@ func TestDB_SelectRootPhyParent(t *testing.T) { small.Object().Address(), parent.Object().Address(), ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterPropertyRoot, "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs) }) t.Run("phy objects", func(t *testing.T) { @@ -127,11 +151,15 @@ func TestDB_SelectRootPhyParent(t *testing.T) { rightChild.Object().Address(), link.Object().Address(), ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterPropertyPhy, "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs) }) t.Run("regular objects", func(t *testing.T) { fs := objectSDK.SearchFilters{} - fs.AddFilter(v2object.FilterHeaderObjectType, "Regular", objectSDK.MatchStringEqual) + fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeRegular.String(), objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, small.Object().Address(), leftChild.Object().Address(), @@ -139,18 +167,59 @@ func TestDB_SelectRootPhyParent(t *testing.T) { link.Object().Address(), parent.Object().Address(), ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeRegular.String(), objectSDK.MatchStringNotEqual) + testSelect(t, db, cid, fs, + ts.Object().Address(), + sg.Object().Address(), + ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderObjectType, "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs) }) t.Run("tombstone objects", func(t *testing.T) { fs := objectSDK.SearchFilters{} - fs.AddFilter(v2object.FilterHeaderObjectType, "Tombstone", objectSDK.MatchStringEqual) + fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeTombstone.String(), objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, ts.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeTombstone.String(), objectSDK.MatchStringNotEqual) + testSelect(t, db, cid, fs, + small.Object().Address(), + leftChild.Object().Address(), + rightChild.Object().Address(), + link.Object().Address(), + parent.Object().Address(), + sg.Object().Address(), + ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderObjectType, "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs) }) t.Run("storage group objects", func(t *testing.T) { fs := objectSDK.SearchFilters{} - fs.AddFilter(v2object.FilterHeaderObjectType, "StorageGroup", objectSDK.MatchStringEqual) + fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeStorageGroup.String(), objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, sg.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderObjectType, v2object.TypeStorageGroup.String(), objectSDK.MatchStringNotEqual) + testSelect(t, db, cid, fs, + small.Object().Address(), + leftChild.Object().Address(), + rightChild.Object().Address(), + link.Object().Address(), + parent.Object().Address(), + ts.Object().Address(), + ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderObjectType, "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs) }) t.Run("objects with parent", func(t *testing.T) { @@ -163,6 +232,10 @@ func TestDB_SelectRootPhyParent(t *testing.T) { rightChild.Object().Address(), link.Object().Address(), ) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderParent, "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs) }) t.Run("all objects", func(t *testing.T) { @@ -232,6 +305,20 @@ func TestDB_SelectPayloadHash(t *testing.T) { objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, raw1.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderPayloadHash, + hex.EncodeToString(raw1.PayloadChecksum().Sum()), + objectSDK.MatchStringNotEqual) + + testSelect(t, db, cid, fs, raw2.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderPayloadHash, + "", + objectSDK.MatchNotPresent) + + testSelect(t, db, cid, fs) } func TestDB_SelectWithSlowFilters(t *testing.T) { @@ -268,6 +355,20 @@ func TestDB_SelectWithSlowFilters(t *testing.T) { objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, raw1.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderHomomorphicHash, + hex.EncodeToString(raw1.PayloadHomomorphicHash().Sum()), + objectSDK.MatchStringNotEqual) + + testSelect(t, db, cid, fs, raw2.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderHomomorphicHash, + "", + objectSDK.MatchNotPresent) + + testSelect(t, db, cid, fs) }) t.Run("object with payload length", func(t *testing.T) { @@ -275,6 +376,16 @@ func TestDB_SelectWithSlowFilters(t *testing.T) { fs.AddFilter(v2object.FilterHeaderPayloadLength, "20", objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, raw2.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderPayloadLength, "20", objectSDK.MatchStringNotEqual) + + testSelect(t, db, cid, fs, raw1.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderPayloadLength, "", objectSDK.MatchNotPresent) + + testSelect(t, db, cid, fs) }) t.Run("object with creation epoch", func(t *testing.T) { @@ -282,12 +393,30 @@ func TestDB_SelectWithSlowFilters(t *testing.T) { fs.AddFilter(v2object.FilterHeaderCreationEpoch, "11", objectSDK.MatchStringEqual) testSelect(t, db, cid, fs, raw1.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderCreationEpoch, "11", objectSDK.MatchStringNotEqual) + + testSelect(t, db, cid, fs, raw2.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderCreationEpoch, "", objectSDK.MatchNotPresent) + + testSelect(t, db, cid, fs) }) t.Run("object with version", func(t *testing.T) { fs := objectSDK.SearchFilters{} fs.AddObjectVersionFilter(objectSDK.MatchStringEqual, v21) testSelect(t, db, cid, fs, raw2.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddObjectVersionFilter(objectSDK.MatchStringNotEqual, v21) + testSelect(t, db, cid, fs, raw1.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddObjectVersionFilter(objectSDK.MatchNotPresent, nil) + testSelect(t, db, cid, fs) }) } @@ -318,6 +447,12 @@ func TestDB_SelectObjectID(t *testing.T) { err = putBig(db, sg.Object()) require.NoError(t, err) + t.Run("not present", func(t *testing.T) { + fs := objectSDK.SearchFilters{} + fs.AddObjectIDFilter(objectSDK.MatchNotPresent, nil) + testSelect(t, db, cid, fs) + }) + t.Run("not found objects", func(t *testing.T) { raw := generateRawObjectWithCID(t, cid) @@ -325,30 +460,72 @@ func TestDB_SelectObjectID(t *testing.T) { fs.AddObjectIDFilter(objectSDK.MatchStringEqual, raw.ID()) testSelect(t, db, cid, fs) + + fs = objectSDK.SearchFilters{} + fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, raw.ID()) + + testSelect(t, db, cid, fs, + regular.Object().Address(), + parent.Object().Address(), + sg.Object().Address(), + ts.Object().Address(), + ) }) t.Run("regular objects", func(t *testing.T) { fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, regular.ID()) testSelect(t, db, cid, fs, regular.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, regular.ID()) + testSelect(t, db, cid, fs, + parent.Object().Address(), + sg.Object().Address(), + ts.Object().Address(), + ) }) t.Run("tombstone objects", func(t *testing.T) { fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, ts.ID()) testSelect(t, db, cid, fs, ts.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, ts.ID()) + testSelect(t, db, cid, fs, + regular.Object().Address(), + parent.Object().Address(), + sg.Object().Address(), + ) }) t.Run("storage group objects", func(t *testing.T) { fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, sg.ID()) testSelect(t, db, cid, fs, sg.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, sg.ID()) + testSelect(t, db, cid, fs, + regular.Object().Address(), + parent.Object().Address(), + ts.Object().Address(), + ) }) - t.Run("storage group objects", func(t *testing.T) { + t.Run("parent objects", func(t *testing.T) { fs := objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringEqual, parent.ID()) testSelect(t, db, cid, fs, parent.Object().Address()) + + fs = objectSDK.SearchFilters{} + fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, parent.ID()) + testSelect(t, db, cid, fs, + regular.Object().Address(), + sg.Object().Address(), + ts.Object().Address(), + ) }) } @@ -373,6 +550,12 @@ func TestDB_SelectSplitID(t *testing.T) { require.NoError(t, putBig(db, child2.Object())) require.NoError(t, putBig(db, child3.Object())) + t.Run("not present", func(t *testing.T) { + fs := objectSDK.SearchFilters{} + fs.AddFilter(v2object.FilterHeaderSplitID, "", objectSDK.MatchNotPresent) + testSelect(t, db, cid, fs) + }) + t.Run("split id", func(t *testing.T) { fs := objectSDK.SearchFilters{} fs.AddFilter(v2object.FilterHeaderSplitID, split1.String(), objectSDK.MatchStringEqual) @@ -423,6 +606,19 @@ func TestDB_SelectContainerID(t *testing.T) { obj1.Object().Address(), obj2.Object().Address(), ) + + fs = objectSDK.SearchFilters{} + fs.AddObjectContainerIDFilter(objectSDK.MatchStringNotEqual, cid) + + testSelect(t, db, cid, fs, + obj1.Object().Address(), + obj2.Object().Address(), + ) + + fs = objectSDK.SearchFilters{} + fs.AddObjectContainerIDFilter(objectSDK.MatchNotPresent, cid) + + testSelect(t, db, cid, fs) }) t.Run("not same cid", func(t *testing.T) {