From c73bf08b72a875d8df4a0a7768db3e3935cb289f Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 2 Jul 2024 15:10:17 +0300 Subject: [PATCH] [#9999] metabase: Fix db engine to pebble in select.go Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 6 + pkg/local_object_storage/metabase/db.go | 5 - pkg/local_object_storage/metabase/select.go | 838 +++++++++++++++----- 3 files changed, 636 insertions(+), 213 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 67f173f29..8085768e7 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -539,4 +539,10 @@ const ( PolicerCouldNotGetChunk = "could not get EC chunk" PolicerCouldNotGetChunks = "could not get EC chunks" AuditEventLogRecord = "audit event log record" + MetabaseCouldNotIterateOverThePrefix = "could not iterate over the prefix" + FailedToParseAddressFromKey = "failed to parse address from key" + FailedToParseOwnerFromKey = "failed to parse owner from key" + FailedToParsePayloadHashFromKey = "failed to parse payload hash from key" + FailedToParseSplitIDFromKey = "failed to parse splitID from key" + FailedToParseAttributeValueFromKey = "failed to parse attribute value from key" ) diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 4ed2ba1e8..16a191781 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -20,11 +20,6 @@ import ( "go.uber.org/zap" ) -type matcher struct { - matchSlow func(string, []byte, string) bool - matchBucket func(pebble.Reader, string, string, func([]byte, []byte) error) error -} - // EpochState is an interface that provides access to the // current epoch number. type EpochState interface { diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index ff23350db..bf6853c59 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -1,6 +1,7 @@ package meta import ( + "bytes" "context" "encoding/binary" "errors" @@ -15,7 +16,8 @@ import ( cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" + "github.com/cockroachdb/pebble" + "github.com/mr-tron/base58" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -91,14 +93,14 @@ func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err err currEpoch := db.epochState.CurrentEpoch() - return res, metaerr.Wrap(db.database.View(func(tx *bbolt.Tx) error { - res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch) + return res, metaerr.Wrap(db.snapshot(func(s *pebble.Snapshot) error { + res.addrList, err = db.selectObjects(ctx, s, prm.cnr, prm.filters, currEpoch) success = err == nil return err })) } -func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters, currEpoch uint64) ([]oid.Address, error) { +func (db *DB) selectObjects(ctx context.Context, r pebble.Reader, cnr cid.ID, fs objectSDK.SearchFilters, currEpoch uint64) ([]oid.Address, error) { group, err := groupFilters(fs) if err != nil { return nil, err @@ -119,10 +121,10 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters if len(group.fastFilters) == 0 { expLen = 1 - db.selectAll(tx, cnr, mAddr) + db.selectAll(ctx, r, cnr, mAddr) } else { for i := range group.fastFilters { - db.selectFastFilter(tx, 
cnr, group.fastFilters[i], mAddr, i)
+			db.selectFastFilter(ctx, r, cnr, group.fastFilters[i], mAddr, i, currEpoch)
 		}
 	}
 
@@ -133,21 +135,20 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters
 			continue // ignore objects with unmatched fast filters
 		}
 
-		var id oid.ID
-		err = id.Decode([]byte(a))
-		if err != nil {
+		var addr oid.Address
+		if err := addr.DecodeString(a); err != nil {
 			return nil, err
 		}
 
-		var addr oid.Address
-		addr.SetContainer(cnr)
-		addr.SetObject(id)
-
-		if objectStatus(tx, addr, currEpoch) > 0 {
+		st, err := objectStatus(ctx, r, addr, currEpoch)
+		if err != nil {
+			return nil, err
+		}
+		if st > 0 {
 			continue // ignore removed objects
 		}
 
-		if !db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch) {
+		if !db.matchSlowFilters(ctx, r, addr, group.slowFilters, currEpoch) {
 			continue // ignore objects with unmatched slow filters
 		}
 
@@ -158,101 +159,137 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters
 }
 
 // selectAll adds to resulting cache all available objects in metabase.
-func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) {
-	bucketName := make([]byte, bucketKeySize)
-	selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, 0)
-	selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, 0)
-	selectAllFromBucket(tx, parentBucketName(cnr, bucketName), to, 0)
-	selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, 0)
+func (db *DB) selectAll(ctx context.Context, r pebble.Reader, cnr cid.ID, to map[string]int) {
+	db.selectAllWithPrefix(ctx, r, primaryKeyPrefix(cnr), addressFromPrimaryKey, to, 0)
+	db.selectAllWithPrefix(ctx, r, tombstoneKeyPrefix(cnr), addressFromTombstoneKey, to, 0)
+	db.selectAllWithPrefix(ctx, r, parentKeyShortPrefix(cnr), addressOfParentFromParentKey, to, 0)
+	db.selectAllWithPrefix(ctx, r, lockersKeyPrefix(cnr), addressFromLockersKey, to, 0)
 }
 
-// selectAllFromBucket goes through all keys in bucket and adds them in a
-// resulting cache. Keys should be stringed object ids.
-func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
-	bkt := tx.Bucket(name)
-	if bkt == nil {
-		return
+// selectAllWithPrefix iterates over all keys with the given prefix and adds
+// the parsed addresses to the resulting cache.
+func (db *DB) selectAllWithPrefix(ctx context.Context, r pebble.Reader, prefix []byte, keyParser func(key []byte) (oid.Address, error), to map[string]int, fNum int) {
+	db.selectWithPrefix(ctx, r, prefix, keyParser, func(oid.Address) bool { return true }, to, fNum)
+}
+
+func (db *DB) selectWithPrefix(ctx context.Context, r pebble.Reader, prefix []byte, keyParser func([]byte) (oid.Address, error), condition func(oid.Address) bool, to map[string]int, fNum int) {
+	var lastSeen []byte
+	for {
+		kvs, err := selectByPrefixAndSeek(ctx, r, prefix, lastSeen, batchSize)
+		if err != nil {
+			db.log.Debug(logs.MetabaseCouldNotIterateOverThePrefix,
+				zap.ByteString("prefix", prefix),
+				zap.Error(err),
+			)
+			return
+		}
+		for _, kv := range kvs {
+			lastSeen = kv.Key
+			addr, err := keyParser(kv.Key)
+			if err != nil {
+				db.log.Debug(logs.FailedToParseAddressFromKey,
+					zap.ByteString("key", kv.Key),
+					zap.Error(err),
+				)
+				continue
+			}
+			if condition(addr) {
+				markAddressInCache(to, fNum, addr.EncodeToString())
+			}
+		}
+		if len(kvs) < batchSize {
+			break
+		}
 	}
-
-	_ = bkt.ForEach(func(k, _ []byte) error {
-		markAddressInCache(to, fNum, string(k))
-
-		return nil
-	})
 }
 
 // selectFastFilter makes fast optimized checks for well known buckets or
 // looking through user attribute buckets otherwise.
func (db *DB) selectFastFilter( - tx *bbolt.Tx, + ctx context.Context, + r pebble.Reader, cnr cid.ID, // container we search on f objectSDK.SearchFilter, // fast filter to map[string]int, // resulting cache fNum int, // index of filter + currEpoch uint64, ) { - currEpoch := db.epochState.CurrentEpoch() - bucketName := make([]byte, bucketKeySize) switch f.Header() { case v2object.FilterHeaderObjectID: - db.selectObjectID(tx, f, cnr, to, fNum, currEpoch) + db.selectObjectID(ctx, r, f, cnr, to, fNum, currEpoch) case v2object.FilterHeaderOwnerID: - bucketName := ownerBucketName(cnr, bucketName) - db.selectFromFKBT(tx, bucketName, f, to, fNum) + db.selectOwnerID(ctx, r, f, cnr, to, fNum) case v2object.FilterHeaderPayloadHash: - bucketName := payloadHashBucketName(cnr, bucketName) - db.selectFromList(tx, bucketName, f, to, fNum) + db.selectPayloadHash(ctx, r, f, cnr, to, fNum) case v2object.FilterHeaderObjectType: - for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) { - selectAllFromBucket(tx, bucketName, to, fNum) + for _, prefix := range prefixesForType(cnr, f.Operation(), f.Value()) { + db.selectAllWithPrefix(ctx, r, prefix.prefix, prefix.keyParser, to, fNum) } case v2object.FilterHeaderParent: - bucketName := parentBucketName(cnr, bucketName) - db.selectFromList(tx, bucketName, f, to, fNum) + db.selectParent(ctx, r, f, cnr, to, fNum) case v2object.FilterHeaderSplitID: - bucketName := splitBucketName(cnr, bucketName) - db.selectFromList(tx, bucketName, f, to, fNum) + db.selectSplitID(ctx, r, f, cnr, to, fNum) case v2object.FilterHeaderECParent: - bucketName := ecInfoBucketName(cnr, bucketName) - db.selectFromList(tx, bucketName, f, to, fNum) + db.selectECParent(ctx, r, f, cnr, to, fNum) case v2object.FilterPropertyRoot: - selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum) + db.selectAllWithPrefix(ctx, r, rootKeyPrefix(cnr), addressFromRootKey, to, fNum) case v2object.FilterPropertyPhy: - selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum) - selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum) - selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum) + db.selectAllWithPrefix(ctx, r, primaryKeyPrefix(cnr), addressFromPrimaryKey, to, fNum) + db.selectAllWithPrefix(ctx, r, tombstoneKeyPrefix(cnr), addressFromTombstoneKey, to, fNum) + db.selectAllWithPrefix(ctx, r, lockersKeyPrefix(cnr), addressFromLockersKey, to, fNum) default: // user attribute - bucketName := attributeBucketName(cnr, f.Header(), bucketName) - if f.Operation() == objectSDK.MatchNotPresent { - selectOutsideFKBT(tx, allBucketNames(cnr), bucketName, to, fNum) + db.selectWithoutAttribute(ctx, r, cnr, f.Header(), to, fNum) } else { - db.selectFromFKBT(tx, bucketName, f, to, fNum) + db.selectByAttribute(ctx, r, cnr, f, to, fNum) } } } -var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{ - v2object.TypeRegular.String(): {primaryBucketName, parentBucketName}, - v2object.TypeTombstone.String(): {tombstoneBucketName}, - v2object.TypeLock.String(): {bucketNameLockers}, +type prefixer struct { + prefixer func(cid.ID) []byte + keyParser func(key []byte) (oid.Address, error) } -func allBucketNames(cnr cid.ID) (names [][]byte) { - for _, fns := range mBucketNaming { - for _, fn := range fns { - names = append(names, fn(cnr, make([]byte, bucketKeySize))) - } - } - - return +type prefixWithKeyParser struct { + prefix []byte + keyParser func(key []byte) (oid.Address, error) } -func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, 
typeVal string) (names [][]byte) { +var typeToPrefix = map[string][]prefixer{ + v2object.TypeRegular.String(): { + prefixer{ + prefixer: primaryKeyPrefix, + keyParser: addressFromPrimaryKey, + }, + prefixer{ + prefixer: parentKeyShortPrefix, + keyParser: addressOfParentFromParentKey, + }, + }, + v2object.TypeTombstone.String(): { + prefixer{ + prefixer: tombstoneKeyPrefix, + keyParser: addressFromTombstoneKey, + }, + }, + v2object.TypeLock.String(): { + prefixer{ + prefixer: lockersKeyPrefix, + keyParser: addressFromLockersKey, + }, + }, +} + +func prefixesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (prefixes []prefixWithKeyParser) { appendNames := func(key string) { - fns, ok := mBucketNaming[key] + prefixers, ok := typeToPrefix[key] if ok { - for _, fn := range fns { - names = append(names, fn(cnr, make([]byte, bucketKeySize))) + for _, prefixer := range prefixers { + prefixes = append(prefixes, prefixWithKeyParser{ + prefix: prefixer.prefixer(cnr), + keyParser: prefixer.keyParser, + }) } } } @@ -260,7 +297,7 @@ func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal str switch mType { default: case objectSDK.MatchStringNotEqual: - for key := range mBucketNaming { + for key := range typeToPrefix { if key != typeVal { appendNames(key) } @@ -268,7 +305,7 @@ func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal str case objectSDK.MatchStringEqual: appendNames(typeVal) case objectSDK.MatchCommonPrefix: - for key := range mBucketNaming { + for key := range typeToPrefix { if strings.HasPrefix(key, typeVal) { appendNames(key) } @@ -280,145 +317,158 @@ func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal str // selectFromList looks into index to find list of addresses to add in // resulting cache. 
-func (db *DB) selectFromFKBT( - tx *bbolt.Tx, - name []byte, // fkbt root bucket name +func (db *DB) selectByAttribute( + ctx context.Context, + r pebble.Reader, + cnr cid.ID, f objectSDK.SearchFilter, // filter for operation and value to map[string]int, // resulting cache fNum int, // index of filter ) { // - matchFunc, ok := db.matchers[f.Operation()] - if !ok { + var prefix []byte + var condition func([]byte) bool + switch f.Operation() { + default: db.log.Debug(logs.MetabaseMissingMatcher, zap.Uint32("operation", uint32(f.Operation()))) - return - } - - fkbtRoot := tx.Bucket(name) - if fkbtRoot == nil { + case objectSDK.MatchUnknown: return - } - - err := matchFunc.matchBucket(fkbtRoot, f.Header(), f.Value(), func(k, _ []byte) error { - fkbtLeaf := fkbtRoot.Bucket(k) - if fkbtLeaf == nil { - return nil + case objectSDK.MatchStringEqual, objectSDK.MatchCommonPrefix: + prefix = attributeKeyPrefix(cnr, f.Header(), f.Value()) + condition = func([]byte) bool { return true } + case objectSDK.MatchStringNotEqual: + prefix = attributeKeyPrefix(cnr, f.Header(), "") + fromRequestValue := []byte(f.Value()) + condition = func(fromDBValue []byte) bool { + return !bytes.Equal(fromDBValue, fromRequestValue) } + } - return fkbtLeaf.ForEach(func(k, _ []byte) error { - markAddressInCache(to, fNum, string(k)) + var lastSeen []byte + for { + kvs, err := selectByPrefixAndSeek(ctx, r, prefix, lastSeen, batchSize) + if err != nil { + db.log.Debug(logs.MetabaseCouldNotIterateOverThePrefix, + zap.ByteString("prefix", prefix), + zap.Error(err), + ) + return + } + for _, kv := range kvs { + lastSeen = kv.Key + attrValue, err := attributeValueFromAttributeKey(kv.Key, f.Header()) + if err != nil { + db.log.Debug(logs.FailedToParseAttributeValueFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } - return nil - }) - }) - if err != nil { - db.log.Debug(logs.MetabaseErrorInFKBTSelection, zap.String("error", err.Error())) + if condition(attrValue) { + addr, err := addressFromAttributeKey(kv.Key, f.Header()) + if err != nil { + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + markAddressInCache(to, fNum, addr.EncodeToString()) + } + } + if len(kvs) < batchSize { + break + } } } // selectOutsideFKBT looks into all incl buckets to find list of addresses outside to add in // resulting cache. -func selectOutsideFKBT( - tx *bbolt.Tx, - incl [][]byte, // buckets - name []byte, // fkbt root bucket name +func (db *DB) selectWithoutAttribute( + ctx context.Context, + r pebble.Reader, + cnr cid.ID, + attributeKey string, // fkbt root bucket name to map[string]int, // resulting cache fNum int, // index of filter ) { - mExcl := make(map[string]struct{}) - - bktExcl := tx.Bucket(name) - if bktExcl != nil { - _ = bktExcl.ForEachBucket(func(k []byte) error { - exclBktLeaf := bktExcl.Bucket(k) - return exclBktLeaf.ForEach(func(k, _ []byte) error { - mExcl[string(k)] = struct{}{} - - return nil - }) - }) - } - - for i := range incl { - bktIncl := tx.Bucket(incl[i]) - if bktIncl == nil { - continue + for _, prefixers := range typeToPrefix { + for _, prefixer := range prefixers { + db.selectWithoutAttributeForPrexier(ctx, prefixer, cnr, r, attributeKey, to, fNum) } - - _ = bktIncl.ForEach(func(k, _ []byte) error { - if _, ok := mExcl[string(k)]; !ok { - markAddressInCache(to, fNum, string(k)) - } - - return nil - }) } } -// selectFromList looks into index to find list of addresses to add in -// resulting cache. 
-func (db *DB) selectFromList( - tx *bbolt.Tx, - name []byte, // list root bucket name - f objectSDK.SearchFilter, // filter for operation and value - to map[string]int, // resulting cache - fNum int, // index of filter -) { // - bkt := tx.Bucket(name) - if bkt == nil { - return - } - - var ( - lst [][]byte - err error - ) - - switch op := f.Operation(); op { - case objectSDK.MatchStringEqual: - lst, err = decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value()))) +func (db *DB) selectWithoutAttributeForPrexier(ctx context.Context, prefixer prefixer, cnr cid.ID, r pebble.Reader, attributeKey string, to map[string]int, fNum int) { + prefix := prefixer.prefixer(cnr) + var lastSeen []byte + for { + kvs, err := selectByPrefixAndSeek(ctx, r, prefix, lastSeen, batchSize) if err != nil { - db.log.Debug(logs.MetabaseCantDecodeListBucketLeaf, zap.String("error", err.Error())) + db.log.Debug(logs.MetabaseCouldNotIterateOverThePrefix, + zap.ByteString("prefix", prefix), + zap.Error(err), + ) return } - default: - fMatch, ok := db.matchers[op] - if !ok { - db.log.Debug(logs.MetabaseUnknownOperation, zap.Uint32("operation", uint32(op))) - - return - } - - if err = fMatch.matchBucket(bkt, f.Header(), f.Value(), func(_, val []byte) error { - l, err := decodeList(val) + for _, kv := range kvs { + lastSeen = kv.Key + addr, err := prefixer.keyParser(kv.Key) if err != nil { - db.log.Debug(logs.MetabaseCantDecodeListBucketLeaf, - zap.String("error", err.Error()), + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), ) - - return err + continue } - lst = append(lst, l...) + obj := objectSDK.New() + if err := obj.Unmarshal(kv.Value); err != nil { + db.log.Debug(logs.ShardCouldNotUnmarshalObject, zap.Stringer("address", addr), zap.Error(err)) + continue + } - return nil - }); err != nil { - db.log.Debug(logs.MetabaseCantIterateOverTheBucket, - zap.String("error", err.Error()), - ) - - return + var hasAttribute bool + for _, attr := range obj.Attributes() { + if attr.Key() == attributeKey { + hasAttribute = true + break + } + } + if hasAttribute { + continue + } + markAddressInCache(to, fNum, addr.EncodeToString()) + } + if len(kvs) < batchSize { + break } - } - - for i := range lst { - markAddressInCache(to, fNum, string(lst[i])) } } -// selectObjectID processes objectID filter with in-place optimizations. 
+type matcher struct { + matchSlow func(string, []byte, string) bool +} + +var matchers map[objectSDK.SearchMatchType]matcher = map[objectSDK.SearchMatchType]matcher{ + objectSDK.MatchUnknown: { + matchSlow: unknownMatcher, + }, + objectSDK.MatchStringEqual: { + matchSlow: stringEqualMatcher, + }, + objectSDK.MatchStringNotEqual: { + matchSlow: stringNotEqualMatcher, + }, + objectSDK.MatchCommonPrefix: { + matchSlow: stringCommonPrefixMatcher, + }, +} + func (db *DB) selectObjectID( - tx *bbolt.Tx, + ctx context.Context, + r pebble.Reader, f objectSDK.SearchFilter, cnr cid.ID, to map[string]int, // resulting cache @@ -431,67 +481,439 @@ func (db *DB) selectObjectID( addr.SetObject(id) var splitInfoError *objectSDK.SplitInfoError - ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch) + ok, _, err := db.exists(ctx, r, addr, oid.Address{}, currEpoch) if (err == nil && ok) || errors.As(err, &splitInfoError) { - raw := make([]byte, objectKeySize) - id.Encode(raw) - markAddressInCache(to, fNum, string(raw)) + markAddressInCache(to, fNum, addr.EncodeToString()) } } + var condition func(oid.Address) bool switch op := f.Operation(); op { case objectSDK.MatchStringEqual: var id oid.ID - if err := id.DecodeString(f.Value()); err == nil { - appendOID(id) + if err := id.DecodeString(f.Value()); err != nil { + return + } + appendOID(id) + return + case objectSDK.MatchUnknown: + return + case objectSDK.MatchStringNotEqual: + var id oid.ID + if err := id.DecodeString(f.Value()); err != nil { + return + } + condition = func(a oid.Address) bool { + return !a.Container().Equals(cnr) || !a.Object().Equals(id) + } + case objectSDK.MatchCommonPrefix: + condition = func(a oid.Address) bool { + return a.Container().Equals(cnr) && strings.HasPrefix( + a.Object().EncodeToString(), + f.Value(), + ) } default: - fMatch, ok := db.matchers[op] - if !ok { - db.log.Debug(logs.MetabaseUnknownOperation, - zap.Uint32("operation", uint32(f.Operation())), - ) + db.log.Debug(logs.MetabaseUnknownOperation, + zap.Uint32("operation", uint32(f.Operation())), + ) + return + } + for _, prefix := range prefixesForType(cnr, objectSDK.MatchStringNotEqual, "") { + db.selectWithPrefix(ctx, r, prefix.prefix, prefix.keyParser, condition, to, fNum) + } +} + +func (db *DB) selectOwnerID(ctx context.Context, r pebble.Reader, f objectSDK.SearchFilter, cnr cid.ID, to map[string]int, fNum int) { + var condition func([]byte) bool + var prefix []byte + switch op := f.Operation(); op { + case objectSDK.MatchCommonPrefix, objectSDK.MatchStringEqual: + prefix = ownerKeyLongPrefix(cnr, []byte(f.Value())) + condition = func([]byte) bool { return true } + case objectSDK.MatchUnknown: + return + case objectSDK.MatchStringNotEqual: + prefix = ownerKeyShortPrefix(cnr) + ownerID := []byte(f.Value()) + condition = func(fromDB []byte) bool { return !bytes.Equal(fromDB, ownerID) } + default: + db.log.Debug(logs.MetabaseUnknownOperation, + zap.Uint32("operation", uint32(f.Operation())), + ) + return + } + + var lastSeen []byte + for { + kvs, err := selectByPrefixAndSeek(ctx, r, prefix, lastSeen, batchSize) + if err != nil { + db.log.Debug(logs.MetabaseCouldNotIterateOverThePrefix, + zap.ByteString("prefix", prefix), + zap.Error(err), + ) return } - for _, bucketName := range bucketNamesForType(cnr, objectSDK.MatchStringNotEqual, "") { - // copy-paste from DB.selectAllFrom - bkt := tx.Bucket(bucketName) - if bkt == nil { - return - } - - err := fMatch.matchBucket(bkt, f.Header(), f.Value(), func(k, _ []byte) error { - var id oid.ID - if err := 
id.Decode(k); err == nil { - appendOID(id) - } - return nil - }) + for _, kv := range kvs { + lastSeen = kv.Key + owner, err := ownerFromOwnerKey(kv.Key) if err != nil { - db.log.Debug(logs.MetabaseCouldNotIterateOverTheBuckets, - zap.String("error", err.Error()), + db.log.Debug(logs.FailedToParseOwnerFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), ) + continue } + if condition(owner) { + addr, err := addressFromOwnerKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + markAddressInCache(to, fNum, addr.EncodeToString()) + } + } + if len(kvs) < batchSize { + break + } + } +} + +func (db *DB) selectPayloadHash(ctx context.Context, r pebble.Reader, f objectSDK.SearchFilter, cnr cid.ID, to map[string]int, fNum int) { + var condition func([]byte) bool + var prefix []byte + switch op := f.Operation(); op { + case objectSDK.MatchUnknown: + return + case objectSDK.MatchCommonPrefix: + value, checkLast, ok := destringifyValue(f.Header(), f.Value(), true) + if !ok { + return + } + prefixValue := value + if checkLast { + prefixValue = value[:len(value)-1] + } + if len(value) == 0 { + condition = func([]byte) bool { return true } + prefix = payloadHashKeyShortPrefix(cnr) + } else { + prefix = payloadHashKeyLongPrefix(cnr, prefixValue) + condition = func(fromDB []byte) bool { + if checkLast && (len(fromDB) == len(prefixValue) || fromDB[len(prefixValue)]>>4 != value[len(value)-1]) { + return false + } + return true + } + } + case objectSDK.MatchStringEqual: + value, _, ok := destringifyValue(f.Header(), f.Value(), false) + if !ok { + return + } + prefix = payloadHashKeyLongPrefix(cnr, value) + condition = func([]byte) bool { return true } + case objectSDK.MatchStringNotEqual: + value, _, ok := destringifyValue(f.Header(), f.Value(), false) + prefix = payloadHashKeyShortPrefix(cnr) + condition = func(fromDB []byte) bool { return !ok || !bytes.Equal(fromDB, value) } + default: + db.log.Debug(logs.MetabaseUnknownOperation, zap.Uint32("operation", uint32(f.Operation()))) + return + } + + var lastSeen []byte + for { + kvs, err := selectByPrefixAndSeek(ctx, r, prefix, lastSeen, batchSize) + if err != nil { + db.log.Debug(logs.MetabaseCouldNotIterateOverThePrefix, + zap.ByteString("prefix", prefix), + zap.Error(err), + ) + return + } + for _, kv := range kvs { + lastSeen = kv.Key + hash, err := payloadHashFromPayloadHashKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParsePayloadHashFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + if condition(hash) { + addr, err := addressFromPayloadHashKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + markAddressInCache(to, fNum, addr.EncodeToString()) + } + } + if len(kvs) < batchSize { + break + } + } +} + +func (db *DB) selectParent( + ctx context.Context, + r pebble.Reader, + f objectSDK.SearchFilter, + cnr cid.ID, + to map[string]int, // resulting cache + fNum int, // index of filter +) { + var condition func(oid.ID) bool + var prefix []byte + switch op := f.Operation(); op { + case objectSDK.MatchStringEqual: + var parentObjID oid.ID + if err := parentObjID.DecodeString(f.Value()); err != nil { + return + } + prefix = parentKeyLongPrefix(cnr, parentObjID) + condition = func(oid.ID) bool { return true } + case objectSDK.MatchCommonPrefix: + v, err := base58.Decode(f.Value()) + if err != nil { + return + } + prefix = 
append(parentKeyShortPrefix(cnr), v...) + condition = func(oid.ID) bool { return true } + case objectSDK.MatchUnknown: + return + case objectSDK.MatchStringNotEqual: + var parentObjID oid.ID + if err := parentObjID.DecodeString(f.Value()); err != nil { + return + } + + prefix = parentKeyShortPrefix(cnr) + condition = func(parentFromDB oid.ID) bool { return !parentFromDB.Equals(parentObjID) } + default: + db.log.Debug(logs.MetabaseUnknownOperation, + zap.Uint32("operation", uint32(f.Operation())), + ) + return + } + + var lastSeen []byte + for { + kvs, err := selectByPrefixAndSeek(ctx, r, prefix, lastSeen, batchSize) + if err != nil { + db.log.Debug(logs.MetabaseCouldNotIterateOverThePrefix, + zap.ByteString("prefix", prefix), + zap.Error(err), + ) + return + } + + for _, kv := range kvs { + lastSeen = kv.Key + parentAddr, err := addressOfParentFromParentKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + if condition(parentAddr.Object()) { + targetAddr, err := addressOfTargetFromParentKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + markAddressInCache(to, fNum, targetAddr.EncodeToString()) + } + } + if len(kvs) < batchSize { + break + } + } +} + +func (db *DB) selectECParent( + ctx context.Context, + r pebble.Reader, + f objectSDK.SearchFilter, + cnr cid.ID, + to map[string]int, // resulting cache + fNum int, // index of filter +) { + var condition func(oid.ID) bool + var prefix []byte + switch op := f.Operation(); op { + case objectSDK.MatchStringEqual: + var ecParentObjID oid.ID + if err := ecParentObjID.DecodeString(f.Value()); err != nil { + return + } + prefix = ecInfoLongKeyPrefix(cnr, ecParentObjID) + condition = func(oid.ID) bool { return true } + case objectSDK.MatchCommonPrefix: + v, err := base58.Decode(f.Value()) + if err != nil { + return + } + prefix = append(ecInfoShortKeyPrefix(cnr), v...) 
+ condition = func(oid.ID) bool { return true } + case objectSDK.MatchUnknown: + return + case objectSDK.MatchStringNotEqual: + var ecParentObjID oid.ID + if err := ecParentObjID.DecodeString(f.Value()); err != nil { + return + } + + prefix = ecInfoShortKeyPrefix(cnr) + condition = func(parentFromDB oid.ID) bool { return !parentFromDB.Equals(ecParentObjID) } + default: + db.log.Debug(logs.MetabaseUnknownOperation, + zap.Uint32("operation", uint32(f.Operation())), + ) + return + } + + var lastSeen []byte + for { + kvs, err := selectByPrefixAndSeek(ctx, r, prefix, lastSeen, batchSize) + if err != nil { + db.log.Debug(logs.MetabaseCouldNotIterateOverThePrefix, + zap.ByteString("prefix", prefix), + zap.Error(err), + ) + return + } + + for _, kv := range kvs { + lastSeen = kv.Key + ecParentAddr, err := addressOfParentFromECInfoKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + if condition(ecParentAddr.Object()) { + chunkAddr, err := addressOfChunkFromECInfoKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + markAddressInCache(to, fNum, chunkAddr.EncodeToString()) + } + } + if len(kvs) < batchSize { + break + } + } +} + +func (db *DB) selectSplitID( + ctx context.Context, + r pebble.Reader, + f objectSDK.SearchFilter, + cnr cid.ID, + to map[string]int, // resulting cache + fNum int, // index of filter +) { + var condition func([]byte) bool + var prefix []byte + switch op := f.Operation(); op { + case objectSDK.MatchStringEqual: + s := objectSDK.NewSplitID() + err := s.Parse(f.Value()) + if err != nil { + return + } + prefix = splitKeyLongPrefix(cnr, s.ToV2()) + condition = func([]byte) bool { return true } + case objectSDK.MatchCommonPrefix: + prefix = splitKeyLongPrefix(cnr, []byte(f.Value())) + condition = func([]byte) bool { return true } + case objectSDK.MatchUnknown: + return + case objectSDK.MatchStringNotEqual: + prefix = splitKeyShortPrefix(cnr) + splitIDFromRequest := []byte(f.Value()) + condition = func(splitIDFromDB []byte) bool { return !bytes.Equal(splitIDFromRequest, splitIDFromDB) } + default: + db.log.Debug(logs.MetabaseUnknownOperation, + zap.Uint32("operation", uint32(f.Operation())), + ) + return + } + + var lastSeen []byte + for { + kvs, err := selectByPrefixAndSeek(ctx, r, prefix, lastSeen, batchSize) + if err != nil { + db.log.Debug(logs.MetabaseCouldNotIterateOverThePrefix, + zap.ByteString("prefix", prefix), + zap.Error(err), + ) + return + } + + for _, kv := range kvs { + lastSeen = kv.Key + splitID, err := splitIDFromSplitKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParseSplitIDFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + if condition(splitID) { + addr, err := addressFromSplitKey(kv.Key) + if err != nil { + db.log.Debug(logs.FailedToParseAddressFromKey, + zap.ByteString("key", kv.Key), + zap.Error(err), + ) + continue + } + markAddressInCache(to, fNum, addr.EncodeToString()) + } + } + if len(kvs) < batchSize { + break } } } // matchSlowFilters return true if object header is matched by all slow filters. 
-func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) bool { +func (db *DB) matchSlowFilters(ctx context.Context, r pebble.Reader, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) bool { if len(f) == 0 { return true } - buf := make([]byte, addressKeySize) - obj, err := db.get(tx, addr, buf, true, false, currEpoch) + obj, err := get(ctx, r, addr, true, false, currEpoch) if err != nil { return false } for i := range f { - matchFunc, ok := db.matchers[f[i].Operation()] + matchFunc, ok := matchers[f[i].Operation()] if !ok { return false }
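
Note: the hunks above repeatedly call a selectByPrefixAndSeek helper (together with the key-prefix builders and key parsers such as primaryKeyPrefix and addressFromPrimaryKey), which is defined elsewhere in the change and is not part of this patch. The sketch below only illustrates the contract the callers appear to rely on; the keyValue type, field names and exact signature are assumptions, not code from this change. It assumes a pebble version in which Reader.NewIter returns an error.

package meta

import (
	"bytes"
	"context"

	"github.com/cockroachdb/pebble"
)

// keyValue is a hypothetical pair type matching the kv.Key / kv.Value usage above.
type keyValue struct {
	Key   []byte
	Value []byte
}

// selectByPrefixAndSeek returns up to limit entries whose keys start with
// prefix, beginning strictly after lastSeen, so callers can page through an
// index in batches.
func selectByPrefixAndSeek(ctx context.Context, r pebble.Reader, prefix, lastSeen []byte, limit int) ([]keyValue, error) {
	it, err := r.NewIter(&pebble.IterOptions{LowerBound: prefix})
	if err != nil {
		return nil, err
	}
	defer it.Close()

	// Resume strictly after the last key returned by the previous batch.
	if len(lastSeen) == 0 {
		it.First()
	} else {
		it.SeekGE(lastSeen)
		if it.Valid() && bytes.Equal(it.Key(), lastSeen) {
			it.Next()
		}
	}

	var result []keyValue
	for ; it.Valid() && bytes.HasPrefix(it.Key(), prefix); it.Next() {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		// Key()/Value() are only valid until the iterator moves, so copy them.
		result = append(result, keyValue{
			Key:   bytes.Clone(it.Key()),
			Value: bytes.Clone(it.Value()),
		})
		if len(result) == limit {
			break
		}
	}
	return result, it.Error()
}

The callers above treat len(kvs) < batchSize as the end of the prefix range, so whatever the real implementation looks like, it must return fewer than batchSize entries only when the range is exhausted.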