[#1516] metabase: Cache address key and do not decode address twice

```
name                        old time/op    new time/op    delta
ListWithCursor/1_item-8       10.6µs ± 1%     6.4µs ±13%  -39.62%  (p=0.000 n=7+10)
ListWithCursor/10_items-8     75.3µs ± 2%    30.9µs ±21%  -58.97%  (p=0.000 n=10+10)
ListWithCursor/100_items-8     726µs ± 2%     274µs ±27%  -62.28%  (p=0.000 n=10+10)

name                        old alloc/op   new alloc/op   delta
ListWithCursor/1_item-8       3.19kB ± 0%    2.26kB ± 0%  -29.21%  (p=0.000 n=10+10)
ListWithCursor/10_items-8     20.7kB ± 0%    10.8kB ± 0%  -47.68%  (p=0.000 n=10+8)
ListWithCursor/100_items-8     196kB ± 0%      97kB ± 0%  -50.65%  (p=0.000 n=7+10)

name                        old allocs/op  new allocs/op  delta
ListWithCursor/1_item-8         55.0 ± 0%      39.0 ± 0%  -29.09%  (p=0.000 n=10+10)
ListWithCursor/10_items-8        346 ± 0%       192 ± 0%  -44.51%  (p=0.000 n=10+10)
ListWithCursor/100_items-8     3.25k ± 0%     1.72k ± 0%  -47.13%  (p=0.000 n=9+10)
```

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-06-14 18:53:09 +03:00 committed by LeL
parent 504f45e9ee
commit af4db8a73b
2 changed files with 34 additions and 17 deletions

View file

@ -105,6 +105,11 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address) (exists bool, err error) {
// * 1 if object with GC mark;
// * 2 if object is covered with tombstone.
func inGraveyard(tx *bbolt.Tx, addr oid.Address) uint8 {
addrKey := addressKey(addr)
return inGraveyardWithKey(tx, addrKey)
}
func inGraveyardWithKey(tx *bbolt.Tx, addrKey []byte) uint8 {
graveyard := tx.Bucket(graveyardBucketName)
if graveyard == nil {
// incorrect metabase state, does not make
@ -112,7 +117,7 @@ func inGraveyard(tx *bbolt.Tx, addr oid.Address) uint8 {
return 0
}
val := graveyard.Get(addressKey(addr))
val := graveyard.Get(addrKey)
if val == nil {
garbageBCK := tx.Bucket(garbageBucketName)
if garbageBCK == nil {
@ -120,7 +125,7 @@ func inGraveyard(tx *bbolt.Tx, addr oid.Address) uint8 {
return 0
}
val = garbageBCK.Get(addressKey(addr))
val = garbageBCK.Get(addrKey)
if val != nil {
// object has been marked with GC
return 1

View file

@ -80,17 +80,18 @@ func ListWithCursor(db *DB, count uint32, cursor *Cursor) ([]oid.Address, *Curso
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter set to zero.
func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) {
result := make([]oid.Address, 0, prm.count)
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, res.cursor, err = db.listWithCursor(tx, prm.count, prm.cursor)
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
return err
})
return res, err
}
func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor *Cursor) ([]oid.Address, *Cursor, error) {
func (db *DB) listWithCursor(tx *bbolt.Tx, result []oid.Address, count int, cursor *Cursor) ([]oid.Address, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor
result := make([]oid.Address, 0, count)
var bucketName []byte
c := tx.Cursor()
@ -100,10 +101,12 @@ func (db *DB) listWithCursor(tx *bbolt.Tx, count int, cursor *Cursor) ([]oid.Add
name, _ = c.Seek(cursor.bucketName)
}
var containerID cid.ID
loop:
for ; name != nil; name, _ = c.Next() {
containerID, postfix := parseContainerIDWithPostfix(name)
if containerID == nil {
postfix, ok := parseContainerIDWithPostfix(&containerID, name)
if !ok {
continue
}
@ -118,8 +121,7 @@ loop:
}
prefix := containerID.EncodeToString() + "/"
result, cursor = selectNFromBucket(tx, name, prefix, result, count, cursor, threshold)
result, cursor = selectNFromBucket(tx, name, prefix, containerID, result, count, cursor, threshold)
bucketName = name
if len(result) >= count {
break loop
@ -146,7 +148,8 @@ loop:
// object to start selecting from. Ignores inhumed objects.
func selectNFromBucket(tx *bbolt.Tx,
name []byte, // bucket name
prefix string, // string of container ID, optimization
prefix string, // container ID prefix, optimization
cnt cid.ID, // container ID
to []oid.Address, // listing result
limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object
@ -172,18 +175,28 @@ func selectNFromBucket(tx *bbolt.Tx,
k, _ = c.Next() // we are looking for objects _after_ the cursor
}
addrRaw := make([]byte, len(prefix)+44)
copy(addrRaw, prefix)
for ; k != nil; k, _ = c.Next() {
if count >= limit {
break
}
var a oid.Address
if err := a.DecodeString(prefix + string(k)); err != nil {
var obj oid.ID
if err := obj.DecodeString(string(k)); err != nil {
break
}
offset = k
if inGraveyard(tx, a) > 0 {
addrRaw = append(addrRaw[:len(prefix)], k...)
if inGraveyardWithKey(tx, addrRaw) > 0 {
continue
}
var a oid.Address
a.SetContainer(cnt)
a.SetObject(obj)
to = append(to, a)
count++
}
@ -196,9 +209,8 @@ func selectNFromBucket(tx *bbolt.Tx,
return to, cursor
}
func parseContainerIDWithPostfix(name []byte) (*cid.ID, string) {
func parseContainerIDWithPostfix(containerID *cid.ID, name []byte) (string, bool) {
var (
containerID cid.ID
containerIDStr = string(name)
postfix string
)
@ -210,8 +222,8 @@ func parseContainerIDWithPostfix(name []byte) (*cid.ID, string) {
}
if err := containerID.DecodeString(containerIDStr); err != nil {
return nil, ""
return "", false
}
return &containerID, postfix
return postfix, true
}