forked from TrueCloudLab/frostfs-node
[#1482] metabase: Encode database keys in binary
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
d6fef68a62
commit
ae1dab29bc
19 changed files with 392 additions and 276 deletions
|
@ -107,13 +107,16 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, c
|
|||
continue // ignore objects with unmatched fast filters
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
|
||||
err = decodeAddressFromKey(&addr, []byte(a))
|
||||
var id oid.ID
|
||||
err = id.Decode([]byte(a))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(id)
|
||||
|
||||
if objectStatus(tx, addr, currEpoch) > 0 {
|
||||
continue // ignore removed objects
|
||||
}
|
||||
|
@ -130,26 +133,24 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, c
|
|||
|
||||
// selectAll adds to resulting cache all available objects in metabase.
|
||||
func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) {
|
||||
prefix := cnr.EncodeToString() + "/"
|
||||
|
||||
selectAllFromBucket(tx, primaryBucketName(cnr), prefix, to, 0)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cnr), prefix, to, 0)
|
||||
selectAllFromBucket(tx, storageGroupBucketName(cnr), prefix, to, 0)
|
||||
selectAllFromBucket(tx, parentBucketName(cnr), prefix, to, 0)
|
||||
selectAllFromBucket(tx, bucketNameLockers(cnr), prefix, to, 0)
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, 0)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, 0)
|
||||
selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, 0)
|
||||
selectAllFromBucket(tx, parentBucketName(cnr, bucketName), to, 0)
|
||||
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, 0)
|
||||
}
|
||||
|
||||
// selectAllFromBucket goes through all keys in bucket and adds them in a
|
||||
// resulting cache. Keys should be stringed object ids.
|
||||
func selectAllFromBucket(tx *bbolt.Tx, name []byte, prefix string, to map[string]int, fNum int) {
|
||||
func selectAllFromBucket(tx *bbolt.Tx, name []byte, to map[string]int, fNum int) {
|
||||
bkt := tx.Bucket(name)
|
||||
if bkt == nil {
|
||||
return
|
||||
}
|
||||
|
||||
_ = bkt.ForEach(func(k, v []byte) error {
|
||||
key := prefix + string(k) // consider using string builders from sync.Pool
|
||||
markAddressInCache(to, fNum, key)
|
||||
markAddressInCache(to, fNum, string(k))
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -164,47 +165,46 @@ func (db *DB) selectFastFilter(
|
|||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) {
|
||||
prefix := cnr.EncodeToString() + "/"
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
switch f.Header() {
|
||||
case v2object.FilterHeaderObjectID:
|
||||
db.selectObjectID(tx, f, cnr, to, fNum, currEpoch)
|
||||
case v2object.FilterHeaderOwnerID:
|
||||
bucketName := ownerBucketName(cnr)
|
||||
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
|
||||
bucketName := ownerBucketName(cnr, bucketName)
|
||||
db.selectFromFKBT(tx, bucketName, f, to, fNum)
|
||||
case v2object.FilterHeaderPayloadHash:
|
||||
bucketName := payloadHashBucketName(cnr)
|
||||
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
|
||||
bucketName := payloadHashBucketName(cnr, bucketName)
|
||||
db.selectFromList(tx, bucketName, f, to, fNum)
|
||||
case v2object.FilterHeaderObjectType:
|
||||
for _, bucketName := range bucketNamesForType(cnr, f.Operation(), f.Value()) {
|
||||
selectAllFromBucket(tx, bucketName, prefix, to, fNum)
|
||||
selectAllFromBucket(tx, bucketName, to, fNum)
|
||||
}
|
||||
case v2object.FilterHeaderParent:
|
||||
bucketName := parentBucketName(cnr)
|
||||
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
|
||||
bucketName := parentBucketName(cnr, bucketName)
|
||||
db.selectFromList(tx, bucketName, f, to, fNum)
|
||||
case v2object.FilterHeaderSplitID:
|
||||
bucketName := splitBucketName(cnr)
|
||||
db.selectFromList(tx, bucketName, f, prefix, to, fNum)
|
||||
bucketName := splitBucketName(cnr, bucketName)
|
||||
db.selectFromList(tx, bucketName, f, to, fNum)
|
||||
case v2object.FilterPropertyRoot:
|
||||
selectAllFromBucket(tx, rootBucketName(cnr), prefix, to, fNum)
|
||||
selectAllFromBucket(tx, rootBucketName(cnr, bucketName), to, fNum)
|
||||
case v2object.FilterPropertyPhy:
|
||||
selectAllFromBucket(tx, primaryBucketName(cnr), prefix, to, fNum)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cnr), prefix, to, fNum)
|
||||
selectAllFromBucket(tx, storageGroupBucketName(cnr), prefix, to, fNum)
|
||||
selectAllFromBucket(tx, bucketNameLockers(cnr), prefix, to, fNum)
|
||||
selectAllFromBucket(tx, primaryBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, fNum)
|
||||
selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum)
|
||||
default: // user attribute
|
||||
bucketName := attributeBucketName(cnr, f.Header())
|
||||
bucketName := attributeBucketName(cnr, f.Header(), bucketName)
|
||||
|
||||
if f.Operation() == object.MatchNotPresent {
|
||||
selectOutsideFKBT(tx, allBucketNames(cnr), bucketName, f, prefix, to, fNum)
|
||||
selectOutsideFKBT(tx, allBucketNames(cnr), bucketName, to, fNum)
|
||||
} else {
|
||||
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
|
||||
db.selectFromFKBT(tx, bucketName, f, to, fNum)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var mBucketNaming = map[string][]func(cid.ID) []byte{
|
||||
var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{
|
||||
v2object.TypeRegular.String(): {primaryBucketName, parentBucketName},
|
||||
v2object.TypeTombstone.String(): {tombstoneBucketName},
|
||||
v2object.TypeStorageGroup.String(): {storageGroupBucketName},
|
||||
|
@ -214,7 +214,7 @@ var mBucketNaming = map[string][]func(cid.ID) []byte{
|
|||
func allBucketNames(cnr cid.ID) (names [][]byte) {
|
||||
for _, fns := range mBucketNaming {
|
||||
for _, fn := range fns {
|
||||
names = append(names, fn(cnr))
|
||||
names = append(names, fn(cnr, make([]byte, bucketKeySize)))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -226,7 +226,7 @@ func bucketNamesForType(cnr cid.ID, mType object.SearchMatchType, typeVal string
|
|||
fns, ok := mBucketNaming[key]
|
||||
if ok {
|
||||
for _, fn := range fns {
|
||||
names = append(names, fn(cnr))
|
||||
names = append(names, fn(cnr, make([]byte, bucketKeySize)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -258,7 +258,6 @@ func (db *DB) selectFromFKBT(
|
|||
tx *bbolt.Tx,
|
||||
name []byte, // fkbt root bucket name
|
||||
f object.SearchFilter, // filter for operation and value
|
||||
prefix string, // prefix to create addr from oid in index
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) { //
|
||||
|
@ -281,8 +280,7 @@ func (db *DB) selectFromFKBT(
|
|||
}
|
||||
|
||||
return fkbtLeaf.ForEach(func(k, _ []byte) error {
|
||||
addr := prefix + string(k)
|
||||
markAddressInCache(to, fNum, addr)
|
||||
markAddressInCache(to, fNum, string(k))
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -298,8 +296,6 @@ func selectOutsideFKBT(
|
|||
tx *bbolt.Tx,
|
||||
incl [][]byte, // buckets
|
||||
name []byte, // fkbt root bucket name
|
||||
f object.SearchFilter, // filter for operation and value
|
||||
prefix string, // prefix to create addr from oid in index
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) {
|
||||
|
@ -314,8 +310,7 @@ func selectOutsideFKBT(
|
|||
}
|
||||
|
||||
return exclBktLeaf.ForEach(func(k, _ []byte) error {
|
||||
addr := prefix + string(k)
|
||||
mExcl[addr] = struct{}{}
|
||||
mExcl[string(k)] = struct{}{}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -329,10 +324,8 @@ func selectOutsideFKBT(
|
|||
}
|
||||
|
||||
_ = bktIncl.ForEach(func(k, _ []byte) error {
|
||||
addr := prefix + string(k)
|
||||
|
||||
if _, ok := mExcl[addr]; !ok {
|
||||
markAddressInCache(to, fNum, addr)
|
||||
if _, ok := mExcl[string(k)]; !ok {
|
||||
markAddressInCache(to, fNum, string(k))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -346,7 +339,6 @@ func (db *DB) selectFromList(
|
|||
tx *bbolt.Tx,
|
||||
name []byte, // list root bucket name
|
||||
f object.SearchFilter, // filter for operation and value
|
||||
prefix string, // prefix to create addr from oid in index
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
) { //
|
||||
|
@ -398,8 +390,7 @@ func (db *DB) selectFromList(
|
|||
}
|
||||
|
||||
for i := range lst {
|
||||
addr := prefix + string(lst[i])
|
||||
markAddressInCache(to, fNum, addr)
|
||||
markAddressInCache(to, fNum, string(lst[i]))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -412,30 +403,25 @@ func (db *DB) selectObjectID(
|
|||
fNum int, // index of filter
|
||||
currEpoch uint64,
|
||||
) {
|
||||
prefix := cnr.EncodeToString() + "/"
|
||||
|
||||
appendOID := func(strObj string) {
|
||||
addrStr := prefix + strObj
|
||||
appendOID := func(id oid.ID) {
|
||||
var addr oid.Address
|
||||
|
||||
err := decodeAddressFromKey(&addr, []byte(addrStr))
|
||||
if err != nil {
|
||||
db.log.Debug("can't decode object id address",
|
||||
zap.String("addr", addrStr),
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
}
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(id)
|
||||
|
||||
ok, err := db.exists(tx, addr, currEpoch)
|
||||
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
||||
markAddressInCache(to, fNum, addrStr)
|
||||
raw := make([]byte, objectKeySize)
|
||||
id.Encode(raw)
|
||||
markAddressInCache(to, fNum, string(raw))
|
||||
}
|
||||
}
|
||||
|
||||
switch op := f.Operation(); op {
|
||||
case object.MatchStringEqual:
|
||||
appendOID(f.Value())
|
||||
var id oid.ID
|
||||
if err := id.DecodeString(f.Value()); err == nil {
|
||||
appendOID(id)
|
||||
}
|
||||
default:
|
||||
fMatch, ok := db.matchers[op]
|
||||
if !ok {
|
||||
|
@ -454,7 +440,10 @@ func (db *DB) selectObjectID(
|
|||
}
|
||||
|
||||
err := fMatch.matchBucket(bkt, f.Header(), f.Value(), func(k, v []byte) error {
|
||||
appendOID(string(k))
|
||||
var id oid.ID
|
||||
if err := id.Decode(k); err == nil {
|
||||
appendOID(id)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -472,7 +461,8 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f object.SearchFi
|
|||
return true
|
||||
}
|
||||
|
||||
obj, err := db.get(tx, addr, true, false, currEpoch)
|
||||
buf := make([]byte, addressKeySize)
|
||||
obj, err := db.get(tx, addr, buf, true, false, currEpoch)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue