WIP: metabase: Find attribute without full unmarshal
Some checks failed
DCO action / DCO (pull_request) Failing after 30s
Tests and linters / Staticcheck (pull_request) Failing after 1m16s
Vulncheck / Vulncheck (pull_request) Successful in 1m12s
Tests and linters / Lint (pull_request) Failing after 1m26s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m33s
Build / Build Components (pull_request) Successful in 1m41s
Tests and linters / Run gofumpt (pull_request) Successful in 3m38s
Tests and linters / Tests (pull_request) Successful in 4m25s
Tests and linters / gopls check (pull_request) Successful in 5m55s
Tests and linters / Tests with -race (pull_request) Successful in 6m9s
Some checks failed
DCO action / DCO (pull_request) Failing after 30s
Tests and linters / Staticcheck (pull_request) Failing after 1m16s
Vulncheck / Vulncheck (pull_request) Successful in 1m12s
Tests and linters / Lint (pull_request) Failing after 1m26s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m33s
Build / Build Components (pull_request) Successful in 1m41s
Tests and linters / Run gofumpt (pull_request) Successful in 3m38s
Tests and linters / Tests (pull_request) Successful in 4m25s
Tests and linters / gopls check (pull_request) Successful in 5m55s
Tests and linters / Tests with -race (pull_request) Successful in 6m9s
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
ec2871dd64
commit
aac2449d49
4 changed files with 252 additions and 17 deletions
|
@ -87,7 +87,7 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
func (db *DB) getRaw(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) ([]byte, error) {
|
||||||
if checkStatus {
|
if checkStatus {
|
||||||
st, err := objectStatus(tx, addr, currEpoch)
|
st, err := objectStatus(tx, addr, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -105,13 +105,12 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
|
||||||
|
|
||||||
key = objectKey(addr.Object(), key)
|
key = objectKey(addr.Object(), key)
|
||||||
cnr := addr.Container()
|
cnr := addr.Container()
|
||||||
obj := objectSDK.New()
|
|
||||||
bucketName := make([]byte, bucketKeySize)
|
bucketName := make([]byte, bucketKeySize)
|
||||||
|
|
||||||
// check in primary index
|
// check in primary index
|
||||||
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key)
|
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), key)
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
return obj, obj.Unmarshal(data)
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
|
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), key)
|
||||||
|
@ -122,17 +121,31 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b
|
||||||
// if not found then check in tombstone index
|
// if not found then check in tombstone index
|
||||||
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
return obj, obj.Unmarshal(data)
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// if not found then check in locker index
|
// if not found then check in locker index
|
||||||
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
|
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
|
||||||
if len(data) != 0 {
|
if len(data) != 0 {
|
||||||
return obj, obj.Unmarshal(data)
|
return data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// if not found then check if object is a virtual
|
// if not found then check if object is a virtual
|
||||||
return getVirtualObject(tx, cnr, key, raw)
|
par, err := getVirtualObject(tx, cnr, key, raw)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return par.Marshal()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
||||||
|
data, err := db.getRaw(tx, addr, key, checkStatus, raw, currEpoch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
obj := objectSDK.New()
|
||||||
|
return obj, obj.Unmarshal(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
||||||
|
|
|
@ -458,11 +458,22 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
||||||
return result, true
|
return result, true
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, isECChunk, err := db.getObjectForSlowFilters(tx, addr, currEpoch)
|
src, isECChunk, err := db.getObjectForSlowFilters(tx, addr, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, false
|
return result, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var obj *objectSDK.Object
|
||||||
|
for i := range f {
|
||||||
|
if strings.HasPrefix(f[i].Header(), v2object.ReservedFilterPrefix) {
|
||||||
|
obj = objectSDK.New()
|
||||||
|
if err := obj.Unmarshal(src); err != nil {
|
||||||
|
return result, false
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for i := range f {
|
for i := range f {
|
||||||
var data []byte
|
var data []byte
|
||||||
switch f[i].Header() {
|
switch f[i].Header() {
|
||||||
|
@ -492,12 +503,15 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
||||||
cs, _ := obj.PayloadChecksum()
|
cs, _ := obj.PayloadChecksum()
|
||||||
data = cs.Value()
|
data = cs.Value()
|
||||||
default: // user attribute
|
default: // user attribute
|
||||||
v, ok := attributeValue(obj, f[i].Header())
|
res, err := AttributeValueRaw(src, f[i].Header())
|
||||||
if ok {
|
if err != nil {
|
||||||
if ech := obj.ECHeader(); ech != nil {
|
return result, false
|
||||||
result.SetObject(ech.Parent())
|
|
||||||
}
|
}
|
||||||
data = []byte(v)
|
if res.attribute.found {
|
||||||
|
if res.isEC {
|
||||||
|
result.SetObject(res.parentID)
|
||||||
|
}
|
||||||
|
data = []byte(res.attribute.value)
|
||||||
} else {
|
} else {
|
||||||
return result, f[i].Operation() == objectSDK.MatchNotPresent
|
return result, f[i].Operation() == objectSDK.MatchNotPresent
|
||||||
}
|
}
|
||||||
|
@ -516,9 +530,9 @@ func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f objectSDK.Searc
|
||||||
return result, true
|
return result, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) {
|
func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) ([]byte, bool, error) {
|
||||||
buf := make([]byte, addressKeySize)
|
buf := make([]byte, addressKeySize)
|
||||||
obj, err := db.get(tx, addr, buf, false, false, currEpoch)
|
data, err := db.getRaw(tx, addr, buf, false, false, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var ecInfoError *objectSDK.ECInfoError
|
var ecInfoError *objectSDK.ECInfoError
|
||||||
if errors.As(err, &ecInfoError) {
|
if errors.As(err, &ecInfoError) {
|
||||||
|
@ -528,15 +542,15 @@ func (db *DB) getObjectForSlowFilters(tx *bbolt.Tx, addr oid.Address, currEpoch
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
addr.SetObject(objID)
|
addr.SetObject(objID)
|
||||||
obj, err = db.get(tx, addr, buf, true, false, currEpoch)
|
data, err = db.getRaw(tx, addr, buf, true, false, currEpoch)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return obj, true, nil
|
return data, true, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
return obj, false, nil
|
return data, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
|
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
|
||||||
|
|
170
pkg/local_object_storage/metabase/select_proto_access.go
Normal file
170
pkg/local_object_storage/metabase/select_proto_access.go
Normal file
|
@ -0,0 +1,170 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/VictoriaMetrics/easyproto"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errMalformedObject = errors.New("malformed object")
|
||||||
|
|
||||||
|
type protoAttribute struct {
|
||||||
|
found bool
|
||||||
|
value string
|
||||||
|
}
|
||||||
|
|
||||||
|
type protoHeader struct {
|
||||||
|
attribute protoAttribute
|
||||||
|
ecHeader protoECHeader
|
||||||
|
}
|
||||||
|
|
||||||
|
type protoECHeader struct {
|
||||||
|
parentID protoOID
|
||||||
|
attribute protoAttribute
|
||||||
|
}
|
||||||
|
|
||||||
|
type protoOID oid.ID
|
||||||
|
|
||||||
|
type protoIter struct {
|
||||||
|
fc easyproto.FieldContext
|
||||||
|
data []byte
|
||||||
|
failed bool
|
||||||
|
eof bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newProtoIter(data []byte, ok bool) protoIter {
|
||||||
|
return protoIter{data: data, failed: !ok}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *protoIter) err() error {
|
||||||
|
if p.failed {
|
||||||
|
return errMalformedObject
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *protoIter) finished() bool {
|
||||||
|
return p.failed || p.eof
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *protoIter) next() {
|
||||||
|
if p.failed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(p.data) == 0 {
|
||||||
|
p.eof = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := p.fc.NextField(p.data)
|
||||||
|
if err != nil {
|
||||||
|
p.failed = true
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
p.data = data
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *protoOID) fill(fc easyproto.FieldContext) error {
|
||||||
|
iter := newProtoIter(fc.MessageData())
|
||||||
|
for iter.next(); !iter.finished(); iter.next() {
|
||||||
|
if iter.fc.FieldNum == 1 { // Wire number for `id` field.
|
||||||
|
rawID, ok := iter.fc.Bytes()
|
||||||
|
if !ok {
|
||||||
|
return errMalformedObject
|
||||||
|
}
|
||||||
|
return (*oid.ID)(p).Decode(rawID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return iter.err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *protoAttribute) fill(fc easyproto.FieldContext, attribute string) error {
|
||||||
|
var key, value string
|
||||||
|
|
||||||
|
iter := newProtoIter(fc.MessageData())
|
||||||
|
for iter.next(); !iter.finished(); iter.next() {
|
||||||
|
var ok bool
|
||||||
|
switch iter.fc.FieldNum {
|
||||||
|
case 1: // Wire number for `key` field.
|
||||||
|
key, ok = iter.fc.String()
|
||||||
|
case 2: // Wire number for `value` field.
|
||||||
|
value, ok = iter.fc.String()
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return errMalformedObject
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if key == attribute {
|
||||||
|
p.found = true
|
||||||
|
p.value = value
|
||||||
|
}
|
||||||
|
return iter.err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *protoECHeader) fill(fc easyproto.FieldContext, attribute string) error {
|
||||||
|
iter := newProtoIter(fc.MessageData())
|
||||||
|
for iter.next(); !iter.finished(); iter.next() {
|
||||||
|
var err error
|
||||||
|
switch iter.fc.FieldNum {
|
||||||
|
case 1: // Wire number for `parent` field.
|
||||||
|
err = p.parentID.fill(iter.fc)
|
||||||
|
case 8: // Wire number for `parent_attributes` field.
|
||||||
|
err = p.attribute.fill(iter.fc, attribute)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return iter.err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *protoHeader) fill(fc easyproto.FieldContext, attribute string) error {
|
||||||
|
iter := newProtoIter(fc.MessageData())
|
||||||
|
for iter.next(); !iter.finished(); iter.next() {
|
||||||
|
var err error
|
||||||
|
switch iter.fc.FieldNum {
|
||||||
|
case 10: // Wire number for `attributes` field.
|
||||||
|
err = p.attribute.fill(iter.fc, attribute)
|
||||||
|
case 12: // Wire number for `ec` field.
|
||||||
|
err = p.ecHeader.fill(iter.fc, attribute)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return iter.err()
|
||||||
|
}
|
||||||
|
|
||||||
|
type attributeMatchResult struct {
|
||||||
|
attribute protoAttribute
|
||||||
|
|
||||||
|
isEC bool
|
||||||
|
parentID oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func AttributeValueRaw(src []byte, attribute string) (attributeMatchResult, error) {
|
||||||
|
iter := newProtoIter(src, true)
|
||||||
|
for iter.next(); !iter.finished(); iter.next() {
|
||||||
|
if iter.fc.FieldNum == 3 { // Wire number for `header` field.
|
||||||
|
var p protoHeader
|
||||||
|
if err := p.fill(iter.fc, attribute); err != nil {
|
||||||
|
return attributeMatchResult{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.ecHeader.attribute.found {
|
||||||
|
return attributeMatchResult{
|
||||||
|
attribute: p.ecHeader.attribute,
|
||||||
|
isEC: true,
|
||||||
|
parentID: oid.ID(p.ecHeader.parentID),
|
||||||
|
}, nil
|
||||||
|
} else {
|
||||||
|
return attributeMatchResult{attribute: p.attribute}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return attributeMatchResult{}, iter.err()
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package meta
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAttributeValueRaw_ZeroAlloc(t *testing.T) {
|
||||||
|
const attrCount = 4
|
||||||
|
attrs := make([]objectSDK.Attribute, attrCount)
|
||||||
|
for i := range attrs {
|
||||||
|
attrs[i].SetKey(strconv.Itoa(i))
|
||||||
|
attrs[i].SetValue(strconv.Itoa(i * 1000))
|
||||||
|
}
|
||||||
|
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
ech := objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: oidtest.ID(), Attributes: attrs[:attrCount/2]}, 0, 2, nil, 0)
|
||||||
|
|
||||||
|
obj := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
obj.SetECHeader(ech)
|
||||||
|
obj.SetAttributes(attrs[attrCount/2:]...)
|
||||||
|
|
||||||
|
raw, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Zero(t, testing.AllocsPerRun(100, func() {
|
||||||
|
res, err := AttributeValueRaw(raw, "0")
|
||||||
|
if err != nil || !res.attribute.found {
|
||||||
|
t.Fatalf("err %v, found %t", err, res.attribute.found)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue