WIP: Speedup search, part 3: don't allocate while searching for attributes #1687
4 changed files with 410 additions and 33 deletions
|
@ -2,7 +2,6 @@ package meta
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
|
@ -88,10 +87,16 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
|
|||
}
|
||||
|
||||
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
||||
return db.getWithCache(nil, tx, addr, key, checkStatus, raw, currEpoch)
|
||||
data, err := db.getWithCacheRaw(nil, tx, addr, key, checkStatus, raw, currEpoch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
||||
func (db *DB) getWithCacheRaw(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) ([]byte, error) {
|
||||
if checkStatus {
|
||||
st, err := objectStatusWithCache(bc, tx, addr, currEpoch)
|
||||
if err != nil {
|
||||
|
@ -109,13 +114,12 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
|
|||
|
||||
key = objectKey(addr.Object(), key)
|
||||
cnr := addr.Container()
|
||||
obj := objectSDK.New()
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
|
||||
// check in primary index
|
||||
if b := getPrimaryBucket(bc, tx, cnr); b != nil {
|
||||
if data := b.Get(key); len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
return data, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,17 +131,17 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
|
|||
// if not found then check in tombstone index
|
||||
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
||||
if len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// if not found then check in locker index
|
||||
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
|
||||
if len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// if not found then check if object is a virtual
|
||||
return getVirtualObject(tx, cnr, key, raw)
|
||||
return getVirtualObjectNoUnmarshal(tx, cnr, key, raw)
|
||||
}
|
||||
|
||||
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
||||
|
@ -150,6 +154,16 @@ func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
|||
}
|
||||
|
||||
func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
|
||||
data, err := getVirtualObjectNoUnmarshal(tx, cnr, key, raw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
func getVirtualObjectNoUnmarshal(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) ([]byte, error) {
|
||||
if raw {
|
||||
return nil, getSplitInfoError(tx, cnr, key)
|
||||
}
|
||||
|
@ -187,20 +201,7 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
|||
}
|
||||
}
|
||||
|
||||
child := objectSDK.New()
|
||||
|
||||
err = child.Unmarshal(data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshal child with parent: %w", err)
|
||||
}
|
||||
|
||||
par := child.Parent()
|
||||
|
||||
if par == nil { // this should never happen though
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
return par, nil
|
||||
return parentFromChild(data)
|
||||
}
|
||||
|
||||
func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
|
||||
|
|
|
@ -458,11 +458,22 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
|
|||
return result, true
|
||||
}
|
||||
|
||||
obj, isECChunk, err := db.getObjectForSlowFilters(bc, tx, addr, currEpoch)
|
||||
src, isECChunk, err := db.getObjectForSlowFilters(bc, tx, addr, currEpoch)
|
||||
if err != nil {
|
||||
return result, false
|
||||
}
|
||||
|
||||
var obj *objectSDK.Object
|
||||
for i := range f {
|
||||
if strings.HasPrefix(f[i].Header(), v2object.ReservedFilterPrefix) {
|
||||
obj = objectSDK.New()
|
||||
if err := obj.Unmarshal(src); err != nil {
|
||||
return result, false
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for i := range f {
|
||||
var data []byte
|
||||
switch f[i].Header() {
|
||||
|
@ -492,12 +503,15 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
|
|||
cs, _ := obj.PayloadChecksum()
|
||||
data = cs.Value()
|
||||
default: // user attribute
|
||||
v, ok := attributeValue(obj, f[i].Header())
|
||||
if ok {
|
||||
if ech := obj.ECHeader(); ech != nil {
|
||||
result.SetObject(ech.Parent())
|
||||
res, err := attributeValueRaw(src, f[i].Header())
|
||||
if err != nil {
|
||||
return result, false
|
||||
}
|
||||
data = []byte(v)
|
||||
if res.attribute.found {
|
||||
if res.isEC {
|
||||
result.SetObject(res.parentID)
|
||||
}
|
||||
data = []byte(res.attribute.value)
|
||||
} else {
|
||||
return result, f[i].Operation() == objectSDK.MatchNotPresent
|
||||
}
|
||||
|
@ -516,9 +530,9 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
|
|||
return result, true
|
||||
}
|
||||
|
||||
func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) {
|
||||
func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) ([]byte, bool, error) {
|
||||
buf := make([]byte, addressKeySize)
|
||||
obj, err := db.getWithCache(bc, tx, addr, buf, false, false, currEpoch)
|
||||
data, err := db.getWithCacheRaw(bc, tx, addr, buf, false, false, currEpoch)
|
||||
if err != nil {
|
||||
var ecInfoError *objectSDK.ECInfoError
|
||||
if errors.As(err, &ecInfoError) {
|
||||
|
@ -528,15 +542,15 @@ func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Ad
|
|||
continue
|
||||
}
|
||||
addr.SetObject(objID)
|
||||
obj, err = db.getWithCache(bc, tx, addr, buf, true, false, currEpoch)
|
||||
data, err = db.getWithCacheRaw(bc, tx, addr, buf, true, false, currEpoch)
|
||||
if err == nil {
|
||||
return obj, true, nil
|
||||
return data, true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, false, err
|
||||
}
|
||||
return obj, false, nil
|
||||
return data, false, nil
|
||||
}
|
||||
|
||||
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
|
||||
|
|
261
pkg/local_object_storage/metabase/select_proto_access.go
Normal file
261
pkg/local_object_storage/metabase/select_proto_access.go
Normal file
|
@ -0,0 +1,261 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/proto"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/VictoriaMetrics/easyproto"
|
||||
)
|
||||
|
||||
var errMalformedObject = errors.New("malformed object")
|
||||
|
||||
type protoAttribute struct {
|
||||
found bool
|
||||
value string
|
||||
}
|
||||
|
||||
type protoHeader struct {
|
||||
attribute protoAttribute
|
||||
ecHeader protoECHeader
|
||||
}
|
||||
|
||||
type protoECHeader struct {
|
||||
parentID protoOID
|
||||
attribute protoAttribute
|
||||
}
|
||||
|
||||
type protoOID oid.ID
|
||||
|
||||
type protoIter struct {
|
||||
fc easyproto.FieldContext
|
||||
data []byte
|
||||
failed bool
|
||||
eof bool
|
||||
}
|
||||
|
||||
func newProtoIter(data []byte, ok bool) protoIter {
|
||||
return protoIter{data: data, failed: !ok}
|
||||
}
|
||||
|
||||
func (p *protoIter) err() error {
|
||||
if p.failed {
|
||||
return errMalformedObject
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *protoIter) finished() bool {
|
||||
return p.failed || p.eof
|
||||
}
|
||||
|
||||
func (p *protoIter) next() {
|
||||
if p.failed {
|
||||
return
|
||||
}
|
||||
if len(p.data) == 0 {
|
||||
p.eof = true
|
||||
return
|
||||
}
|
||||
|
||||
data, err := p.fc.NextField(p.data)
|
||||
if err != nil {
|
||||
p.failed = true
|
||||
return
|
||||
}
|
||||
|
||||
p.data = data
|
||||
}
|
||||
|
||||
func (p *protoOID) fill(fc easyproto.FieldContext) error {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
if iter.fc.FieldNum == 1 { // Wire number for `id` field.
|
||||
rawID, ok := iter.fc.Bytes()
|
||||
if !ok {
|
||||
return errMalformedObject
|
||||
}
|
||||
return (*oid.ID)(p).Decode(rawID)
|
||||
}
|
||||
}
|
||||
return iter.err()
|
||||
}
|
||||
|
||||
func (p *protoAttribute) fill(fc easyproto.FieldContext, attribute string) error {
|
||||
var key, value string
|
||||
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
var ok bool
|
||||
switch iter.fc.FieldNum {
|
||||
case 1: // Wire number for `key` field.
|
||||
key, ok = iter.fc.String()
|
||||
case 2: // Wire number for `value` field.
|
||||
value, ok = iter.fc.String()
|
||||
default:
|
||||
continue
|
||||
}
|
||||
if !ok {
|
||||
return errMalformedObject
|
||||
}
|
||||
}
|
||||
if key == attribute {
|
||||
p.found = true
|
||||
p.value = value
|
||||
}
|
||||
return iter.err()
|
||||
}
|
||||
|
||||
func (p *protoECHeader) fill(fc easyproto.FieldContext, attribute string) error {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
var err error
|
||||
switch iter.fc.FieldNum {
|
||||
case 1: // Wire number for `parent` field.
|
||||
err = p.parentID.fill(iter.fc)
|
||||
case 8: // Wire number for `parent_attributes` field.
|
||||
err = p.attribute.fill(iter.fc, attribute)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return iter.err()
|
||||
}
|
||||
|
||||
func (p *protoHeader) fill(fc easyproto.FieldContext, attribute string) error {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
var err error
|
||||
switch iter.fc.FieldNum {
|
||||
case 10: // Wire number for `attributes` field.
|
||||
err = p.attribute.fill(iter.fc, attribute)
|
||||
case 12: // Wire number for `ec` field.
|
||||
err = p.ecHeader.fill(iter.fc, attribute)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return iter.err()
|
||||
}
|
||||
|
||||
type attributeMatchResult struct {
|
||||
attribute protoAttribute
|
||||
|
||||
isEC bool
|
||||
parentID oid.ID
|
||||
}
|
||||
|
||||
func attributeValueRaw(src []byte, attribute string) (attributeMatchResult, error) {
|
||||
iter := newProtoIter(src, true)
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
if iter.fc.FieldNum == 3 { // Wire number for `header` field.
|
||||
var p protoHeader
|
||||
if err := p.fill(iter.fc, attribute); err != nil {
|
||||
return attributeMatchResult{}, err
|
||||
}
|
||||
|
||||
if p.ecHeader.attribute.found {
|
||||
return attributeMatchResult{
|
||||
attribute: p.ecHeader.attribute,
|
||||
isEC: true,
|
||||
parentID: oid.ID(p.ecHeader.parentID),
|
||||
}, nil
|
||||
} else {
|
||||
return attributeMatchResult{attribute: p.attribute}, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return attributeMatchResult{}, iter.err()
|
||||
}
|
||||
|
||||
func parentFromChild(src []byte) ([]byte, error) {
|
||||
iter := newProtoIter(src, true)
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
if iter.fc.FieldNum == 3 {
|
||||
return parentFromChildHeader(iter.fc)
|
||||
}
|
||||
}
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func parentFromChildHeader(fc easyproto.FieldContext) ([]byte, error) {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
if iter.fc.FieldNum == 11 {
|
||||
return parentFromSplitHeader(iter.fc)
|
||||
}
|
||||
}
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func parentFromSplitHeader(fc easyproto.FieldContext) ([]byte, error) {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
|
||||
var parentID, parentSig, parentHdr []byte
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
var ok bool
|
||||
switch iter.fc.FieldNum {
|
||||
case 1: // parent id
|
||||
parentID, ok = iter.fc.MessageData()
|
||||
case 3: // parent signature
|
||||
parentSig, ok = iter.fc.MessageData()
|
||||
case 4: // parent header
|
||||
parentHdr, ok = iter.fc.MessageData()
|
||||
default:
|
||||
continue
|
||||
}
|
||||
if !ok {
|
||||
return nil, errMalformedObject
|
||||
}
|
||||
}
|
||||
|
||||
if parentSig == nil || parentHdr == nil {
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
h := &header{
|
||||
parentID: parentID,
|
||||
parentSig: parentSig,
|
||||
parentHdr: parentHdr,
|
||||
}
|
||||
return h.StableMarshal(make([]byte, h.StableSize())), nil
|
||||
}
|
||||
|
||||
type (
|
||||
header struct {
|
||||
parentID bs
|
||||
parentSig bs
|
||||
parentHdr bs
|
||||
}
|
||||
)
|
||||
|
||||
func (o *header) StableSize() int {
|
||||
var size int
|
||||
size += proto.NestedStructureSize(1, &o.parentID)
|
||||
size += proto.NestedStructureSize(2, &o.parentSig)
|
||||
size += proto.NestedStructureSize(3, &o.parentHdr)
|
||||
return size
|
||||
}
|
||||
|
||||
func (o *header) StableMarshal(buf []byte) []byte {
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], &o.parentID)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], &o.parentSig)
|
||||
offset += proto.NestedStructureMarshal(3, buf[offset:], &o.parentHdr)
|
||||
return buf[:offset]
|
||||
}
|
||||
|
||||
type bs []byte
|
||||
|
||||
func (b *bs) StableSize() int {
|
||||
return len(*b)
|
||||
}
|
||||
|
||||
func (b *bs) StableMarshal(dst []byte) []byte {
|
||||
copy(dst, *b)
|
||||
return dst[:len(*b)]
|
||||
}
|
101
pkg/local_object_storage/metabase/select_proto_access_test.go
Normal file
101
pkg/local_object_storage/metabase/select_proto_access_test.go
Normal file
|
@ -0,0 +1,101 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAttributeValueRaw_ZeroAlloc(t *testing.T) {
|
||||
const attrCount = 4
|
||||
attrs := make([]objectSDK.Attribute, attrCount)
|
||||
for i := range attrs {
|
||||
attrs[i].SetKey(strconv.Itoa(i))
|
||||
attrs[i].SetValue(strconv.Itoa(i * 1000))
|
||||
}
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ech := objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: oidtest.ID(), Attributes: attrs[:attrCount/2]}, 0, 2, nil, 0)
|
||||
|
||||
obj := testutil.GenerateObjectWithCID(cnr)
|
||||
obj.SetECHeader(ech)
|
||||
obj.SetAttributes(attrs[attrCount/2:]...)
|
||||
|
||||
raw, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Zero(t, testing.AllocsPerRun(100, func() {
|
||||
res, err := attributeValueRaw(raw, "0")
|
||||
if err != nil || !res.attribute.found {
|
||||
t.Fatalf("err %v, found %t", err, res.attribute.found)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
func BenchmarkParentFromChild(b *testing.B) {
|
||||
cnr := cidtest.ID()
|
||||
parent := testutil.GenerateObjectWithCIDWithPayload(cnr, nil)
|
||||
parent.SetPayloadSize(1234)
|
||||
|
||||
pk, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||
require.NoError(b, err)
|
||||
require.NoError(b, objectSDK.CalculateAndSetSignature(*pk, parent))
|
||||
|
||||
child := testutil.GenerateObjectWithCID(cnr)
|
||||
child.SetParent(parent)
|
||||
|
||||
data, err := child.Marshal()
|
||||
require.NoError(b, err)
|
||||
|
||||
o1, err := parentFromChildUnoptimized(data)
|
||||
require.NoError(b, err)
|
||||
o2, err := parentFromChild(data)
|
||||
require.NoError(b, err)
|
||||
|
||||
require.Equal(b, o1, o2)
|
||||
|
||||
b.Run("unoptimized", func(b *testing.B) {
|
||||
for range b.N {
|
||||
_, err := parentFromChildUnoptimized(data)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
b.Run("proto access", func(b *testing.B) {
|
||||
for range b.N {
|
||||
_, err := parentFromChild(data)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func parentFromChildUnoptimized(src []byte) ([]byte, error) {
|
||||
child := objectSDK.New()
|
||||
|
||||
err := child.Unmarshal(src)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshal child with parent: %w", err)
|
||||
}
|
||||
|
||||
par := child.Parent()
|
||||
|
||||
if par == nil { // this should never happen though
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
return par.Marshal()
|
||||
}
|
Loading…
Add table
Reference in a new issue