forked from TrueCloudLab/frostfs-node
[#361] metabase: Support new match types
Support STRING_NOT_EQUAL and NOT_PRESENT match types. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
487c9b7589
commit
ffbf6b922f
5 changed files with 432 additions and 65 deletions
|
@ -3,6 +3,7 @@ package meta
|
|||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
|
@ -88,6 +89,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cid *container.ID, fs object.SearchFil
|
|||
return nil, ErrMissingContainerID
|
||||
}
|
||||
|
||||
// TODO: consider the option of moving this check to a level higher than the metabase
|
||||
if blindlyProcess(fs) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
group, err := groupFilters(fs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -181,7 +187,7 @@ func (db *DB) selectFastFilter(
|
|||
|
||||
switch f.Header() {
|
||||
case v2object.FilterHeaderObjectID:
|
||||
db.selectObjectID(tx, f, prefix, to, fNum)
|
||||
db.selectObjectID(tx, f, cid, to, fNum)
|
||||
case v2object.FilterHeaderOwnerID:
|
||||
bucketName := ownerBucketName(cid)
|
||||
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
|
||||
|
@ -189,26 +195,9 @@ func (db *DB) selectFastFilter(
|
|||
bucketName := payloadHashBucketName(cid)
|
||||
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
|
||||
case v2object.FilterHeaderObjectType:
|
||||
var bucketName []byte
|
||||
|
||||
switch f.Value() { // do it better after https://github.com/nspcc-dev/neofs-api/issues/84
|
||||
case "Regular":
|
||||
bucketName = primaryBucketName(cid)
|
||||
|
||||
for _, bucketName := range bucketNamesForType(cid, f.Operation(), f.Value()) {
|
||||
selectAllFromBucket(tx, bucketName, prefix, to, fNum)
|
||||
|
||||
bucketName = parentBucketName(cid)
|
||||
case "Tombstone":
|
||||
bucketName = tombstoneBucketName(cid)
|
||||
case "StorageGroup":
|
||||
bucketName = storageGroupBucketName(cid)
|
||||
default:
|
||||
db.log.Debug("unknown object type", zap.String("type", f.Value()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
selectAllFromBucket(tx, bucketName, prefix, to, fNum)
|
||||
case v2object.FilterHeaderParent:
|
||||
bucketName := parentBucketName(cid)
|
||||
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
|
||||
|
@ -223,10 +212,57 @@ func (db *DB) selectFastFilter(
|
|||
selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, fNum)
|
||||
default: // user attribute
|
||||
bucketName := attributeBucketName(cid, f.Header())
|
||||
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
|
||||
|
||||
if f.Operation() == object.MatchNotPresent {
|
||||
selectOutsideFKBT(tx, allBucketNames(cid), bucketName, f, prefix, to, fNum)
|
||||
} else {
|
||||
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: move to DB struct
|
||||
var mBucketNaming = map[string][]func(*container.ID) []byte{
|
||||
v2object.TypeRegular.String(): {primaryBucketName, parentBucketName},
|
||||
v2object.TypeTombstone.String(): {tombstoneBucketName},
|
||||
v2object.TypeStorageGroup.String(): {storageGroupBucketName},
|
||||
}
|
||||
|
||||
func allBucketNames(cid *container.ID) (names [][]byte) {
|
||||
for _, fns := range mBucketNaming {
|
||||
for _, fn := range fns {
|
||||
names = append(names, fn(cid))
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func bucketNamesForType(cid *container.ID, mType object.SearchMatchType, typeVal string) (names [][]byte) {
|
||||
appendNames := func(key string) {
|
||||
fns, ok := mBucketNaming[key]
|
||||
if ok {
|
||||
for _, fn := range fns {
|
||||
names = append(names, fn(cid))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
switch mType {
|
||||
default:
|
||||
case object.MatchStringNotEqual:
|
||||
for key := range mBucketNaming {
|
||||
if key != typeVal {
|
||||
appendNames(key)
|
||||
}
|
||||
}
|
||||
case object.MatchStringEqual:
|
||||
appendNames(typeVal)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// selectFromFKBT looks into <fkbt> index to find list of addresses to add in
|
||||
// resulting cache.
|
||||
func (db *DB) selectFromFKBT(
|
||||
|
@ -271,6 +307,54 @@ func (db *DB) selectFromFKBT(
|
|||
}
|
||||
}
|
||||
|
||||
// selectOutsideFKBT looks into all incl buckets to find list of addresses outside <fkbt> to add in
|
||||
// resulting cache.
|
||||
func selectOutsideFKBT(
|
||||
tx *bbolt.Tx,
|
||||
incl [][]byte, // buckets
|
||||
name []byte, // fkbt root bucket name
|
||||
f object.SearchFilter, // filter for operation and value
|
||||
prefix string, // prefix to create addr from oid in index
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) {
|
||||
mExcl := make(map[string]struct{})
|
||||
|
||||
bktExcl := tx.Bucket(name)
|
||||
if bktExcl != nil {
|
||||
_ = bktExcl.ForEach(func(k, _ []byte) error {
|
||||
exclBktLeaf := bktExcl.Bucket(k)
|
||||
if exclBktLeaf == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return exclBktLeaf.ForEach(func(k, _ []byte) error {
|
||||
addr := prefix + string(k)
|
||||
mExcl[addr] = struct{}{}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
for i := range incl {
|
||||
bktIncl := tx.Bucket(incl[i])
|
||||
if bktIncl == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
_ = bktIncl.ForEach(func(k, _ []byte) error {
|
||||
addr := prefix + string(k)
|
||||
|
||||
if _, ok := mExcl[addr]; !ok {
|
||||
markAddressInCache(to, fNum, addr)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// selectFromList looks into <list> index to find list of addresses to add in
|
||||
// resulting cache.
|
||||
func (db *DB) selectFromList(
|
||||
|
@ -286,21 +370,50 @@ func (db *DB) selectFromList(
|
|||
return
|
||||
}
|
||||
|
||||
switch f.Operation() {
|
||||
var (
|
||||
lst [][]byte
|
||||
err error
|
||||
)
|
||||
|
||||
switch op := f.Operation(); op {
|
||||
case object.MatchStringEqual:
|
||||
lst, err = decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value())))
|
||||
if err != nil {
|
||||
db.log.Debug("can't decode list bucket leaf", zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
default:
|
||||
db.log.Debug("unknown operation", zap.Uint32("operation", uint32(f.Operation())))
|
||||
fMatch, ok := db.matchers[op]
|
||||
if !ok {
|
||||
db.log.Debug("unknown operation", zap.Uint32("operation", uint32(op)))
|
||||
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// warning: it works only for MatchStringEQ, for NotEQ you should iterate over
|
||||
// bkt and apply matchFunc, don't forget to implement this when it will be
|
||||
// needed. Right now it is not efficient to iterate over bucket
|
||||
// when there is only MatchStringEQ.
|
||||
lst, err := decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value())))
|
||||
if err != nil {
|
||||
db.log.Debug("can't decode list bucket leaf", zap.String("error", err.Error()))
|
||||
if err = bkt.ForEach(func(key, val []byte) error {
|
||||
if !fMatch(f.Header(), key, f.Value()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
l, err := decodeList(val)
|
||||
if err != nil {
|
||||
db.log.Debug("can't decode list bucket leaf",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
lst = append(lst, l...)
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
db.log.Debug("can't iterate over the bucket",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for i := range lst {
|
||||
|
@ -313,36 +426,65 @@ func (db *DB) selectFromList(
|
|||
func (db *DB) selectObjectID(
|
||||
tx *bbolt.Tx,
|
||||
f object.SearchFilter,
|
||||
prefix string,
|
||||
cid *container.ID,
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) {
|
||||
switch f.Operation() {
|
||||
prefix := cid.String() + "/"
|
||||
|
||||
appendOID := func(oid string) {
|
||||
addrStr := prefix + string(oid)
|
||||
|
||||
addr := object.NewAddress()
|
||||
|
||||
err := addr.Parse(addrStr)
|
||||
if err != nil {
|
||||
db.log.Debug("can't decode object id address",
|
||||
zap.String("addr", addrStr),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ok, err := db.exists(tx, addr)
|
||||
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
||||
markAddressInCache(to, fNum, addrStr)
|
||||
}
|
||||
}
|
||||
|
||||
switch op := f.Operation(); op {
|
||||
case object.MatchStringEqual:
|
||||
appendOID(f.Value())
|
||||
default:
|
||||
db.log.Debug("unknown operation", zap.Uint32("operation", uint32(f.Operation())))
|
||||
fMatch, ok := db.matchers[op]
|
||||
if !ok {
|
||||
db.log.Debug("unknown operation",
|
||||
zap.Uint32("operation", uint32(f.Operation())),
|
||||
)
|
||||
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// warning: it is in-place optimization and works only for MatchStringEQ,
|
||||
// for NotEQ you should iterate over bkt and apply matchFunc
|
||||
for _, bucketName := range bucketNamesForType(cid, object.MatchStringNotEqual, "") {
|
||||
// copy-paste from DB.selectAllFrom
|
||||
bkt := tx.Bucket(bucketName)
|
||||
if bkt == nil {
|
||||
return
|
||||
}
|
||||
|
||||
addrStr := prefix + f.Value()
|
||||
addr := object.NewAddress()
|
||||
err := bkt.ForEach(func(k, v []byte) error {
|
||||
if oid := string(k); fMatch(f.Header(), k, f.Value()) {
|
||||
appendOID(oid)
|
||||
}
|
||||
|
||||
err := addr.Parse(addrStr)
|
||||
if err != nil {
|
||||
db.log.Debug("can't decode object id address",
|
||||
zap.String("addr", addrStr),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ok, err := db.exists(tx, addr)
|
||||
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
||||
markAddressInCache(to, fNum, addrStr)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
db.log.Debug("could not iterate over the buckets",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -425,3 +567,23 @@ func markAddressInCache(cache map[string]int, fNum int, addr string) {
|
|||
cache[addr] = num + 1
|
||||
}
|
||||
}
|
||||
|
||||
// returns true if query leads to a deliberately empty result.
|
||||
func blindlyProcess(fs object.SearchFilters) bool {
|
||||
for i := range fs {
|
||||
if fs[i].Operation() == object.MatchNotPresent && isSystemKey(fs[i].Header()) {
|
||||
return true
|
||||
}
|
||||
|
||||
// TODO: check other cases
|
||||
// e.g. (a == b) && (a != b)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// returns true if string key is a reserved system filter key.
|
||||
func isSystemKey(key string) bool {
|
||||
// FIXME: version-dependent approach
|
||||
return strings.HasPrefix(key, v2object.ReservedFilterPrefix)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue