forked from TrueCloudLab/frostfs-node
574 lines
15 KiB
Go
574 lines
15 KiB
Go
package meta
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
v2object "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type (
|
|
// filterGroup is a structure that have search filters grouped by access
|
|
// method. We have fast filters that looks for indexes and do not unmarshal
|
|
// objects, and slow filters, that applied after fast filters created
|
|
// smaller set of objects to check.
|
|
filterGroup struct {
|
|
withCnrFilter bool
|
|
|
|
cnr cid.ID
|
|
|
|
fastFilters, slowFilters objectSDK.SearchFilters
|
|
}
|
|
)
|
|
|
|
// SelectPrm groups the parameters of Select operation.
|
|
type SelectPrm struct {
|
|
cnr cid.ID
|
|
filters objectSDK.SearchFilters
|
|
}
|
|
|
|
// SelectRes groups the resulting values of Select operation.
|
|
type SelectRes struct {
|
|
addrList []oid.Address
|
|
}
|
|
|
|
// SetContainerID is a Select option to set the container id to search in.
|
|
func (p *SelectPrm) SetContainerID(cnr cid.ID) {
|
|
p.cnr = cnr
|
|
}
|
|
|
|
// SetFilters is a Select option to set the object filters.
|
|
func (p *SelectPrm) SetFilters(fs objectSDK.SearchFilters) {
|
|
p.filters = fs
|
|
}
|
|
|
|
// AddressList returns list of addresses of the selected objects.
|
|
func (r SelectRes) AddressList() []oid.Address {
|
|
return r.addrList
|
|
}
|
|
|
|
// Select returns list of addresses of objects that match search filters.
|
|
func (db *DB) Select(ctx context.Context, prm SelectPrm) (res SelectRes, err error) {
|
|
var (
|
|
startedAt = time.Now()
|
|
success = false
|
|
)
|
|
defer func() {
|
|
db.metrics.AddMethodDuration("Select", time.Since(startedAt), success)
|
|
}()
|
|
|
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.Select",
|
|
trace.WithAttributes(
|
|
attribute.String("container_id", prm.cnr.EncodeToString()),
|
|
))
|
|
defer span.End()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return res, ErrDegradedMode
|
|
}
|
|
|
|
if checkNonEmpty(prm.filters) {
|
|
success = true
|
|
return res, nil
|
|
}
|
|
|
|
currEpoch := db.epochState.CurrentEpoch()
|
|
|
|
return res, metaerr.Wrap(db.boltDB.View(func(tx *bbolt.Tx) error {
|
|
res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch)
|
|
success = err == nil
|
|
return err
|
|
}))
|
|
}
|
|
|
|
func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs objectSDK.SearchFilters, currEpoch uint64) ([]oid.Address, error) {
|
|
group, err := groupFilters(fs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// if there are conflicts in query and container then it means that there is no
|
|
// objects to match this query.
|
|
if group.withCnrFilter && !cnr.Equals(group.cnr) {
|
|
return nil, nil
|
|
}
|
|
|
|
// keep matched addresses in this cache
|
|
// value equal to number (index+1) of latest matched filter
|
|
mAddr := make(map[string]int)
|
|
|
|
expLen := len(group.fastFilters) // expected value of matched filters in mAddr
|
|
|
|
if len(group.fastFilters) == 0 {
|
|
expLen = 1
|
|
|
|
db.selectAll(tx, cnr, mAddr)
|
|
} else {
|
|
for i := range group.fastFilters {
|
|
db.selectFastFilter(tx, cnr, group.fastFilters[i], mAddr, i)
|
|
}
|
|
}
|
|
|
|
res := make([]oid.Address, 0, len(mAddr))
|
|
|
|
for a, ind := range mAddr {
|
|
if ind != expLen {
|
|
continue // ignore objects with unmatched fast filters
|
|
}
|
|
|
|
var id oid.ID
|
|
err = id.Decode([]byte(a))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var addr oid.Address
|
|
addr.SetContainer(cnr)
|
|
addr.SetObject(id)
|
|
|
|
if objectStatus(tx, addr, currEpoch) > 0 {
|
|
continue // ignore removed objects
|
|
}
|
|
|
|
if !db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch) {
|
|
continue // ignore objects with unmatched slow filters
|
|
}
|
|
|
|
res = append(res, addr)
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
// selectAll adds to resulting cache all available objects in metabase.
|
|
func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) {
|
|
bucketName := make([]byte, bucketKeySize)
|
|
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, 0)
|
|
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, 0)
|
|
selectAllFromBucket(tx, parentBucketName(cnr, bucketName), to, 0)
|
|
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, 0)
|
|
}
|
|
|
|
// selectAllFromBucket goes through all keys in bucket and adds them in a
|
|
// resulting cache. Keys should be stringed object ids.
|
|
func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
|
|
bkt := tx.Bucket(name)
|
|
if bkt == nil {
|
|
return
|
|
}
|
|
|
|
_ = bkt.ForEach(func(k, _ []byte) error {
|
|
markAddressInCache(to, fNum, string(k))
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// selectFastFilter makes fast optimized checks for well known buckets or
|
|
// looking through user attribute buckets otherwise.
|
|
func (db *DB) selectFastFilter(
|
|
tx *bbolt.Tx,
|
|
cnr cid.ID, // container we search on
|
|
f objectSDK.SearchFilter, // fast filter
|
|
to map[string]int, // resulting cache
|
|
fNum int, // index of filter
|
|
) {
|
|
currEpoch := db.epochState.CurrentEpoch()
|
|
bucketName := make([]byte, bucketKeySize)
|
|
switch f.Header() {
|
|
case v2object.FilterHeaderObjectID:
|
|
db.selectObjectID(tx, f, cnr, to, fNum, currEpoch)
|
|
case v2object.FilterHeaderOwnerID:
|
|
bucketName := ownerBucketName(cnr, bucketName)
|
|
db.selectFromFKBT(tx, bucketName, f, to, fNum)
|
|
case v2object.FilterHeaderPayloadHash:
|
|
bucketName := payloadHashBucketName(cnr, bucketName)
|
|
db.selectFromList(tx, bucketName, f, to, fNum)
|
|
case v2object.FilterHeaderObjectType:
|
|
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
|
|
selectAllFromBucket(tx, bucketName, to, fNum)
|
|
}
|
|
case v2object.FilterHeaderParent:
|
|
bucketName := parentBucketName(cnr, bucketName)
|
|
db.selectFromList(tx, bucketName, f, to, fNum)
|
|
case v2object.FilterHeaderSplitID:
|
|
bucketName := splitBucketName(cnr, bucketName)
|
|
db.selectFromList(tx, bucketName, f, to, fNum)
|
|
case v2object.FilterPropertyRoot:
|
|
selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum)
|
|
case v2object.FilterPropertyPhy:
|
|
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
|
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
|
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
|
default: // user attribute
|
|
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
|
|
|
|
if f.Operation() == objectSDK.MatchNotPresent {
|
|
selectOutsideFKBT(tx, allBucketNames(cnr), bucketName, to, fNum)
|
|
} else {
|
|
db.selectFromFKBT(tx, bucketName, f, to, fNum)
|
|
}
|
|
}
|
|
}
|
|
|
|
var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{
|
|
v2object.TypeRegular.String(): {primaryBucketName, parentBucketName},
|
|
v2object.TypeTombstone.String(): {tombstoneBucketName},
|
|
v2object.TypeLock.String(): {bucketNameLockers},
|
|
}
|
|
|
|
func allBucketNames(cnr cid.ID) (names [][]byte) {
|
|
for _, fns := range mBucketNaming {
|
|
for _, fn := range fns {
|
|
names = append(names, fn(cnr, make([]byte, bucketKeySize)))
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func bucketNamesForType(cnr cid.ID, mType objectSDK.SearchMatchType, typeVal string) (names [][]byte) {
|
|
appendNames := func(key string) {
|
|
fns, ok := mBucketNaming[key]
|
|
if ok {
|
|
for _, fn := range fns {
|
|
names = append(names, fn(cnr, make([]byte, bucketKeySize)))
|
|
}
|
|
}
|
|
}
|
|
|
|
switch mType {
|
|
default:
|
|
case objectSDK.MatchStringNotEqual:
|
|
for key := range mBucketNaming {
|
|
if key != typeVal {
|
|
appendNames(key)
|
|
}
|
|
}
|
|
case objectSDK.MatchStringEqual:
|
|
appendNames(typeVal)
|
|
case objectSDK.MatchCommonPrefix:
|
|
for key := range mBucketNaming {
|
|
if strings.HasPrefix(key, typeVal) {
|
|
appendNames(key)
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// selectFromList looks into <fkbt> index to find list of addresses to add in
|
|
// resulting cache.
|
|
func (db *DB) selectFromFKBT(
|
|
tx *bbolt.Tx,
|
|
name []byte, // fkbt root bucket name
|
|
f objectSDK.SearchFilter, // filter for operation and value
|
|
to map[string]int, // resulting cache
|
|
fNum int, // index of filter
|
|
) { //
|
|
matchFunc, ok := db.matchers[f.Operation()]
|
|
if !ok {
|
|
db.log.Debug(logs.MetabaseMissingMatcher, zap.Uint32("operation", uint32(f.Operation())))
|
|
|
|
return
|
|
}
|
|
|
|
fkbtRoot := tx.Bucket(name)
|
|
if fkbtRoot == nil {
|
|
return
|
|
}
|
|
|
|
err := matchFunc.matchBucket(fkbtRoot, f.Header(), f.Value(), func(k, _ []byte) error {
|
|
fkbtLeaf := fkbtRoot.Bucket(k)
|
|
if fkbtLeaf == nil {
|
|
return nil
|
|
}
|
|
|
|
return fkbtLeaf.ForEach(func(k, _ []byte) error {
|
|
markAddressInCache(to, fNum, string(k))
|
|
|
|
return nil
|
|
})
|
|
})
|
|
if err != nil {
|
|
db.log.Debug(logs.MetabaseErrorInFKBTSelection, zap.String("error", err.Error()))
|
|
}
|
|
}
|
|
|
|
// selectOutsideFKBT looks into all incl buckets to find list of addresses outside <fkbt> to add in
|
|
// resulting cache.
|
|
func selectOutsideFKBT(
|
|
tx *bbolt.Tx,
|
|
incl [][]byte, // buckets
|
|
name []byte, // fkbt root bucket name
|
|
to map[string]int, // resulting cache
|
|
fNum int, // index of filter
|
|
) {
|
|
mExcl := make(map[string]struct{})
|
|
|
|
bktExcl := tx.Bucket(name)
|
|
if bktExcl != nil {
|
|
_ = bktExcl.ForEachBucket(func(k []byte) error {
|
|
exclBktLeaf := bktExcl.Bucket(k)
|
|
return exclBktLeaf.ForEach(func(k, _ []byte) error {
|
|
mExcl[string(k)] = struct{}{}
|
|
|
|
return nil
|
|
})
|
|
})
|
|
}
|
|
|
|
for i := range incl {
|
|
bktIncl := tx.Bucket(incl[i])
|
|
if bktIncl == nil {
|
|
continue
|
|
}
|
|
|
|
_ = bktIncl.ForEach(func(k, _ []byte) error {
|
|
if _, ok := mExcl[string(k)]; !ok {
|
|
markAddressInCache(to, fNum, string(k))
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
|
|
// selectFromList looks into <list> index to find list of addresses to add in
|
|
// resulting cache.
|
|
func (db *DB) selectFromList(
|
|
tx *bbolt.Tx,
|
|
name []byte, // list root bucket name
|
|
f objectSDK.SearchFilter, // filter for operation and value
|
|
to map[string]int, // resulting cache
|
|
fNum int, // index of filter
|
|
) { //
|
|
bkt := tx.Bucket(name)
|
|
if bkt == nil {
|
|
return
|
|
}
|
|
|
|
var (
|
|
lst [][]byte
|
|
err error
|
|
)
|
|
|
|
switch op := f.Operation(); op {
|
|
case objectSDK.MatchStringEqual:
|
|
lst, err = decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value())))
|
|
if err != nil {
|
|
db.log.Debug(logs.MetabaseCantDecodeListBucketLeaf, zap.String("error", err.Error()))
|
|
return
|
|
}
|
|
default:
|
|
fMatch, ok := db.matchers[op]
|
|
if !ok {
|
|
db.log.Debug(logs.MetabaseUnknownOperation, zap.Uint32("operation", uint32(op)))
|
|
|
|
return
|
|
}
|
|
|
|
if err = fMatch.matchBucket(bkt, f.Header(), f.Value(), func(_, val []byte) error {
|
|
l, err := decodeList(val)
|
|
if err != nil {
|
|
db.log.Debug(logs.MetabaseCantDecodeListBucketLeaf,
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return err
|
|
}
|
|
|
|
lst = append(lst, l...)
|
|
|
|
return nil
|
|
}); err != nil {
|
|
db.log.Debug(logs.MetabaseCantIterateOverTheBucket,
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
return
|
|
}
|
|
}
|
|
|
|
for i := range lst {
|
|
markAddressInCache(to, fNum, string(lst[i]))
|
|
}
|
|
}
|
|
|
|
// selectObjectID processes objectID filter with in-place optimizations.
|
|
func (db *DB) selectObjectID(
|
|
tx *bbolt.Tx,
|
|
f objectSDK.SearchFilter,
|
|
cnr cid.ID,
|
|
to map[string]int, // resulting cache
|
|
fNum int, // index of filter
|
|
currEpoch uint64,
|
|
) {
|
|
appendOID := func(id oid.ID) {
|
|
var addr oid.Address
|
|
addr.SetContainer(cnr)
|
|
addr.SetObject(id)
|
|
|
|
var splitInfoError *objectSDK.SplitInfoError
|
|
ok, _, err := db.exists(tx, addr, oid.Address{}, currEpoch)
|
|
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
|
raw := make([]byte, objectKeySize)
|
|
id.Encode(raw)
|
|
markAddressInCache(to, fNum, string(raw))
|
|
}
|
|
}
|
|
|
|
switch op := f.Operation(); op {
|
|
case objectSDK.MatchStringEqual:
|
|
var id oid.ID
|
|
if err := id.DecodeString(f.Value()); err == nil {
|
|
appendOID(id)
|
|
}
|
|
default:
|
|
fMatch, ok := db.matchers[op]
|
|
if !ok {
|
|
db.log.Debug(logs.MetabaseUnknownOperation,
|
|
zap.Uint32("operation", uint32(f.Operation())),
|
|
)
|
|
|
|
return
|
|
}
|
|
|
|
for _, bucketName := range bucketNamesForType(cnr, objectSDK.MatchStringNotEqual, "") {
|
|
// copy-paste from DB.selectAllFrom
|
|
bkt := tx.Bucket(bucketName)
|
|
if bkt == nil {
|
|
return
|
|
}
|
|
|
|
err := fMatch.matchBucket(bkt, f.Header(), f.Value(), func(k, _ []byte) error {
|
|
var id oid.ID
|
|
if err := id.Decode(k); err == nil {
|
|
appendOID(id)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
db.log.Debug(logs.MetabaseCouldNotIterateOverTheBuckets,
|
|
zap.String("error", err.Error()),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// matchSlowFilters return true if object header is matched by all slow filters.
|
|
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.SearchFilters, currEpoch uint64) bool {
|
|
if len(f) == 0 {
|
|
return true
|
|
}
|
|
|
|
buf := make([]byte, addressKeySize)
|
|
obj, err := db.get(tx, addr, buf, true, false, currEpoch)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
for i := range f {
|
|
matchFunc, ok := db.matchers[f[i].Operation()]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
var data []byte
|
|
|
|
switch f[i].Header() {
|
|
case v2object.FilterHeaderVersion:
|
|
data = []byte(obj.Version().String())
|
|
case v2object.FilterHeaderHomomorphicHash:
|
|
cs, _ := obj.PayloadHomomorphicHash()
|
|
data = cs.Value()
|
|
case v2object.FilterHeaderCreationEpoch:
|
|
data = make([]byte, 8)
|
|
binary.LittleEndian.PutUint64(data, obj.CreationEpoch())
|
|
case v2object.FilterHeaderPayloadLength:
|
|
data = make([]byte, 8)
|
|
binary.LittleEndian.PutUint64(data, obj.PayloadSize())
|
|
default:
|
|
continue // ignore unknown search attributes
|
|
}
|
|
|
|
if !matchFunc.matchSlow(f[i].Header(), data, f[i].Value()) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// groupFilters divides filters in two groups: fast and slow. Fast filters
|
|
// processed by indexes and slow filters processed after by unmarshaling
|
|
// object headers.
|
|
func groupFilters(filters objectSDK.SearchFilters) (filterGroup, error) {
|
|
res := filterGroup{
|
|
fastFilters: make(objectSDK.SearchFilters, 0, len(filters)),
|
|
slowFilters: make(objectSDK.SearchFilters, 0, len(filters)),
|
|
}
|
|
|
|
for i := range filters {
|
|
switch filters[i].Header() {
|
|
case v2object.FilterHeaderContainerID: // support deprecated field
|
|
err := res.cnr.DecodeString(filters[i].Value())
|
|
if err != nil {
|
|
return filterGroup{}, fmt.Errorf("can't parse container id: %w", err)
|
|
}
|
|
|
|
res.withCnrFilter = true
|
|
case // slow filters
|
|
v2object.FilterHeaderVersion,
|
|
v2object.FilterHeaderCreationEpoch,
|
|
v2object.FilterHeaderPayloadLength,
|
|
v2object.FilterHeaderHomomorphicHash:
|
|
res.slowFilters = append(res.slowFilters, filters[i])
|
|
default: // fast filters or user attributes if unknown
|
|
res.fastFilters = append(res.fastFilters, filters[i])
|
|
}
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
// markAddressInCache records that filter number fNum matched addr. The entry
// is advanced to fNum+1 only when it currently equals fNum, i.e. when all
// preceding filters matched addr as well (a missing entry counts as 0).
// Otherwise the cache is left untouched.
func markAddressInCache(cache map[string]int, fNum int, addr string) {
	if cache[addr] != fNum {
		return
	}

	cache[addr] = fNum + 1
}
|
|
|
|
// Returns true if at least 1 object can satisfy fs.
|
|
func checkNonEmpty(fs objectSDK.SearchFilters) bool {
|
|
for i := range fs {
|
|
if fs[i].Operation() == objectSDK.MatchNotPresent && isSystemKey(fs[i].Header()) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// returns true if string key is a reserved system filter key.
|
|
func isSystemKey(key string) bool {
|
|
return strings.HasPrefix(key, v2object.ReservedFilterPrefix)
|
|
}
|