From aac2449d495a33c28ae39b90379c44acfd54094a Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Wed, 19 Mar 2025 13:42:12 +0300 Subject: [PATCH] WIP: metabase: Find attribute without full unmarshal Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/metabase/get.go | 25 ++- pkg/local_object_storage/metabase/select.go | 36 ++-- .../metabase/select_proto_access.go | 170 ++++++++++++++++++ .../metabase/select_proto_access_test.go | 38 ++++ 4 files changed, 252 insertions(+), 17 deletions(-) create mode 100644 pkg/local_object_storage/metabase/select_proto_access.go create mode 100644 pkg/local_object_storage/metabase/select_proto_access_test.go diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 615add1af..fda13e6f6 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -87,7 +87,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) { return res, metaerr.Wrap(err) } -func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { +func (db *DB) getRaw(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) ([]byte, error) { if checkStatus { st, err := objectStatus(tx, addr, currEpoch) if err != nil { @@ -105,13 +105,12 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b key = objectKey(addr.Object(), key) cnr := addr.Container() - obj := objectSDK.New() bucketName := make([]byte, bucketKeySize) // check in primary index data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key) if len(data) != 0 { - return obj, obj.Unmarshal(data) + return data, nil } data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key) @@ -122,17 +121,31 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b // if not found then check in tombstone index data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key) if len(data) != 0 { - return obj, obj.Unmarshal(data) + return data, nil } // if not found then check in locker index data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key) if len(data) != 0 { - return obj, obj.Unmarshal(data) + return data, nil } // if not found then check if object is a virtual - return getVirtualObject(tx, cnr, key, raw) + par, err := getVirtualObject(tx, cnr, key, raw) + if err != nil { + return nil, err + } + return par.Marshal() +} + +func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { + data, err := db.getRaw(tx, addr, key, checkStatus, raw, currEpoch) + if err != nil { + return nil, err + } + + obj := objectSDK.New() + return obj, obj.Unmarshal(data) } func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte { diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index a95384753..dd98f5c7c 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -458,11 +458,22 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc return result, true } - obj, isECChunk, err := db.getObjectForSlowFilters(tx, addr, currEpoch) + src, isECChunk, err := db.getObjectForSlowFilters(tx, addr, currEpoch) if err != nil { return result, false } + var obj *objectSDK.Object + for i := range f { + if strings.HasPrefix(f[i].Header(), v2object.ReservedFilterPrefix) { + obj = objectSDK.New() + if err := obj.Unmarshal(src); err != nil { + return result, false + } + break + } + } + for i := range f { var data []byte switch f[i].Header() { @@ -492,12 +503,15 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc cs, _ := obj.PayloadChecksum() data = cs.Value() default: // user attribute - v, ok := attributeValue(obj, f[i].Header()) - if ok { - if ech := obj.ECHeader(); ech != nil { - result.SetObject(ech.Parent()) + res, err := AttributeValueRaw(src, f[i].Header()) + if err != nil { + return result, false + } + if res.attribute.found { + if res.isEC { + result.SetObject(res.parentID) } - data = []byte(v) + data = []byte(res.attribute.value) } else { return result, f[i].Operation() == objectSDK.MatchNotPresent } @@ -516,9 +530,9 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc return result, true } -func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) { +func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) ([]byte, bool, error) { buf := make([]byte, addressKeySize) - obj, err := db.get(tx, addr, buf, false, false, currEpoch) + data, err := db.getRaw(tx, addr, buf, false, false, currEpoch) if err != nil { var ecInfoError *objectSDK.ECInfoError if errors.As(err, &ecInfoError) { @@ -528,15 +542,15 @@ func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch continue } addr.SetObject(objID) - obj, err = db.get(tx, addr, buf, true, false, currEpoch) + data, err = db.getRaw(tx, addr, buf, true, false, currEpoch) if err == nil { - return obj, true, nil + return data, true, nil } } } return nil, false, err } - return obj, false, nil + return data, false, nil } func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) { diff --git a/pkg/local_object_storage/metabase/select_proto_access.go b/pkg/local_object_storage/metabase/select_proto_access.go new file mode 100644 index 000000000..b6481177a --- /dev/null +++ b/pkg/local_object_storage/metabase/select_proto_access.go @@ -0,0 +1,170 @@ +package meta + +import ( + "errors" + + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/VictoriaMetrics/easyproto" +) + +var errMalformedObject = errors.New("malformed object") + +type protoAttribute struct { + found bool + value string +} + +type protoHeader struct { + attribute protoAttribute + ecHeader protoECHeader +} + +type protoECHeader struct { + parentID protoOID + attribute protoAttribute +} + +type protoOID oid.ID + +type protoIter struct { + fc easyproto.FieldContext + data []byte + failed bool + eof bool +} + +func newProtoIter(data []byte, ok bool) protoIter { + return protoIter{data: data, failed: !ok} +} + +func (p *protoIter) err() error { + if p.failed { + return errMalformedObject + } + return nil +} + +func (p *protoIter) finished() bool { + return p.failed || p.eof +} + +func (p *protoIter) next() { + if p.failed { + return + } + if len(p.data) == 0 { + p.eof = true + return + } + + data, err := p.fc.NextField(p.data) + if err != nil { + p.failed = true + return + } + + p.data = data +} + +func (p *protoOID) fill(fc easyproto.FieldContext) error { + iter := newProtoIter(fc.MessageData()) + for iter.next(); !iter.finished(); iter.next() { + if iter.fc.FieldNum == 1 { // Wire number for `id` field. + rawID, ok := iter.fc.Bytes() + if !ok { + return errMalformedObject + } + return (*oid.ID)(p).Decode(rawID) + } + } + return iter.err() +} + +func (p *protoAttribute) fill(fc easyproto.FieldContext, attribute string) error { + var key, value string + + iter := newProtoIter(fc.MessageData()) + for iter.next(); !iter.finished(); iter.next() { + var ok bool + switch iter.fc.FieldNum { + case 1: // Wire number for `key` field. + key, ok = iter.fc.String() + case 2: // Wire number for `value` field. + value, ok = iter.fc.String() + default: + continue + } + if !ok { + return errMalformedObject + } + } + if key == attribute { + p.found = true + p.value = value + } + return iter.err() +} + +func (p *protoECHeader) fill(fc easyproto.FieldContext, attribute string) error { + iter := newProtoIter(fc.MessageData()) + for iter.next(); !iter.finished(); iter.next() { + var err error + switch iter.fc.FieldNum { + case 1: // Wire number for `parent` field. + err = p.parentID.fill(iter.fc) + case 8: // Wire number for `parent_attributes` field. + err = p.attribute.fill(iter.fc, attribute) + } + if err != nil { + return err + } + } + return iter.err() +} + +func (p *protoHeader) fill(fc easyproto.FieldContext, attribute string) error { + iter := newProtoIter(fc.MessageData()) + for iter.next(); !iter.finished(); iter.next() { + var err error + switch iter.fc.FieldNum { + case 10: // Wire number for `attributes` field. + err = p.attribute.fill(iter.fc, attribute) + case 12: // Wire number for `ec` field. + err = p.ecHeader.fill(iter.fc, attribute) + } + if err != nil { + return err + } + } + return iter.err() +} + +type attributeMatchResult struct { + attribute protoAttribute + + isEC bool + parentID oid.ID +} + +func AttributeValueRaw(src []byte, attribute string) (attributeMatchResult, error) { + iter := newProtoIter(src, true) + for iter.next(); !iter.finished(); iter.next() { + if iter.fc.FieldNum == 3 { // Wire number for `header` field. + var p protoHeader + if err := p.fill(iter.fc, attribute); err != nil { + return attributeMatchResult{}, err + } + + if p.ecHeader.attribute.found { + return attributeMatchResult{ + attribute: p.ecHeader.attribute, + isEC: true, + parentID: oid.ID(p.ecHeader.parentID), + }, nil + } else { + return attributeMatchResult{attribute: p.attribute}, nil + } + } + } + return attributeMatchResult{}, iter.err() +} diff --git a/pkg/local_object_storage/metabase/select_proto_access_test.go b/pkg/local_object_storage/metabase/select_proto_access_test.go new file mode 100644 index 000000000..11577b8f0 --- /dev/null +++ b/pkg/local_object_storage/metabase/select_proto_access_test.go @@ -0,0 +1,38 @@ +package meta + +import ( + "strconv" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func TestAttributeValueRaw_ZeroAlloc(t *testing.T) { + const attrCount = 4 + attrs := make([]objectSDK.Attribute, attrCount) + for i := range attrs { + attrs[i].SetKey(strconv.Itoa(i)) + attrs[i].SetValue(strconv.Itoa(i * 1000)) + } + + cnr := cidtest.ID() + ech := objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: oidtest.ID(), Attributes: attrs[:attrCount/2]}, 0, 2, nil, 0) + + obj := testutil.GenerateObjectWithCID(cnr) + obj.SetECHeader(ech) + obj.SetAttributes(attrs[attrCount/2:]...) + + raw, err := obj.Marshal() + require.NoError(t, err) + + require.Zero(t, testing.AllocsPerRun(100, func() { + res, err := AttributeValueRaw(raw, "0") + if err != nil || !res.attribute.found { + t.Fatalf("err %v, found %t", err, res.attribute.found) + } + })) +}