package meta import ( "context" "encoding/binary" "errors" "fmt" "strings" "time" v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) type ( // filterGroup is a structure that have search filters grouped by access // method. We have fast filters that looks for indexes and do not unmarshal // objects, and slow filters, that applied after fast filters created // smaller set of objects to check. filterGroup struct { withCnrFilter bool cnr cid.ID fastFilters, slowFilters objectSDK.SearchFilters } ) // SelectPrm groups the parameters of Select operation. type SelectPrm struct { cnr cid.ID filters objectSDK.SearchFilters } // SelectRes groups the resulting values of Select operation. type SelectRes struct { addrList []oid.Address } // SetContainerID is a Select option to set the container id to search in. func (p *SelectPrm) SetContainerID(cnr cid.ID) { p.cnr = cnr } // SetFilters is a Select option to set the object filters. func (p *SelectPrm) SetFilters(fs objectSDK.SearchFilters) { p.filters = fs } // AddressList returns list of addresses of the selected objects. func (r SelectRes) AddressList() []oid.Address { return r.addrList } // Select returns list of addresses of objects that match search filters. func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err error) { var ( startedAt = time.Now() success = false ) defer func() { db.metrics.AddMethodDuration("Select", time.Since(startedAt), success) }() _, span := tracing.StartSpanFromContext(ctx, "metabase.Select", trace.WithAttributes( attribute.String("container_id", prm.cnr.EncodeToString()), )) defer span.End() db.modeMtx.RLock() defer db.modeMtx.RUnlock() if db.mode.NoMetabase() { return res, ErrDegradedMode } if checkNonEmpty(prm.filters) { success = true return res, nil } currEpoch := db.epochState.CurrentEpoch() return res, metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error { res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch) success = err == nil return err })) } func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters, currEpoch uint64) ([]oid.Address, error) { group, err := groupFilters(fs) if err != nil { return nil, err } // if there are conflicts in query and container then it means that there is no // objects to match this query. if group.withCnrFilter && !cnr.Equals(group.cnr) { return nil, nil } // keep matched addresses in this cache // value equal to number (index+1) of latest matched filter mAddr := make(map[string]int) expLen := len(group.fastFilters) // expected value of matched filters in mAddr if len(group.fastFilters) == 0 { expLen = 1 db.selectAll(tx, cnr, mAddr) } else { for i := range group.fastFilters { db.selectFastFilter(tx, cnr, group.fastFilters[i], mAddr, i) } } res := make([]oid.Address, 0, len(mAddr)) for a, ind := range mAddr { if ind != expLen { continue // ignore objects with unmatched fast filters } var id oid.ID err = id.Decode([]byte(a)) if err != nil { return nil, err } var addr oid.Address addr.SetContainer(cnr) addr.SetObject(id) st, err := objectStatus(tx, addr, currEpoch) if err != nil { return nil, err } if st > 0 { continue // ignore removed objects } addr, match := db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch) if !match { continue // ignore objects with unmatched slow filters } res = append(res, addr) } return res, nil } // selectAll adds to resulting cache all available objects in metabase. func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) { bucketName := make([]byte, bucketKeySize) selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, 0) selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, 0) selectAllFromBucket(tx, parentBucketName(cnr, bucketName), to, 0) selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, 0) } // selectAllFromBucket goes through all keys in bucket and adds them in a // resulting cache. Keys should be stringed object ids. func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int) { bkt := tx.Bucket(name) if bkt == nil { return } _ = bkt.ForEach(func(k, _ []byte) error { markAddressInCache(to, fNum, string(k)) return nil }) } // selectFastFilter makes fast optimized checks for well known buckets or // looking through user attribute buckets otherwise. func (db *DB) selectFastFilter( tx *bbolt.Tx, cnr cid.ID, // container we search on f objectSDK.SearchFilter, // fast filter to map[string]int, // resulting cache fNum int, // index of filter ) { currEpoch := db.epochState.CurrentEpoch() bucketName := make([]byte, bucketKeySize) switch f.Header() { case v2object.FilterHeaderObjectID: db.selectObjectID(tx, f, cnr, to, fNum, currEpoch) case v2object.FilterHeaderObjectType: for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) { selectAllFromBucket(tx, bucketName, to, fNum) } case v2object.FilterHeaderParent: bucketName := parentBucketName(cnr, bucketName) db.selectFromList(tx, bucketName, f, to, fNum) case v2object.FilterHeaderSplitID: bucketName := splitBucketName(cnr, bucketName) db.selectFromList(tx, bucketName, f, to, fNum) case v2object.FilterHeaderECParent: bucketName := ecInfoBucketName(cnr, bucketName) db.selectFromList(tx, bucketName, f, to, fNum) case v2object.FilterPropertyRoot: selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum) case v2object.FilterPropertyPhy: selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum) selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum) selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum) default: } } var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{ v2object.TypeRegular.String(): {primaryBucketName, parentBucketName}, v2object.TypeTombstone.String(): {tombstoneBucketName}, v2object.TypeLock.String(): {bucketNameLockers}, } func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (names [][]byte) { appendNames := func(key string) { fns, ok := mBucketNaming[key] if ok { for _, fn := range fns { names = append(names, fn(cnr, make([]byte, bucketKeySize))) } } } switch mType { default: case objectSDK.MatchStringNotEqual: for key := range mBucketNaming { if key != typeVal { appendNames(key) } } case objectSDK.MatchStringEqual: appendNames(typeVal) case objectSDK.MatchCommonPrefix: for key := range mBucketNaming { if strings.HasPrefix(key, typeVal) { appendNames(key) } } } return } // selectFromList looks into index to find list of addresses to add in // resulting cache. func (db *DB) selectFromList( tx *bbolt.Tx, name []byte, // list root bucket name f objectSDK.SearchFilter, // filter for operation and value to map[string]int, // resulting cache fNum int, // index of filter ) { // bkt := tx.Bucket(name) if bkt == nil { return } var ( lst [][]byte err error ) switch op := f.Operation(); op { case objectSDK.MatchStringEqual: lst, err = decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value()))) if err != nil { db.log.Debug(logs.MetabaseCantDecodeListBucketLeaf, zap.String("error", err.Error())) return } default: fMatch, ok := db.matchers[op] if !ok { db.log.Debug(logs.MetabaseUnknownOperation, zap.Uint32("operation", uint32(op))) return } if err = fMatch.matchBucket(bkt, f.Header(), f.Value(), func(_, val []byte) error { l, err := decodeList(val) if err != nil { db.log.Debug(logs.MetabaseCantDecodeListBucketLeaf, zap.String("error", err.Error()), ) return err } lst = append(lst, l...) return nil }); err != nil { db.log.Debug(logs.MetabaseCantIterateOverTheBucket, zap.String("error", err.Error()), ) return } } for i := range lst { markAddressInCache(to, fNum, string(lst[i])) } } // selectObjectID processes objectID filter with in-place optimizations. func (db *DB) selectObjectID( tx *bbolt.Tx, f objectSDK.SearchFilter, cnr cid.ID, to map[string]int, // resulting cache fNum int, // index of filter currEpoch uint64, ) { appendOID := func(id oid.ID) { var addr oid.Address addr.SetContainer(cnr) addr.SetObject(id) var splitInfoError *objectSDK.SplitInfoError ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch) if (err == nil && ok) || errors.As(err, &splitInfoError) { raw := make([]byte, objectKeySize) id.Encode(raw) markAddressInCache(to, fNum, string(raw)) } } switch op := f.Operation(); op { case objectSDK.MatchStringEqual: var id oid.ID if err := id.DecodeString(f.Value()); err == nil { appendOID(id) } default: fMatch, ok := db.matchers[op] if !ok { db.log.Debug(logs.MetabaseUnknownOperation, zap.Uint32("operation", uint32(f.Operation())), ) return } for _, bucketName := range bucketNamesForType(cnr, objectSDK.MatchStringNotEqual, "") { // copy-paste from DB.selectAllFrom bkt := tx.Bucket(bucketName) if bkt == nil { return } err := fMatch.matchBucket(bkt, f.Header(), f.Value(), func(k, _ []byte) error { var id oid.ID if err := id.Decode(k); err == nil { appendOID(id) } return nil }) if err != nil { db.log.Debug(logs.MetabaseCouldNotIterateOverTheBuckets, zap.String("error", err.Error()), ) } } } } // matchSlowFilters return true if object header is matched by all slow filters. func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) (oid.Address, bool) { result := addr if len(f) == 0 { return result, true } obj, isECChunk, err := db.getObjectForSlowFilters(tx, addr, currEpoch) if err != nil { return result, false } for i := range f { var data []byte switch f[i].Header() { case v2object.FilterHeaderVersion: data = []byte(obj.Version().String()) case v2object.FilterHeaderHomomorphicHash: if isECChunk { return result, false // EC chunk and EC parent hashes are incomparable } cs, _ := obj.PayloadHomomorphicHash() data = cs.Value() case v2object.FilterHeaderCreationEpoch: data = make([]byte, 8) binary.LittleEndian.PutUint64(data, obj.CreationEpoch()) case v2object.FilterHeaderPayloadLength: if isECChunk { return result, false // EC chunk and EC parent payload lengths are incomparable } data = make([]byte, 8) binary.LittleEndian.PutUint64(data, obj.PayloadSize()) case v2object.FilterHeaderOwnerID: data = []byte(obj.OwnerID().EncodeToString()) case v2object.FilterHeaderPayloadHash: if isECChunk { return result, false // EC chunk and EC parent payload hashes are incomparable } cs, _ := obj.PayloadChecksum() data = cs.Value() default: // user attribute v, ok := attributeValue(obj, f[i].Header()) if ok { if ech := obj.ECHeader(); ech != nil { result.SetObject(ech.Parent()) } data = []byte(v) } else { return result, f[i].Operation() == objectSDK.MatchNotPresent } } matchFunc, ok := db.matchers[f[i].Operation()] if !ok { return result, false } if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) { return result, false } } return result, true } func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) { buf := make([]byte, addressKeySize) obj, err := db.get(tx, addr, buf, true, false, currEpoch) if err != nil { var ecInfoError *objectSDK.ECInfoError if errors.As(err, &ecInfoError) { for _, chunk := range ecInfoError.ECInfo().Chunks { var objID oid.ID if err = objID.ReadFromV2(chunk.ID); err != nil { continue } addr.SetObject(objID) obj, err = db.get(tx, addr, buf, true, false, currEpoch) if err == nil { return obj, true, nil } } } return nil, false, err } return obj, false, nil } func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) { objectAttributes := obj.Attributes() if ech := obj.ECHeader(); ech != nil { objectAttributes = ech.ParentAttributes() } for _, attr := range objectAttributes { if attr.Key() == attribute { return attr.Value(), true } } return "", false } // groupFilters divides filters in two groups: fast and slow. Fast filters // processed by indexes and slow filters processed after by unmarshaling // object headers. func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) { res := filterGroup{ fastFilters: make(objectSDK.SearchFilters, 0, len(filters)), slowFilters: make(objectSDK.SearchFilters, 0, len(filters)), } for i := range filters { switch filters[i].Header() { case v2object.FilterHeaderContainerID: // support deprecated field err := res.cnr.DecodeString(filters[i].Value()) if err != nil { return filterGroup{}, fmt.Errorf("can't parse container id: %w", err) } res.withCnrFilter = true case // fast filters v2object.FilterHeaderObjectID, v2object.FilterHeaderObjectType, v2object.FilterHeaderParent, v2object.FilterHeaderSplitID, v2object.FilterHeaderECParent, v2object.FilterPropertyRoot, v2object.FilterPropertyPhy: res.fastFilters = append(res.fastFilters, filters[i]) default: res.slowFilters = append(res.slowFilters, filters[i]) } } return res, nil } func markAddressInCache(cache map[string]int, fNum int, addr string) { if num := cache[addr]; num == fNum { cache[addr] = num + 1 } } // Returns true if at least 1 object can satisfy fs. func checkNonEmpty(fs objectSDK.SearchFilters) bool { for i := range fs { if fs[i].Operation() == objectSDK.MatchNotPresent && isSystemKey(fs[i].Header()) { return true } } return false } // returns true if string key is a reserved system filter key. func isSystemKey(key string) bool { return strings.HasPrefix(key, v2object.ReservedFilterPrefix) }