frostfs-node/pkg/local_object_storage/metabase/select.go
2024-12-18 15:52:26 +00:00

613 lines
16 KiB
Go

package meta
import (
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
v2object "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// filterGroup holds search filters split by the way they are evaluated.
// Fast filters are resolved through index buckets without unmarshaling
// objects; slow filters are applied afterwards to the smaller candidate set
// produced by the fast ones.
type filterGroup struct {
	// withCnrFilter is set when a container ID filter was present in the query.
	withCnrFilter bool
	// cnr is the container ID parsed from that filter.
	cnr cid.ID

	fastFilters objectSDK.SearchFilters
	slowFilters objectSDK.SearchFilters
}
// SelectPrm groups the parameters of Select operation.
type SelectPrm struct {
// cnr is the container to search in.
cnr cid.ID
// filters are the search filters that selected objects must match.
filters objectSDK.SearchFilters
// useAttributeIndex allows filters on indexed user attributes to be
// processed as fast (index-based) filters instead of slow ones.
useAttributeIndex bool
}
// SelectRes groups the resulting values of Select operation.
type SelectRes struct {
// addrList holds the addresses of the objects that matched the filters.
addrList []oid.Address
}
// SetContainerID is a Select option to set the ID of the container in which
// objects are searched.
func (p *SelectPrm) SetContainerID(cnr cid.ID) {
p.cnr = cnr
}
// SetFilters is a Select option to set the object search filters. The slice is
// stored as passed, without copying.
func (p *SelectPrm) SetFilters(fs objectSDK.SearchFilters) {
p.filters = fs
}
// SetUseAttributeIndex is a Select option that controls whether filters on
// indexed user attributes are resolved through the attribute index (fast
// filters) rather than by inspecting object headers (slow filters).
func (p *SelectPrm) SetUseAttributeIndex(v bool) {
p.useAttributeIndex = v
}
// AddressList returns list of addresses of the selected objects. The internal
// slice is returned as is, without copying.
func (r SelectRes) AddressList() []oid.Address {
return r.addrList
}
// Select returns list of addresses of objects that match search filters.
//
// It returns ErrDegradedMode when the current mode provides no metabase. When
// checkNonEmpty reports true for the filters, an empty result is returned
// without touching the database. Otherwise the search runs inside a read-only
// bbolt transaction and any error is wrapped with metaerr.Wrap.
//
// The duration of the call and its success flag are reported to metrics.
func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err error) {
	var (
		startedAt = time.Now()
		success   = false
	)
	defer func() {
		db.metrics.AddMethodDuration("Select", time.Since(startedAt), success)
	}()

	_, span := tracing.StartSpanFromContext(ctx, "metabase.Select",
		trace.WithAttributes(
			attribute.String("container_id", prm.cnr.EncodeToString()),
		))
	defer span.End()

	db.modeMtx.RLock()
	defer db.modeMtx.RUnlock()

	if db.mode.NoMetabase() {
		return res, ErrDegradedMode
	}

	if checkNonEmpty(prm.filters) {
		success = true
		return res, nil
	}

	currEpoch := db.epochState.CurrentEpoch()

	// Run the transaction as a separate statement: the closure fills res, and
	// Go does not specify whether a plain variable operand of a return
	// statement is read before or after a function call in the same statement.
	txErr := db.boltDB.View(func(tx *bbolt.Tx) error {
		addrs, selErr := db.selectObjects(tx, prm.cnr, prm.filters, currEpoch, prm.useAttributeIndex)
		res.addrList = addrs
		return selErr
	})
	// A transaction-level failure counts as an unsuccessful call too.
	success = txErr == nil

	return res, metaerr.Wrap(txErr)
}
// selectObjects runs the search inside the given transaction. Fast filters
// narrow the candidate set through index buckets; every surviving candidate
// is then checked for removal and matched against the slow filters.
//
// A nil slice with a nil error is returned when the container filter of the
// query conflicts with cnr, since no object can match such a query.
func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters, currEpoch uint64, useAttributeIndex bool) ([]oid.Address, error) {
	group, err := groupFilters(fs, useAttributeIndex)
	if err != nil {
		return nil, err
	}

	if group.withCnrFilter && !cnr.Equals(group.cnr) {
		return nil, nil
	}

	// candidates maps a raw object ID to the number (index+1) of the latest
	// fast filter it matched.
	candidates := make(map[string]int)

	// wantMatches is the counter value of a candidate that passed every fast
	// filter; without fast filters all objects are taken with counter 1.
	wantMatches := len(group.fastFilters)
	if wantMatches == 0 {
		wantMatches = 1
		db.selectAll(tx, cnr, candidates)
	}
	for i := range group.fastFilters {
		db.selectFastFilter(tx, cnr, group.fastFilters[i], candidates, i)
	}

	matched := make([]oid.Address, 0, len(candidates))
	for rawID, hits := range candidates {
		if hits != wantMatches {
			continue // ignore objects with unmatched fast filters
		}

		var id oid.ID
		if err := id.Decode([]byte(rawID)); err != nil {
			return nil, err
		}

		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(id)

		st, err := objectStatus(tx, addr, currEpoch)
		if err != nil {
			return nil, err
		}
		if st > 0 {
			continue // ignore removed objects
		}

		// The returned address may differ from the candidate one, so the
		// value handed back by matchSlowFilters is what gets collected.
		resolved, ok := db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch)
		if ok {
			matched = append(matched, resolved)
		}
	}

	return matched, nil
}
// selectAll marks in the resulting cache every key found in the primary,
// tombstone, parent and lockers buckets of the container, using filter
// number 0.
func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) {
	// One scratch buffer is reused for all names: each name is consumed
	// before the next one is built.
	buf := make([]byte, bucketKeySize)

	namers := []func(cid.ID, []byte) []byte{
		primaryBucketName,
		tombstoneBucketName,
		parentBucketName,
		bucketNameLockers,
	}
	for _, nameOf := range namers {
		selectAllFromBucket(tx, nameOf(cnr, buf), to, 0)
	}
}
// selectAllFromBucket walks every key of the named bucket and marks it in the
// resulting cache for filter number fNum. Keys should be stringed object ids.
// A missing bucket is silently skipped.
func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
	if bkt := tx.Bucket(name); bkt != nil {
		// The callback never fails, so the iteration result is ignored.
		_ = bkt.ForEach(func(key, _ []byte) error {
			markAddressInCache(to, fNum, string(key))
			return nil
		})
	}
}
// selectFastFilter resolves a single fast filter through index buckets of the
// container: well-known headers are served by dedicated buckets, any other
// header is treated as a user attribute. Matching keys are marked in the
// resulting cache for filter number fNum.
func (db *DB) selectFastFilter(
	tx *bbolt.Tx,
	cnr cid.ID, // container we search on
	f objectSDK.SearchFilter, // fast filter
	to map[string]int, // resulting cache
	fNum int, // index of filter
) {
	epoch := db.epochState.CurrentEpoch()
	buf := make([]byte, bucketKeySize) // scratch space for bucket names

	switch hdr := f.Header(); hdr {
	case v2object.FilterHeaderObjectID:
		db.selectObjectID(tx, f, cnr, to, fNum, epoch)
	case v2object.FilterHeaderObjectType:
		names := bucketNamesForType(cnr, f.Operation(), f.Value())
		for i := range names {
			selectAllFromBucket(tx, names[i], to, fNum)
		}
	case v2object.FilterHeaderParent:
		db.selectFromList(tx, parentBucketName(cnr, buf), f, to, fNum)
	case v2object.FilterHeaderSplitID:
		db.selectFromList(tx, splitBucketName(cnr, buf), f, to, fNum)
	case v2object.FilterHeaderECParent:
		db.selectFromList(tx, ecInfoBucketName(cnr, buf), f, to, fNum)
	case v2object.FilterPropertyRoot:
		selectAllFromBucket(tx, rootBucketName(cnr, buf), to, fNum)
	case v2object.FilterPropertyPhy:
		selectAllFromBucket(tx, primaryBucketName(cnr, buf), to, fNum)
		selectAllFromBucket(tx, tombstoneBucketName(cnr, buf), to, fNum)
		selectAllFromBucket(tx, bucketNameLockers(cnr, buf), to, fNum)
	default: // user attribute
		attrBucket := attributeBucketName(cnr, hdr, buf)
		if f.Operation() != objectSDK.MatchNotPresent {
			db.selectFromFKBT(tx, attrBucket, f, to, fNum)
			return
		}
		// NOT_PRESENT: take everything that is not listed in the attribute index.
		selectOutsideFKBT(tx, allBucketNames(cnr), attrBucket, to, fNum)
	}
}
// mBucketNaming maps the string form of an object type to the functions that
// build the names of the container buckets associated with that type. Each
// function takes the container ID and a buffer for the resulting name.
var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{
v2object.TypeRegular.String(): {primaryBucketName, parentBucketName},
v2object.TypeTombstone.String(): {tombstoneBucketName},
v2object.TypeLock.String(): {bucketNameLockers},
}
// allBucketNames returns the names of all buckets listed in mBucketNaming for
// the given container. Every name gets its own freshly allocated buffer. The
// order of names follows map iteration and is therefore not stable.
func allBucketNames(cnr cid.ID) (names [][]byte) {
	for _, namers := range mBucketNaming {
		for i := range namers {
			buf := make([]byte, bucketKeySize)
			names = append(names, namers[i](cnr, buf))
		}
	}
	return names
}
// bucketNamesForType returns the names of the container buckets whose object
// type (a key of mBucketNaming) relates to typeVal according to mType:
// equal, not equal or sharing typeVal as a prefix. Any other match type
// yields no names.
func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (names [][]byte) {
	// collect appends the bucket names registered for key; an unknown key
	// contributes nothing.
	collect := func(key string) {
		for _, namer := range mBucketNaming[key] {
			names = append(names, namer(cnr, make([]byte, bucketKeySize)))
		}
	}

	switch mType {
	case objectSDK.MatchStringEqual:
		collect(typeVal)
	case objectSDK.MatchStringNotEqual:
		for key := range mBucketNaming {
			if key == typeVal {
				continue
			}
			collect(key)
		}
	case objectSDK.MatchCommonPrefix:
		for key := range mBucketNaming {
			if !strings.HasPrefix(key, typeVal) {
				continue
			}
			collect(key)
		}
	}

	return names
}
// selectFromFKBT looks into the <fkbt> index rooted at the named bucket: leaf
// buckets selected by the matcher of the filter operation are walked and all
// their keys are marked in the resulting cache for filter number fNum.
// Nothing is selected when the operation has no matcher or the root bucket
// does not exist.
func (db *DB) selectFromFKBT(
	tx *bbolt.Tx,
	name []byte, // fkbt root bucket name
	f objectSDK.SearchFilter, // filter for operation and value
	to map[string]int, // resulting cache
	fNum int, // index of filter
) {
	matcher, known := db.matchers[f.Operation()]
	if !known {
		return
	}

	root := tx.Bucket(name)
	if root == nil {
		return
	}

	markKey := func(objKey, _ []byte) error {
		markAddressInCache(to, fNum, string(objKey))
		return nil
	}

	// Iteration errors are deliberately ignored: the search is best-effort.
	_ = matcher.matchBucket(root, f.Header(), f.Value(), func(leafKey, _ []byte) error {
		leaf := root.Bucket(leafKey)
		if leaf == nil {
			return nil
		}
		return leaf.ForEach(markKey)
	})
}
// selectOutsideFKBT marks in the resulting cache every key of the incl
// buckets that is not present in any leaf of the <fkbt> index rooted at name.
// A missing fkbt root excludes nothing; missing incl buckets are skipped.
func selectOutsideFKBT(
	tx *bbolt.Tx,
	incl [][]byte, // buckets
	name []byte, // fkbt root bucket name
	to map[string]int, // resulting cache
	fNum int, // index of filter
) {
	// First collect every key that is listed somewhere in the index.
	excluded := make(map[string]struct{})
	if fkbt := tx.Bucket(name); fkbt != nil {
		_ = fkbt.ForEachBucket(func(leafName []byte) error {
			return fkbt.Bucket(leafName).ForEach(func(objKey, _ []byte) error {
				excluded[string(objKey)] = struct{}{}
				return nil
			})
		})
	}

	// Then take everything from the included buckets that was not collected.
	for _, bucketName := range incl {
		bkt := tx.Bucket(bucketName)
		if bkt == nil {
			continue
		}

		_ = bkt.ForEach(func(objKey, _ []byte) error {
			if _, skip := excluded[string(objKey)]; skip {
				return nil
			}
			markAddressInCache(to, fNum, string(objKey))
			return nil
		})
	}
}
// selectFromList looks into the <list> index stored in the named bucket and
// marks the listed keys in the resulting cache for filter number fNum.
//
// MatchStringEqual is served by a direct key lookup; any other operation goes
// through its matcher over the whole bucket. Nothing is marked when the
// bucket is missing, the operation has no matcher, or a list fails to decode.
func (db *DB) selectFromList(
	tx *bbolt.Tx,
	name []byte, // list root bucket name
	f objectSDK.SearchFilter, // filter for operation and value
	to map[string]int, // resulting cache
	fNum int, // index of filter
) {
	bkt := tx.Bucket(name)
	if bkt == nil {
		return
	}

	var ids [][]byte

	if op := f.Operation(); op == objectSDK.MatchStringEqual {
		decoded, err := decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value())))
		if err != nil {
			return
		}
		ids = decoded
	} else {
		matcher, known := db.matchers[op]
		if !known {
			return
		}

		err := matcher.matchBucket(bkt, f.Header(), f.Value(), func(_, val []byte) error {
			part, err := decodeList(val)
			if err != nil {
				return err
			}
			ids = append(ids, part...)
			return nil
		})
		if err != nil {
			return
		}
	}

	for _, id := range ids {
		markAddressInCache(to, fNum, string(id))
	}
}
// selectObjectID processes objectID filter with in-place optimizations.
//
// For MatchStringEqual the filter value is decoded as an object ID and checked
// directly; a value that fails to decode selects nothing. For any other
// operation that has a matcher, the keys of every per-type bucket of the
// container are run through the matcher.
//
// An ID is marked in the resulting cache for filter number fNum only if
// db.exists reports the object as existing or fails with a SplitInfoError.
func (db *DB) selectObjectID(
	tx *bbolt.Tx,
	f objectSDK.SearchFilter,
	cnr cid.ID,
	to map[string]int, // resulting cache
	fNum int, // index of filter
	currEpoch uint64,
) {
	appendOID := func(id oid.ID) {
		var addr oid.Address
		addr.SetContainer(cnr)
		addr.SetObject(id)

		var splitInfoError *objectSDK.SplitInfoError
		ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
		if (err == nil && ok) || errors.As(err, &splitInfoError) {
			raw := make([]byte, objectKeySize)
			id.Encode(raw)
			markAddressInCache(to, fNum, string(raw))
		}
	}

	switch op := f.Operation(); op {
	case objectSDK.MatchStringEqual:
		var id oid.ID
		if err := id.DecodeString(f.Value()); err == nil {
			appendOID(id)
		}
	default:
		fMatch, ok := db.matchers[op]
		if !ok {
			return
		}

		// "Not equal to the empty string" is used to enumerate the buckets of
		// all object types (assumes no type key is empty).
		for _, bucketName := range bucketNamesForType(cnr, objectSDK.MatchStringNotEqual, "") {
			bkt := tx.Bucket(bucketName)
			if bkt == nil {
				// A missing bucket must not stop the scan of the remaining
				// ones: the bucket order comes from map iteration, so bailing
				// out here would drop results nondeterministically.
				continue
			}

			// Keys that fail to decode are skipped; iteration errors are
			// ignored as the search is best-effort.
			_ = fMatch.matchBucket(bkt, f.Header(), f.Value(), func(k, _ []byte) error {
				var id oid.ID
				if err := id.Decode(k); err == nil {
					appendOID(id)
				}
				return nil
			})
		}
	}
}
// matchSlowFilters reports whether the object header is matched by all slow
// filters. With no filters the object matches without being read.
//
// The returned address is addr, except that it points to the EC parent when a
// user attribute filter found its attribute on an object carrying an EC
// header. The object is considered unmatched when it cannot be read, when a
// filter operation has no matcher, or when a hash or payload length filter is
// applied to an EC chunk.
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) (oid.Address, bool) {
	result := addr
	if len(f) == 0 {
		return result, true
	}

	obj, isECChunk, err := db.getObjectForSlowFilters(tx, addr, currEpoch)
	if err != nil {
		return result, false
	}

	for i := range f {
		var data []byte

		switch f[i].Header() {
		case v2object.FilterHeaderVersion:
			data = []byte(obj.Version().String())
		case v2object.FilterHeaderHomomorphicHash:
			if isECChunk {
				return result, false // EC chunk and EC parent hashes are incomparable
			}
			cs, _ := obj.PayloadHomomorphicHash()
			data = cs.Value()
		case v2object.FilterHeaderCreationEpoch:
			data = make([]byte, 8)
			binary.LittleEndian.PutUint64(data, obj.CreationEpoch())
		case v2object.FilterHeaderPayloadLength:
			if isECChunk {
				return result, false // EC chunk and EC parent payload lengths are incomparable
			}
			data = make([]byte, 8)
			binary.LittleEndian.PutUint64(data, obj.PayloadSize())
		case v2object.FilterHeaderOwnerID:
			data = []byte(obj.OwnerID().EncodeToString())
		case v2object.FilterHeaderPayloadHash:
			if isECChunk {
				return result, false // EC chunk and EC parent payload hashes are incomparable
			}
			cs, _ := obj.PayloadChecksum()
			data = cs.Value()
		default: // user attribute
			v, ok := attributeValue(obj, f[i].Header())
			if !ok {
				if f[i].Operation() != objectSDK.MatchNotPresent {
					return result, false
				}
				// The attribute is absent and the filter requires exactly
				// that: this filter is satisfied, but the remaining ones
				// still have to be checked.
				continue
			}

			if ech := obj.ECHeader(); ech != nil {
				result.SetObject(ech.Parent())
			}
			data = []byte(v)
		}

		matchFunc, ok := db.matchers[f[i].Operation()]
		if !ok {
			return result, false
		}

		if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) {
			return result, false
		}
	}

	return result, true
}
// getObjectForSlowFilters reads the object used for slow filter matching.
//
// When the direct read fails with an ECInfoError, the chunks listed in the EC
// info are tried in order and the first one that can be read is returned with
// the boolean flag set. If no chunk can be read, the error of the last
// attempt is returned.
func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) {
	key := make([]byte, addressKeySize)

	obj, err := db.get(tx, addr, key, true, false, currEpoch)
	if err == nil {
		return obj, false, nil
	}

	var ecErr *objectSDK.ECInfoError
	if !errors.As(err, &ecErr) {
		return nil, false, err
	}

	for _, chunk := range ecErr.ECInfo().Chunks {
		var chunkID oid.ID
		err = chunkID.ReadFromV2(chunk.ID)
		if err != nil {
			continue
		}

		addr.SetObject(chunkID)
		obj, err = db.get(tx, addr, key, true, false, currEpoch)
		if err == nil {
			return obj, true, nil
		}
	}

	return nil, false, err
}
// attributeValue looks up the attribute with the given key and returns its
// value and true when found. For an object carrying an EC header the parent
// attributes from that header are searched instead of the object's own ones.
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
	attrs := obj.Attributes()
	if ech := obj.ECHeader(); ech != nil {
		attrs = ech.ParentAttributes()
	}

	for i := range attrs {
		if attrs[i].Key() != attribute {
			continue
		}
		return attrs[i].Value(), true
	}

	return "", false
}
// groupFilters splits filters into the fast group, served by indexes, and the
// slow group, checked afterwards against unmarshaled object headers. A filter
// on the deprecated container ID header goes to neither group: its value is
// parsed into the cnr field of the result instead.
//
// User attribute filters are fast only when useAttributeIndex is set and the
// attribute is indexed. An error is returned if the container ID value cannot
// be parsed.
func groupFilters(filters objectSDK.SearchFilters, useAttributeIndex bool) (filterGroup, error) {
	res := filterGroup{
		fastFilters: make(objectSDK.SearchFilters, 0, len(filters)),
		slowFilters: make(objectSDK.SearchFilters, 0, len(filters)),
	}

	for i := range filters {
		hdr := filters[i].Header()

		var fast bool
		switch hdr {
		case v2object.FilterHeaderContainerID: // support deprecated field
			if err := res.cnr.DecodeString(filters[i].Value()); err != nil {
				return filterGroup{}, fmt.Errorf("parse container id: %w", err)
			}
			res.withCnrFilter = true
			continue
		case v2object.FilterHeaderObjectID,
			v2object.FilterHeaderObjectType,
			v2object.FilterHeaderParent,
			v2object.FilterHeaderSplitID,
			v2object.FilterHeaderECParent,
			v2object.FilterPropertyRoot,
			v2object.FilterPropertyPhy:
			fast = true
		default:
			fast = useAttributeIndex && IsAtrributeIndexed(hdr)
		}

		if fast {
			res.fastFilters = append(res.fastFilters, filters[i])
		} else {
			res.slowFilters = append(res.slowFilters, filters[i])
		}
	}

	return res, nil
}
// markAddressInCache advances the match counter of addr from fNum to fNum+1.
// An entry whose counter differs from fNum is left untouched, and a missing
// entry counts as 0. An address therefore keeps advancing only while it has
// matched every previous filter, and repeated marks for the same filter
// number have no further effect.
func markAddressInCache(cache map[string]int, fNum int, addr string) {
	if cache[addr] != fNum {
		return
	}
	cache[addr] = fNum + 1
}
// checkNonEmpty reports whether fs contains a filter that requires a reserved
// system key to be absent, i.e. a MatchNotPresent operation on a header
// accepted by isSystemKey.
//
// NOTE(review): despite the name, Select treats a true result as "the result
// is deliberately empty" and returns no addresses without querying the
// database.
func checkNonEmpty(fs objectSDK.SearchFilters) bool {
	for i := range fs {
		if fs[i].Operation() != objectSDK.MatchNotPresent {
			continue
		}
		if isSystemKey(fs[i].Header()) {
			return true
		}
	}

	return false
}
// isSystemKey returns true if string key is a reserved system filter key,
// i.e. it starts with v2object.ReservedFilterPrefix.
func isSystemKey(key string) bool {
return strings.HasPrefix(key, v2object.ReservedFilterPrefix)
}