frostfs-node/pkg/local_object_storage/metabase/select.go
Leonard Lyubich 14329ab565 [] metabase: Distinguish objects with tombstones and GC marks
Each object from graveyard has tombstone or GC mark. If object has
tombstone, metabase should return `ErrAlreadyRemoved` on object requests.
This is the case when user clearly removed the object from container. GC
marks are used for physical removal which can appear even if object is still
presented in container (Control service, Policer job, etc.). In this case
metabase should return 404 error on object requests.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2021-09-27 11:27:41 +03:00

593 lines
15 KiB
Go

package meta
import (
"encoding/binary"
"errors"
"fmt"
"strings"
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
v2object "github.com/nspcc-dev/neofs-api-go/v2/object"
"go.etcd.io/bbolt"
"go.uber.org/zap"
)
type (
// filterGroup is a structure that have search filters grouped by access
// method. We have fast filters that looks for indexes and do not unmarshal
// objects, and slow filters, that applied after fast filters created
// smaller set of objects to check.
filterGroup struct {
cid *cid.ID
fastFilters object.SearchFilters
slowFilters object.SearchFilters
}
)
// SelectPrm groups the parameters of Select operation.
type SelectPrm struct {
cid *cid.ID
filters object.SearchFilters
}
// SelectRes groups resulting values of Select operation.
type SelectRes struct {
addrList []*object.Address
}
// WithContainerID is a Select option to set the container id to search in.
func (p *SelectPrm) WithContainerID(cid *cid.ID) *SelectPrm {
if p != nil {
p.cid = cid
}
return p
}
// WithFilters is a Select option to set the object filters.
func (p *SelectPrm) WithFilters(fs object.SearchFilters) *SelectPrm {
if p != nil {
p.filters = fs
}
return p
}
// AddressList returns list of addresses of the selected objects.
func (r *SelectRes) AddressList() []*object.Address {
return r.addrList
}
var ErrMissingContainerID = errors.New("missing container id field")
// Select selects the objects from DB with filtering.
func Select(db *DB, cid *cid.ID, fs object.SearchFilters) ([]*object.Address, error) {
r, err := db.Select(new(SelectPrm).WithFilters(fs).WithContainerID(cid))
if err != nil {
return nil, err
}
return r.AddressList(), nil
}
// Select returns list of addresses of objects that match search filters.
func (db *DB) Select(prm *SelectPrm) (res *SelectRes, err error) {
res = new(SelectRes)
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, err = db.selectObjects(tx, prm.cid, prm.filters)
return err
})
return res, err
}
func (db *DB) selectObjects(tx *bbolt.Tx, cid *cid.ID, fs object.SearchFilters) ([]*object.Address, error) {
if cid == nil {
return nil, ErrMissingContainerID
}
// TODO: consider the option of moving this check to a level higher than the metabase
if blindlyProcess(fs) {
return nil, nil
}
group, err := groupFilters(fs)
if err != nil {
return nil, err
}
// if there are conflicts in query and cid then it means that there is no
// objects to match this query.
if group.cid != nil && !cid.Equal(group.cid) {
return nil, nil
}
// keep matched addresses in this cache
// value equal to number (index+1) of latest matched filter
mAddr := make(map[string]int)
expLen := len(group.fastFilters) // expected value of matched filters in mAddr
if len(group.fastFilters) == 0 {
expLen = 1
db.selectAll(tx, cid, mAddr)
} else {
for i := range group.fastFilters {
db.selectFastFilter(tx, cid, group.fastFilters[i], mAddr, i)
}
}
res := make([]*object.Address, 0, len(mAddr))
for a, ind := range mAddr {
if ind != expLen {
continue // ignore objects with unmatched fast filters
}
addr, err := addressFromKey([]byte(a))
if err != nil {
// TODO: storage was broken, so we need to handle it
return nil, err
}
if inGraveyard(tx, addr) > 0 {
continue // ignore removed objects
}
if !db.matchSlowFilters(tx, addr, group.slowFilters) {
continue // ignore objects with unmatched slow filters
}
res = append(res, addr)
}
return res, nil
}
// selectAll adds to resulting cache all available objects in metabase.
func (db *DB) selectAll(tx *bbolt.Tx, cid *cid.ID, to map[string]int) {
prefix := cid.String() + "/"
selectAllFromBucket(tx, primaryBucketName(cid), prefix, to, 0)
selectAllFromBucket(tx, tombstoneBucketName(cid), prefix, to, 0)
selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, 0)
selectAllFromBucket(tx, parentBucketName(cid), prefix, to, 0)
}
// selectAllFromBucket goes through all keys in bucket and adds them in a
// resulting cache. Keys should be stringed object ids.
func selectAllFromBucket(tx *bbolt.Tx, name []byte, prefix string, to map[string]int, fNum int) {
bkt := tx.Bucket(name)
if bkt == nil {
return
}
_ = bkt.ForEach(func(k, v []byte) error {
key := prefix + string(k) // consider using string builders from sync.Pool
markAddressInCache(to, fNum, key)
return nil
})
}
// selectFastFilter makes fast optimized checks for well known buckets or
// looking through user attribute buckets otherwise.
func (db *DB) selectFastFilter(
tx *bbolt.Tx,
cid *cid.ID, // container we search on
f object.SearchFilter, // fast filter
to map[string]int, // resulting cache
fNum int, // index of filter
) {
prefix := cid.String() + "/"
switch f.Header() {
case v2object.FilterHeaderObjectID:
db.selectObjectID(tx, f, cid, to, fNum)
case v2object.FilterHeaderOwnerID:
bucketName := ownerBucketName(cid)
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
case v2object.FilterHeaderPayloadHash:
bucketName := payloadHashBucketName(cid)
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
case v2object.FilterHeaderObjectType:
for _, bucketName := range bucketNamesForType(cid, f.Operation(), f.Value()) {
selectAllFromBucket(tx, bucketName, prefix, to, fNum)
}
case v2object.FilterHeaderParent:
bucketName := parentBucketName(cid)
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
case v2object.FilterHeaderSplitID:
bucketName := splitBucketName(cid)
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
case v2object.FilterPropertyRoot:
selectAllFromBucket(tx, rootBucketName(cid), prefix, to, fNum)
case v2object.FilterPropertyPhy:
selectAllFromBucket(tx, primaryBucketName(cid), prefix, to, fNum)
selectAllFromBucket(tx, tombstoneBucketName(cid), prefix, to, fNum)
selectAllFromBucket(tx, storageGroupBucketName(cid), prefix, to, fNum)
default: // user attribute
bucketName := attributeBucketName(cid, f.Header())
if f.Operation() == object.MatchNotPresent {
selectOutsideFKBT(tx, allBucketNames(cid), bucketName, f, prefix, to, fNum)
} else {
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
}
}
}
// TODO: move to DB struct
var mBucketNaming = map[string][]func(*cid.ID) []byte{
v2object.TypeRegular.String(): {primaryBucketName, parentBucketName},
v2object.TypeTombstone.String(): {tombstoneBucketName},
v2object.TypeStorageGroup.String(): {storageGroupBucketName},
}
func allBucketNames(cid *cid.ID) (names [][]byte) {
for _, fns := range mBucketNaming {
for _, fn := range fns {
names = append(names, fn(cid))
}
}
return
}
func bucketNamesForType(cid *cid.ID, mType object.SearchMatchType, typeVal string) (names [][]byte) {
appendNames := func(key string) {
fns, ok := mBucketNaming[key]
if ok {
for _, fn := range fns {
names = append(names, fn(cid))
}
}
}
switch mType {
default:
case object.MatchStringNotEqual:
for key := range mBucketNaming {
if key != typeVal {
appendNames(key)
}
}
case object.MatchStringEqual:
appendNames(typeVal)
case object.MatchCommonPrefix:
for key := range mBucketNaming {
if strings.HasPrefix(key, typeVal) {
appendNames(key)
}
}
}
return
}
// selectFromList looks into <fkbt> index to find list of addresses to add in
// resulting cache.
func (db *DB) selectFromFKBT(
tx *bbolt.Tx,
name []byte, // fkbt root bucket name
f object.SearchFilter, // filter for operation and value
prefix string, // prefix to create addr from oid in index
to map[string]int, // resulting cache
fNum int, // index of filter
) { //
matchFunc, ok := db.matchers[f.Operation()]
if !ok {
db.log.Debug("missing matcher", zap.Uint32("operation", uint32(f.Operation())))
return
}
fkbtRoot := tx.Bucket(name)
if fkbtRoot == nil {
return
}
err := fkbtRoot.ForEach(func(k, _ []byte) error {
if matchFunc(f.Header(), k, f.Value()) {
fkbtLeaf := fkbtRoot.Bucket(k)
if fkbtLeaf == nil {
return nil
}
return fkbtLeaf.ForEach(func(k, _ []byte) error {
addr := prefix + string(k)
markAddressInCache(to, fNum, addr)
return nil
})
}
return nil
})
if err != nil {
db.log.Debug("error in FKBT selection", zap.String("error", err.Error()))
}
}
// selectOutsideFKBT looks into all incl buckets to find list of addresses outside <fkbt> to add in
// resulting cache.
func selectOutsideFKBT(
tx *bbolt.Tx,
incl [][]byte, // buckets
name []byte, // fkbt root bucket name
f object.SearchFilter, // filter for operation and value
prefix string, // prefix to create addr from oid in index
to map[string]int, // resulting cache
fNum int, // index of filter
) {
mExcl := make(map[string]struct{})
bktExcl := tx.Bucket(name)
if bktExcl != nil {
_ = bktExcl.ForEach(func(k, _ []byte) error {
exclBktLeaf := bktExcl.Bucket(k)
if exclBktLeaf == nil {
return nil
}
return exclBktLeaf.ForEach(func(k, _ []byte) error {
addr := prefix + string(k)
mExcl[addr] = struct{}{}
return nil
})
})
}
for i := range incl {
bktIncl := tx.Bucket(incl[i])
if bktIncl == nil {
continue
}
_ = bktIncl.ForEach(func(k, _ []byte) error {
addr := prefix + string(k)
if _, ok := mExcl[addr]; !ok {
markAddressInCache(to, fNum, addr)
}
return nil
})
}
}
// selectFromList looks into <list> index to find list of addresses to add in
// resulting cache.
func (db *DB) selectFromList(
tx *bbolt.Tx,
name []byte, // list root bucket name
f object.SearchFilter, // filter for operation and value
prefix string, // prefix to create addr from oid in index
to map[string]int, // resulting cache
fNum int, // index of filter
) { //
bkt := tx.Bucket(name)
if bkt == nil {
return
}
var (
lst [][]byte
err error
)
switch op := f.Operation(); op {
case object.MatchStringEqual:
lst, err = decodeList(bkt.Get(bucketKeyHelper(f.Header(), f.Value())))
if err != nil {
db.log.Debug("can't decode list bucket leaf", zap.String("error", err.Error()))
return
}
default:
fMatch, ok := db.matchers[op]
if !ok {
db.log.Debug("unknown operation", zap.Uint32("operation", uint32(op)))
return
}
if err = bkt.ForEach(func(key, val []byte) error {
if !fMatch(f.Header(), key, f.Value()) {
return nil
}
l, err := decodeList(val)
if err != nil {
db.log.Debug("can't decode list bucket leaf",
zap.String("error", err.Error()),
)
return err
}
lst = append(lst, l...)
return nil
}); err != nil {
db.log.Debug("can't iterate over the bucket",
zap.String("error", err.Error()),
)
return
}
}
for i := range lst {
addr := prefix + string(lst[i])
markAddressInCache(to, fNum, addr)
}
}
// selectObjectID processes objectID filter with in-place optimizations.
func (db *DB) selectObjectID(
tx *bbolt.Tx,
f object.SearchFilter,
cid *cid.ID,
to map[string]int, // resulting cache
fNum int, // index of filter
) {
prefix := cid.String() + "/"
appendOID := func(oid string) {
addrStr := prefix + string(oid)
addr, err := addressFromKey([]byte(addrStr))
if err != nil {
db.log.Debug("can't decode object id address",
zap.String("addr", addrStr),
zap.String("error", err.Error()))
return
}
ok, err := db.exists(tx, addr)
if (err == nil && ok) || errors.As(err, &splitInfoError) {
markAddressInCache(to, fNum, addrStr)
}
}
switch op := f.Operation(); op {
case object.MatchStringEqual:
appendOID(f.Value())
default:
fMatch, ok := db.matchers[op]
if !ok {
db.log.Debug("unknown operation",
zap.Uint32("operation", uint32(f.Operation())),
)
return
}
for _, bucketName := range bucketNamesForType(cid, object.MatchStringNotEqual, "") {
// copy-paste from DB.selectAllFrom
bkt := tx.Bucket(bucketName)
if bkt == nil {
return
}
err := bkt.ForEach(func(k, v []byte) error {
if oid := string(k); fMatch(f.Header(), k, f.Value()) {
appendOID(oid)
}
return nil
})
if err != nil {
db.log.Debug("could not iterate over the buckets",
zap.String("error", err.Error()),
)
}
}
}
}
// matchSlowFilters return true if object header is matched by all slow filters.
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr *object.Address, f object.SearchFilters) bool {
if len(f) == 0 {
return true
}
obj, err := db.get(tx, addr, true, false)
if err != nil {
return false
}
for i := range f {
matchFunc, ok := db.matchers[f[i].Operation()]
if !ok {
return false
}
var data []byte
switch f[i].Header() {
case v2object.FilterHeaderVersion:
data = []byte(obj.Version().String())
case v2object.FilterHeaderHomomorphicHash:
data = obj.PayloadHomomorphicHash().Sum()
case v2object.FilterHeaderCreationEpoch:
data = make([]byte, 8)
binary.LittleEndian.PutUint64(data, obj.CreationEpoch())
case v2object.FilterHeaderPayloadLength:
data = make([]byte, 8)
binary.LittleEndian.PutUint64(data, obj.PayloadSize())
default:
continue // ignore unknown search attributes
}
if !matchFunc(f[i].Header(), data, f[i].Value()) {
return false
}
}
return true
}
// groupFilters divides filters in two groups: fast and slow. Fast filters
// processed by indexes and slow filters processed after by unmarshaling
// object headers.
func groupFilters(filters object.SearchFilters) (*filterGroup, error) {
res := &filterGroup{
fastFilters: make(object.SearchFilters, 0, len(filters)),
slowFilters: make(object.SearchFilters, 0, len(filters)),
}
for i := range filters {
switch filters[i].Header() {
case v2object.FilterHeaderContainerID: // support deprecated field
res.cid = cid.New()
err := res.cid.Parse(filters[i].Value())
if err != nil {
return nil, fmt.Errorf("can't parse container id: %w", err)
}
case // slow filters
v2object.FilterHeaderVersion,
v2object.FilterHeaderCreationEpoch,
v2object.FilterHeaderPayloadLength,
v2object.FilterHeaderHomomorphicHash:
res.slowFilters = append(res.slowFilters, filters[i])
default: // fast filters or user attributes if unknown
res.fastFilters = append(res.fastFilters, filters[i])
}
}
return res, nil
}
func markAddressInCache(cache map[string]int, fNum int, addr string) {
if num := cache[addr]; num == fNum {
cache[addr] = num + 1
}
}
// returns true if query leads to a deliberately empty result.
func blindlyProcess(fs object.SearchFilters) bool {
for i := range fs {
if fs[i].Operation() == object.MatchNotPresent && isSystemKey(fs[i].Header()) {
return true
}
// TODO: check other cases
// e.g. (a == b) && (a != b)
}
return false
}
// returns true if string key is a reserved system filter key.
func isSystemKey(key string) bool {
// FIXME: version-dependent approach
return strings.HasPrefix(key, v2object.ReservedFilterPrefix)
}