WIP: Speedup search, part 3: don't allocate while searching for attributes #1687
4 changed files with 410 additions and 33 deletions
|
@ -2,7 +2,6 @@ package meta
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
|
@ -88,10 +87,16 @@ func (db *DB) Get(ctx context.Context, prm GetPrm) (res GetRes, err error) {
|
|||
}
|
||||
|
||||
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
||||
return db.getWithCache(nil, tx, addr, key, checkStatus, raw, currEpoch)
|
||||
data, err := db.getWithCacheRaw(nil, tx, addr, key, checkStatus, raw, currEpoch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
||||
func (db *DB) getWithCacheRaw(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw bool, currEpoch uint64) ([]byte, error) {
|
||||
if checkStatus {
|
||||
st, err := objectStatusWithCache(bc, tx, addr, currEpoch)
|
||||
if err != nil {
|
||||
|
@ -109,13 +114,12 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
|
|||
|
||||
key = objectKey(addr.Object(), key)
|
||||
cnr := addr.Container()
|
||||
obj := objectSDK.New()
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
|
||||
// check in primary index
|
||||
if b := getPrimaryBucket(bc, tx, cnr); b != nil {
|
||||
if data := b.Get(key); len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
return data, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,17 +131,17 @@ func (db *DB) getWithCache(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, key
|
|||
// if not found then check in tombstone index
|
||||
data = getFromBucket(tx, tombstoneBucketName(cnr, bucketName), key)
|
||||
if len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// if not found then check in locker index
|
||||
data = getFromBucket(tx, bucketNameLockers(cnr, bucketName), key)
|
||||
|
||||
if len(data) != 0 {
|
||||
return obj, obj.Unmarshal(data)
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// if not found then check if object is a virtual
|
||||
return getVirtualObject(tx, cnr, key, raw)
|
||||
return getVirtualObjectNoUnmarshal(tx, cnr, key, raw)
|
||||
}
|
||||
|
||||
func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
||||
|
@ -150,6 +154,16 @@ func getFromBucket(tx *bbolt.Tx, name, key []byte) []byte {
|
|||
}
|
||||
|
||||
func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSDK.Object, error) {
|
||||
data, err := getVirtualObjectNoUnmarshal(tx, cnr, key, raw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
return obj, obj.Unmarshal(data)
|
||||
}
|
||||
|
||||
func getVirtualObjectNoUnmarshal(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) ([]byte, error) {
|
||||
if raw {
|
||||
return nil, getSplitInfoError(tx, cnr, key)
|
||||
}
|
||||
|
@ -187,20 +201,7 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
|||
}
|
||||
}
|
||||
|
||||
child := objectSDK.New()
|
||||
|
||||
err = child.Unmarshal(data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshal child with parent: %w", err)
|
||||
}
|
||||
|
||||
par := child.Parent()
|
||||
|
||||
if par == nil { // this should never happen though
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
return par, nil
|
||||
return parentFromChild(data)
|
||||
}
|
||||
|
||||
func getSplitInfoError(tx *bbolt.Tx, cnr cid.ID, key []byte) error {
|
||||
|
|
|
@ -458,11 +458,22 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
|
|||
return result, true
|
||||
}
|
||||
|
||||
obj, isECChunk, err := db.getObjectForSlowFilters(bc, tx, addr, currEpoch)
|
||||
src, isECChunk, err := db.getObjectForSlowFilters(bc, tx, addr, currEpoch)
|
||||
if err != nil {
|
||||
return result, false
|
||||
}
|
||||
|
||||
var obj *objectSDK.Object
|
||||
for i := range f {
|
||||
if strings.HasPrefix(f[i].Header(), v2object.ReservedFilterPrefix) {
|
||||
obj = objectSDK.New()
|
||||
if err := obj.Unmarshal(src); err != nil {
|
||||
return result, false
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for i := range f {
|
||||
var data []byte
|
||||
switch f[i].Header() {
|
||||
|
@ -492,12 +503,15 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
|
|||
cs, _ := obj.PayloadChecksum()
|
||||
data = cs.Value()
|
||||
default: // user attribute
|
||||
v, ok := attributeValue(obj, f[i].Header())
|
||||
if ok {
|
||||
if ech := obj.ECHeader(); ech != nil {
|
||||
result.SetObject(ech.Parent())
|
||||
res, err := attributeValueRaw(src, f[i].Header())
|
||||
if err != nil {
|
||||
return result, false
|
||||
}
|
||||
if res.attribute.found {
|
||||
if res.isEC {
|
||||
result.SetObject(res.parentID)
|
||||
}
|
||||
data = []byte(v)
|
||||
data = []byte(res.attribute.value)
|
||||
} else {
|
||||
return result, f[i].Operation() == objectSDK.MatchNotPresent
|
||||
}
|
||||
|
@ -516,9 +530,9 @@ func (db *DB) matchSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address,
|
|||
return result, true
|
||||
}
|
||||
|
||||
func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (*objectSDK.Object, bool, error) {
|
||||
func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Address, currEpoch uint64) ([]byte, bool, error) {
|
||||
buf := make([]byte, addressKeySize)
|
||||
obj, err := db.getWithCache(bc, tx, addr, buf, false, false, currEpoch)
|
||||
data, err := db.getWithCacheRaw(bc, tx, addr, buf, false, false, currEpoch)
|
||||
if err != nil {
|
||||
var ecInfoError *objectSDK.ECInfoError
|
||||
if errors.As(err, &ecInfoError) {
|
||||
|
@ -528,15 +542,15 @@ func (db *DB) getObjectForSlowFilters(bc *bucketCache, tx *bbolt.Tx, addr oid.Ad
|
|||
continue
|
||||
}
|
||||
addr.SetObject(objID)
|
||||
obj, err = db.getWithCache(bc, tx, addr, buf, true, false, currEpoch)
|
||||
data, err = db.getWithCacheRaw(bc, tx, addr, buf, true, false, currEpoch)
|
||||
if err == nil {
|
||||
return obj, true, nil
|
||||
return data, true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, false, err
|
||||
}
|
||||
return obj, false, nil
|
||||
return data, false, nil
|
||||
}
|
||||
|
||||
func attributeValue(obj *objectSDK.Object, attribute string) (string, bool) {
|
||||
|
|
261
pkg/local_object_storage/metabase/select_proto_access.go
Normal file
261
pkg/local_object_storage/metabase/select_proto_access.go
Normal file
|
@ -0,0 +1,261 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/util/proto"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/VictoriaMetrics/easyproto"
|
||||
)
|
||||
|
||||
var errMalformedObject = errors.New("malformed object")
|
||||
|
||||
type protoAttribute struct {
|
||||
found bool
|
||||
value string
|
||||
}
|
||||
|
||||
type protoHeader struct {
|
||||
attribute protoAttribute
|
||||
ecHeader protoECHeader
|
||||
}
|
||||
|
||||
type protoECHeader struct {
|
||||
parentID protoOID
|
||||
attribute protoAttribute
|
||||
}
|
||||
|
||||
type protoOID oid.ID
|
||||
|
||||
type protoIter struct {
|
||||
fc easyproto.FieldContext
|
||||
data []byte
|
||||
failed bool
|
||||
eof bool
|
||||
}
|
||||
|
||||
func newProtoIter(data []byte, ok bool) protoIter {
|
||||
return protoIter{data: data, failed: !ok}
|
||||
}
|
||||
|
||||
func (p *protoIter) err() error {
|
||||
if p.failed {
|
||||
return errMalformedObject
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *protoIter) finished() bool {
|
||||
return p.failed || p.eof
|
||||
}
|
||||
|
||||
func (p *protoIter) next() {
|
||||
if p.failed {
|
||||
return
|
||||
}
|
||||
if len(p.data) == 0 {
|
||||
p.eof = true
|
||||
return
|
||||
}
|
||||
|
||||
data, err := p.fc.NextField(p.data)
|
||||
if err != nil {
|
||||
p.failed = true
|
||||
return
|
||||
}
|
||||
|
||||
p.data = data
|
||||
}
|
||||
|
||||
func (p *protoOID) fill(fc easyproto.FieldContext) error {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
if iter.fc.FieldNum == 1 { // Wire number for `id` field.
|
||||
rawID, ok := iter.fc.Bytes()
|
||||
if !ok {
|
||||
return errMalformedObject
|
||||
}
|
||||
return (*oid.ID)(p).Decode(rawID)
|
||||
}
|
||||
}
|
||||
return iter.err()
|
||||
}
|
||||
|
||||
func (p *protoAttribute) fill(fc easyproto.FieldContext, attribute string) error {
|
||||
var key, value string
|
||||
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
var ok bool
|
||||
switch iter.fc.FieldNum {
|
||||
case 1: // Wire number for `key` field.
|
||||
key, ok = iter.fc.String()
|
||||
case 2: // Wire number for `value` field.
|
||||
value, ok = iter.fc.String()
|
||||
default:
|
||||
continue
|
||||
}
|
||||
if !ok {
|
||||
return errMalformedObject
|
||||
}
|
||||
}
|
||||
if key == attribute {
|
||||
p.found = true
|
||||
p.value = value
|
||||
}
|
||||
return iter.err()
|
||||
}
|
||||
|
||||
func (p *protoECHeader) fill(fc easyproto.FieldContext, attribute string) error {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
var err error
|
||||
switch iter.fc.FieldNum {
|
||||
case 1: // Wire number for `parent` field.
|
||||
err = p.parentID.fill(iter.fc)
|
||||
case 8: // Wire number for `parent_attributes` field.
|
||||
err = p.attribute.fill(iter.fc, attribute)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return iter.err()
|
||||
}
|
||||
|
||||
func (p *protoHeader) fill(fc easyproto.FieldContext, attribute string) error {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
var err error
|
||||
switch iter.fc.FieldNum {
|
||||
case 10: // Wire number for `attributes` field.
|
||||
err = p.attribute.fill(iter.fc, attribute)
|
||||
case 12: // Wire number for `ec` field.
|
||||
err = p.ecHeader.fill(iter.fc, attribute)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return iter.err()
|
||||
}
|
||||
|
||||
type attributeMatchResult struct {
|
||||
attribute protoAttribute
|
||||
|
||||
isEC bool
|
||||
parentID oid.ID
|
||||
}
|
||||
|
||||
func attributeValueRaw(src []byte, attribute string) (attributeMatchResult, error) {
|
||||
iter := newProtoIter(src, true)
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
if iter.fc.FieldNum == 3 { // Wire number for `header` field.
|
||||
var p protoHeader
|
||||
if err := p.fill(iter.fc, attribute); err != nil {
|
||||
return attributeMatchResult{}, err
|
||||
}
|
||||
|
||||
if p.ecHeader.attribute.found {
|
||||
return attributeMatchResult{
|
||||
attribute: p.ecHeader.attribute,
|
||||
isEC: true,
|
||||
parentID: oid.ID(p.ecHeader.parentID),
|
||||
}, nil
|
||||
} else {
|
||||
return attributeMatchResult{attribute: p.attribute}, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return attributeMatchResult{}, iter.err()
|
||||
}
|
||||
|
||||
func parentFromChild(src []byte) ([]byte, error) {
|
||||
iter := newProtoIter(src, true)
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
if iter.fc.FieldNum == 3 {
|
||||
return parentFromChildHeader(iter.fc)
|
||||
}
|
||||
}
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func parentFromChildHeader(fc easyproto.FieldContext) ([]byte, error) {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
if iter.fc.FieldNum == 11 {
|
||||
return parentFromSplitHeader(iter.fc)
|
||||
}
|
||||
}
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func parentFromSplitHeader(fc easyproto.FieldContext) ([]byte, error) {
|
||||
iter := newProtoIter(fc.MessageData())
|
||||
|
||||
var parentID, parentSig, parentHdr []byte
|
||||
for iter.next(); !iter.finished(); iter.next() {
|
||||
var ok bool
|
||||
switch iter.fc.FieldNum {
|
||||
case 1: // parent id
|
||||
parentID, ok = iter.fc.MessageData()
|
||||
case 3: // parent signature
|
||||
parentSig, ok = iter.fc.MessageData()
|
||||
case 4: // parent header
|
||||
parentHdr, ok = iter.fc.MessageData()
|
||||
default:
|
||||
continue
|
||||
}
|
||||
if !ok {
|
||||
return nil, errMalformedObject
|
||||
}
|
||||
}
|
||||
|
||||
if parentSig == nil || parentHdr == nil {
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
h := &header{
|
||||
parentID: parentID,
|
||||
parentSig: parentSig,
|
||||
parentHdr: parentHdr,
|
||||
}
|
||||
return h.StableMarshal(make([]byte, h.StableSize())), nil
|
||||
}
|
||||
|
||||
type (
|
||||
header struct {
|
||||
parentID bs
|
||||
parentSig bs
|
||||
parentHdr bs
|
||||
}
|
||||
)
|
||||
|
||||
func (o *header) StableSize() int {
|
||||
var size int
|
||||
size += proto.NestedStructureSize(1, &o.parentID)
|
||||
size += proto.NestedStructureSize(2, &o.parentSig)
|
||||
size += proto.NestedStructureSize(3, &o.parentHdr)
|
||||
return size
|
||||
}
|
||||
|
||||
func (o *header) StableMarshal(buf []byte) []byte {
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], &o.parentID)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], &o.parentSig)
|
||||
offset += proto.NestedStructureMarshal(3, buf[offset:], &o.parentHdr)
|
||||
return buf[:offset]
|
||||
}
|
||||
|
||||
// bs is a byte slice holding an already-encoded message. Its methods emit
// the bytes verbatim.
type bs []byte

// StableSize returns the length of the stored bytes.
func (b *bs) StableSize() int {
	raw := *b
	return len(raw)
}

// StableMarshal copies the stored bytes to the beginning of dst and returns
// the prefix of dst of the same length. dst must be at least StableSize()
// bytes long.
func (b *bs) StableMarshal(dst []byte) []byte {
	raw := *b
	copy(dst, raw)
	return dst[:len(raw)]
}
|
101
pkg/local_object_storage/metabase/select_proto_access_test.go
Normal file
101
pkg/local_object_storage/metabase/select_proto_access_test.go
Normal file
|
@ -0,0 +1,101 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAttributeValueRaw_ZeroAlloc(t *testing.T) {
|
||||
const attrCount = 4
|
||||
attrs := make([]objectSDK.Attribute, attrCount)
|
||||
for i := range attrs {
|
||||
attrs[i].SetKey(strconv.Itoa(i))
|
||||
attrs[i].SetValue(strconv.Itoa(i * 1000))
|
||||
}
|
||||
|
||||
cnr := cidtest.ID()
|
||||
ech := objectSDK.NewECHeader(objectSDK.ECParentInfo{ID: oidtest.ID(), Attributes: attrs[:attrCount/2]}, 0, 2, nil, 0)
|
||||
|
||||
obj := testutil.GenerateObjectWithCID(cnr)
|
||||
obj.SetECHeader(ech)
|
||||
obj.SetAttributes(attrs[attrCount/2:]...)
|
||||
|
||||
raw, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Zero(t, testing.AllocsPerRun(100, func() {
|
||||
res, err := attributeValueRaw(raw, "0")
|
||||
if err != nil || !res.attribute.found {
|
||||
t.Fatalf("err %v, found %t", err, res.attribute.found)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
func BenchmarkParentFromChild(b *testing.B) {
|
||||
cnr := cidtest.ID()
|
||||
parent := testutil.GenerateObjectWithCIDWithPayload(cnr, nil)
|
||||
parent.SetPayloadSize(1234)
|
||||
|
||||
pk, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||
require.NoError(b, err)
|
||||
require.NoError(b, objectSDK.CalculateAndSetSignature(*pk, parent))
|
||||
|
||||
child := testutil.GenerateObjectWithCID(cnr)
|
||||
child.SetParent(parent)
|
||||
|
||||
data, err := child.Marshal()
|
||||
require.NoError(b, err)
|
||||
|
||||
o1, err := parentFromChildUnoptimized(data)
|
||||
require.NoError(b, err)
|
||||
o2, err := parentFromChild(data)
|
||||
require.NoError(b, err)
|
||||
|
||||
require.Equal(b, o1, o2)
|
||||
|
||||
b.Run("unoptimized", func(b *testing.B) {
|
||||
for range b.N {
|
||||
_, err := parentFromChildUnoptimized(data)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
b.Run("proto access", func(b *testing.B) {
|
||||
for range b.N {
|
||||
_, err := parentFromChild(data)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func parentFromChildUnoptimized(src []byte) ([]byte, error) {
|
||||
child := objectSDK.New()
|
||||
|
||||
err := child.Unmarshal(src)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unmarshal child with parent: %w", err)
|
||||
}
|
||||
|
||||
par := child.Parent()
|
||||
|
||||
if par == nil { // this should never happen though
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
return par.Marshal()
|
||||
}
|
Loading…
Add table
Reference in a new issue
This is bad, it will be fixed, of course.
We can do much better
What bothers me is that this new code should also do marshaling, because the `Parent()` method on the child reassembles the object from the fields in its split header. On the other hand, we only need the header in SEARCH, so we might avoid marshaling at the cost of making `getRaw` more complex.