WIP: metabase: Find attribute without full unmarshal

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2025-03-19 13:42:12 +03:00
parent e8801dbf49
commit eb8b0a6999
Signed by: fyrchik
SSH key fingerprint: SHA256:m/TTwCzjnRkXgnzEx9X92ccxy1CcVeinOgDb3NPWWmg
4 changed files with 277 additions and 33 deletions

View file

@ -2,7 +2,6 @@ package meta
import (
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
@ -88,10 +87,16 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
}
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
return db.getWithCache(nil, tx, addr, key, checkStatus, raw, currEpoch)
data, err := db.getWithCacheRaw(nil, tx, addr, key, checkStatus, raw, currEpoch)
if err != nil {
return nil, err
}
func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
obj := objectSDK.New()
return obj, obj.Unmarshal(data)
}
func (db *DB) getWithCacheRaw(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) ([]byte, error) {
if checkStatus {
st, err := objectStatusWithCache(bc, tx, addr, currEpoch)
if err != nil {
@ -109,13 +114,12 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
key = objectKey(addr.Object(), key)
cnr := addr.Container()
obj := objectSDK.New()
bucketName := make([]byte, bucketKeySize)
// check in primary index
if b := getPrimaryBucket(bc, tx, cnr); b != nil {
if data := b.Get(key); len(data) != 0 {
return obj, obj.Unmarshal(data)
return data, nil
}
}
@ -127,17 +131,17 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
// if not found then check in tombstone index
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
if len(data) != 0 {
return obj, obj.Unmarshal(data)
return data, nil
}
// if not found then check in locker index
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
if len(data) != 0 {
return obj, obj.Unmarshal(data)
return data, nil
}
// if not found then check if object is a virtual
return getVirtualObject(tx, cnr, key, raw)
return getVirtualObjectNoUnmarshal(tx, cnr, key, raw)
}
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
@ -150,6 +154,16 @@ func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
}
func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
data, err := getVirtualObjectNoUnmarshal(tx, cnr, key, raw)
if err != nil {
return nil, err
}
obj := objectSDK.New()
return obj, obj.Unmarshal(data)
}
func getVirtualObjectNoUnmarshal(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) ([]byte, error) {
if raw {
return nil, getSplitInfoError(tx, cnr, key)
}
@ -187,20 +201,7 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
}
}
child := objectSDK.New()
err = child.Unmarshal(data)
if err != nil {
return nil, fmt.Errorf("unmarshal child with parent: %w", err)
}
par := child.Parent()
if par == nil { // this should never happen though
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return par, nil
return parentFromChild(data)
}
func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {

View file

@ -458,11 +458,22 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
return result, true
}
obj, isECChunk, err := db.getObjectForSlowFilters(bc, tx, addr, currEpoch)
src, isECChunk, err := db.getObjectForSlowFilters(bc, tx, addr, currEpoch)
if err != nil {
return result, false
}
var obj *objectSDK.Object
for i := range f {
if strings.HasPrefix(f[i].Header(), v2object.ReservedFilterPrefix) {
obj = objectSDK.New()
if err := obj.Unmarshal(src); err != nil {
return result, false
}
break
}
}
for i := range f {
var data []byte
switch f[i].Header() {
@ -492,12 +503,15 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
cs, _ := obj.PayloadChecksum()
data = cs.Value()
default: // user attribute
v, ok := attributeValue(obj, f[i].Header())
if ok {
if ech := obj.ECHeader(); ech != nil {
result.SetObject(ech.Parent())
res, err := attributeValueRaw(src, f[i].Header())
if err != nil {
return result, false
}
data = []byte(v)
if res.attribute.found {
if res.isEC {
result.SetObject(res.parentID)
}
data = []byte(res.attribute.value)
} else {
return result, f[i].Operation() == objectSDK.MatchNotPresent
}
@ -516,9 +530,9 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
return result, true
}
func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) {
func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) ([]byte, bool, error) {
buf := make([]byte, addressKeySize)
obj, err := db.getWithCache(bc, tx, addr, buf, false, false, currEpoch)
data, err := db.getWithCacheRaw(bc, tx, addr, buf, false, false, currEpoch)
if err != nil {
var ecInfoError *objectSDK.ECInfoError
if errors.As(err, &ecInfoError) {
@ -528,15 +542,15 @@ func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Ad
continue
}
addr.SetObject(objID)
obj, err = db.getWithCache(bc, tx, addr, buf, true, false, currEpoch)
data, err = db.getWithCacheRaw(bc, tx, addr, buf, true, false, currEpoch)
if err == nil {
return obj, true, nil
return data, true, nil
}
}
}
return nil, false, err
}
return obj, false, nil
return data, false, nil
}
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {

View file

@ -0,0 +1,191 @@
package meta
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/VictoriaMetrics/easyproto"
)
var errMalformedObject = errors.New("malformed object")
type protoAttribute struct {
found bool
value string
}
type protoHeader struct {
attribute protoAttribute
ecHeader protoECHeader
}
type protoECHeader struct {
parentID protoOID
attribute protoAttribute
}
type protoOID oid.ID
type protoIter struct {
fc easyproto.FieldContext
data []byte
failed bool
eof bool
}
func newProtoIter(data []byte, ok bool) protoIter {
return protoIter{data: data, failed: !ok}
}
func (p *protoIter) err() error {
if p.failed {
return errMalformedObject
}
return nil
}
func (p *protoIter) finished() bool {
return p.failed || p.eof
}
func (p *protoIter) next() {
if p.failed {
return
}
if len(p.data) == 0 {
p.eof = true
return
}
data, err := p.fc.NextField(p.data)
if err != nil {
p.failed = true
return
}
p.data = data
}
func (p *protoOID) fill(fc easyproto.FieldContext) error {
iter := newProtoIter(fc.MessageData())
for iter.next(); !iter.finished(); iter.next() {
if iter.fc.FieldNum == 1 { // Wire number for `id` field.
rawID, ok := iter.fc.Bytes()
if !ok {
return errMalformedObject
}
return (*oid.ID)(p).Decode(rawID)
}
}
return iter.err()
}
func (p *protoAttribute) fill(fc easyproto.FieldContext, attribute string) error {
var key, value string
iter := newProtoIter(fc.MessageData())
for iter.next(); !iter.finished(); iter.next() {
var ok bool
switch iter.fc.FieldNum {
case 1: // Wire number for `key` field.
key, ok = iter.fc.String()
case 2: // Wire number for `value` field.
value, ok = iter.fc.String()
default:
continue
}
if !ok {
return errMalformedObject
}
}
if key == attribute {
p.found = true
p.value = value
}
return iter.err()
}
func (p *protoECHeader) fill(fc easyproto.FieldContext, attribute string) error {
iter := newProtoIter(fc.MessageData())
for iter.next(); !iter.finished(); iter.next() {
var err error
switch iter.fc.FieldNum {
case 1: // Wire number for `parent` field.
err = p.parentID.fill(iter.fc)
case 8: // Wire number for `parent_attributes` field.
err = p.attribute.fill(iter.fc, attribute)
}
if err != nil {
return err
}
}
return iter.err()
}
func (p *protoHeader) fill(fc easyproto.FieldContext, attribute string) error {
iter := newProtoIter(fc.MessageData())
for iter.next(); !iter.finished(); iter.next() {
var err error
switch iter.fc.FieldNum {
case 10: // Wire number for `attributes` field.
err = p.attribute.fill(iter.fc, attribute)
case 12: // Wire number for `ec` field.
err = p.ecHeader.fill(iter.fc, attribute)
}
if err != nil {
return err
}
}
return iter.err()
}
type attributeMatchResult struct {
attribute protoAttribute
isEC bool
parentID oid.ID
}
func attributeValueRaw(src []byte, attribute string) (attributeMatchResult, error) {
iter := newProtoIter(src, true)
for iter.next(); !iter.finished(); iter.next() {
if iter.fc.FieldNum == 3 { // Wire number for `header` field.
var p protoHeader
if err := p.fill(iter.fc, attribute); err != nil {
return attributeMatchResult{}, err
}
if p.ecHeader.attribute.found {
return attributeMatchResult{
attribute: p.ecHeader.attribute,
isEC: true,
parentID: oid.ID(p.ecHeader.parentID),
}, nil
} else {
return attributeMatchResult{attribute: p.attribute}, nil
}
}
}
return attributeMatchResult{}, iter.err()
}
func parentFromChild(src []byte) ([]byte, error) {
child := objectSDK.New()
err := child.Unmarshal(src)
if err != nil {
return nil, fmt.Errorf("unmarshal child with parent: %w", err)
}
par := child.Parent()
if par == nil { // this should never happen though
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return par.Marshal()
}

View file

@ -0,0 +1,38 @@
package meta
import (
"strconv"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestAttributeValueRaw_ZeroAlloc(t *testing.T) {
const attrCount = 4
attrs := make([]objectSDK.Attribute, attrCount)
for i := range attrs {
attrs[i].SetKey(strconv.Itoa(i))
attrs[i].SetValue(strconv.Itoa(i * 1000))
}
cnr := cidtest.ID()
ech := objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: oidtest.ID(), Attributes: attrs[:attrCount/2]}, 0, 2, nil, 0)
obj := testutil.GenerateObjectWithCID(cnr)
obj.SetECHeader(ech)
obj.SetAttributes(attrs[attrCount/2:]...)
raw, err := obj.Marshal()
require.NoError(t, err)
require.Zero(t, testing.AllocsPerRun(100, func() {
res, err := attributeValueRaw(raw, "0")
if err != nil || !res.attribute.found {
t.Fatalf("err %v, found %t", err, res.attribute.found)
}
}))
}