WIP: Change metabase engine to pebble #1221
2 changed files with 144 additions and 149 deletions
|
@ -2,16 +2,17 @@ package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"slices"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"github.com/cockroachdb/pebble"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
@ -21,10 +22,33 @@ import (
|
||||||
// cursor. Use nil cursor object to start listing again.
|
// cursor. Use nil cursor object to start listing again.
|
||||||
var ErrEndOfListing = logicerr.New("end of object listing")
|
var ErrEndOfListing = logicerr.New("end of object listing")
|
||||||
|
|
||||||
|
type listPrefix struct {
|
||||||
|
prefix []byte
|
||||||
|
keyParser func(k []byte) (oid.Address, error)
|
||||||
|
objectType objectSDK.Type
|
||||||
|
}
|
||||||
|
|
||||||
|
var listPrefixes = []listPrefix{
|
||||||
|
{
|
||||||
|
prefix: []byte{primaryPrefix},
|
||||||
|
keyParser: addressFromPrimaryKey,
|
||||||
|
objectType: objectSDK.TypeRegular,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: []byte{lockersPrefix},
|
||||||
|
keyParser: addressFromLockersKey,
|
||||||
|
objectType: objectSDK.TypeLock,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
prefix: []byte{tombstonePrefix},
|
||||||
|
keyParser: addressFromTombstoneKey,
|
||||||
|
objectType: objectSDK.TypeTombstone,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// Cursor is a type for continuous object listing.
|
// Cursor is a type for continuous object listing.
|
||||||
type Cursor struct {
|
type Cursor struct {
|
||||||
bucketName []byte
|
lastKey []byte
|
||||||
inBucketOffset []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListPrm contains parameters for ListWithCursor operation.
|
// ListPrm contains parameters for ListWithCursor operation.
|
||||||
|
@ -89,173 +113,109 @@ func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err
|
||||||
return res, ErrDegradedMode
|
return res, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make([]objectcore.Info, 0, prm.count)
|
if prm.count == 0 {
|
||||||
|
return ListRes{}, ErrEndOfListing
|
||||||
|
}
|
||||||
|
|
||||||
err = db.database.View(func(tx *bbolt.Tx) error {
|
err = db.snapshot(func(s *pebble.Snapshot) error {
|
||||||
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
res.addrList, res.cursor, err = db.listWithCursor(ctx, s, prm.count, prm.cursor)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return res, metaerr.Wrap(err)
|
return res, metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) {
|
func (db *DB) listWithCursor(ctx context.Context, r pebble.Reader, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) {
|
||||||
threshold := cursor == nil // threshold is a flag to ignore cursor
|
var prefix []byte
|
||||||
var bucketName []byte
|
var lastSeen []byte
|
||||||
var err error
|
if cursor != nil {
|
||||||
|
prefix = []byte{cursor.lastKey[0]}
|
||||||
c := tx.Cursor()
|
lastSeen = cursor.lastKey
|
||||||
name, _ := c.First()
|
} else {
|
||||||
|
prefix = listPrefixes[0].prefix
|
||||||
if !threshold {
|
|
||||||
name, _ = c.Seek(cursor.bucketName)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var containerID cid.ID
|
idx := slices.IndexFunc(listPrefixes, func(e listPrefix) bool {
|
||||||
var offset []byte
|
return e.prefix[0] == prefix[0]
|
||||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
})
|
||||||
garbageBkt := tx.Bucket(garbageBucketName)
|
if idx < 0 {
|
||||||
|
return nil, nil, fmt.Errorf("invalid prefix value %d", prefix[0])
|
||||||
rawAddr := make([]byte, cidSize, addressKeySize)
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for ; name != nil; name, _ = c.Next() {
|
|
||||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
|
|
||||||
if cidRaw == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var objType objectSDK.Type
|
|
||||||
|
|
||||||
switch prefix {
|
|
||||||
case primaryPrefix:
|
|
||||||
objType = objectSDK.TypeRegular
|
|
||||||
case lockersPrefix:
|
|
||||||
objType = objectSDK.TypeLock
|
|
||||||
case tombstonePrefix:
|
|
||||||
objType = objectSDK.TypeTombstone
|
|
||||||
default:
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
bkt := tx.Bucket(name)
|
|
||||||
if bkt != nil {
|
|
||||||
copy(rawAddr, cidRaw)
|
|
||||||
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
|
||||||
result, count, cursor, threshold)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bucketName = name
|
|
||||||
if len(result) >= count {
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
|
|
||||||
// set threshold flag after first `selectNFromBucket` invocation
|
|
||||||
// first invocation must look for cursor object
|
|
||||||
threshold = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if offset != nil {
|
var next Cursor
|
||||||
// new slice is much faster but less memory efficient
|
result := make([]objectcore.Info, 0, count)
|
||||||
// we need to copy, because offset exists during bbolt tx
|
for ; idx < len(listPrefixes); idx++ {
|
||||||
cursor.inBucketOffset = make([]byte, len(offset))
|
indexResult, lastIndexSeen, err := listByPrefix(ctx, r, lastSeen, idx, count-len(result))
|
||||||
copy(cursor.inBucketOffset, offset)
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
result = append(result, indexResult...)
|
||||||
|
if len(lastIndexSeen) > 0 {
|
||||||
|
next.lastKey = lastIndexSeen
|
||||||
|
}
|
||||||
|
if len(result) == count {
|
||||||
|
return result, &next, nil
|
||||||
|
}
|
||||||
|
lastSeen = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(result) == 0 {
|
if len(result) == 0 {
|
||||||
return nil, nil, ErrEndOfListing
|
return nil, nil, ErrEndOfListing
|
||||||
}
|
}
|
||||||
|
return result, &next, nil
|
||||||
// new slice is much faster but less memory efficient
|
|
||||||
// we need to copy, because bucketName exists during bbolt tx
|
|
||||||
cursor.bucketName = make([]byte, len(bucketName))
|
|
||||||
copy(cursor.bucketName, bucketName)
|
|
||||||
|
|
||||||
return result, cursor, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
func listByPrefix(ctx context.Context, r pebble.Reader, lastSeen []byte, idx int, count int) ([]objectcore.Info, []byte, error) {
|
||||||
// object to start selecting from. Ignores inhumed objects.
|
var result []objectcore.Info
|
||||||
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
for {
|
||||||
objType objectSDK.Type, // type of the objects stored in the main bucket
|
kvs, err := selectByPrefixAndSeek(ctx, r, listPrefixes[idx].prefix, lastSeen, count-len(result))
|
||||||
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
if err != nil {
|
||||||
cidRaw []byte, // container ID prefix, optimization
|
return nil, nil, err
|
||||||
cnt cid.ID, // container ID
|
|
||||||
to []objectcore.Info, // listing result
|
|
||||||
limit int, // stop listing at `limit` items in result
|
|
||||||
cursor *Cursor, // start from cursor object
|
|
||||||
threshold bool, // ignore cursor and start immediately
|
|
||||||
) ([]objectcore.Info, []byte, *Cursor, error) {
|
|
||||||
if cursor == nil {
|
|
||||||
cursor = new(Cursor)
|
|
||||||
}
|
|
||||||
|
|
||||||
count := len(to)
|
|
||||||
c := bkt.Cursor()
|
|
||||||
k, v := c.First()
|
|
||||||
|
|
||||||
offset := cursor.inBucketOffset
|
|
||||||
|
|
||||||
if !threshold {
|
|
||||||
c.Seek(offset)
|
|
||||||
k, v = c.Next() // we are looking for objects _after_ the cursor
|
|
||||||
}
|
|
||||||
|
|
||||||
for ; k != nil; k, v = c.Next() {
|
|
||||||
if count >= limit {
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
if len(kvs) == 0 {
|
||||||
var obj oid.ID
|
return result, lastSeen, nil
|
||||||
if err := obj.Decode(k); err != nil {
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
for _, kv := range kvs {
|
||||||
offset = k
|
lastSeen = kv.Key
|
||||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
addr, err := listPrefixes[idx].keyParser(kv.Key)
|
||||||
continue
|
if err != nil {
|
||||||
}
|
return nil, nil, err
|
||||||
|
|
||||||
var isLinkingObj bool
|
|
||||||
var ecInfo *objectcore.ECInfo
|
|
||||||
if objType == objectSDK.TypeRegular {
|
|
||||||
var o objectSDK.Object
|
|
||||||
if err := o.Unmarshal(v); err != nil {
|
|
||||||
return nil, nil, nil, err
|
|
||||||
}
|
}
|
||||||
isLinkingObj = isLinkObject(&o)
|
st, err := inGraveyardWithKey(r, addr)
|
||||||
ecHeader := o.ECHeader()
|
if err != nil {
|
||||||
if ecHeader != nil {
|
return nil, nil, err
|
||||||
ecInfo = &objectcore.ECInfo{
|
}
|
||||||
ParentID: ecHeader.Parent(),
|
if st > 0 {
|
||||||
Index: ecHeader.Index(),
|
continue
|
||||||
Total: ecHeader.Total(),
|
}
|
||||||
|
|
||||||
|
var isLinkingObj bool
|
||||||
|
var ecInfo *objectcore.ECInfo
|
||||||
|
if listPrefixes[idx].objectType == objectSDK.TypeRegular {
|
||||||
|
var o objectSDK.Object
|
||||||
|
if err := o.Unmarshal(kv.Value); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
isLinkingObj = isLinkObject(&o)
|
||||||
|
ecHeader := o.ECHeader()
|
||||||
|
if ecHeader != nil {
|
||||||
|
ecInfo = &objectcore.ECInfo{
|
||||||
|
ParentID: ecHeader.Parent(),
|
||||||
|
Index: ecHeader.Index(),
|
||||||
|
Total: ecHeader.Total(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
result = append(result, objectcore.Info{
|
||||||
|
Address: addr,
|
||||||
|
Type: listPrefixes[idx].objectType,
|
||||||
|
IsLinkingObject: isLinkingObj,
|
||||||
|
ECInfo: ecInfo,
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(result) == count {
|
||||||
|
return result, lastSeen, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var a oid.Address
|
|
||||||
a.SetContainer(cnt)
|
|
||||||
a.SetObject(obj)
|
|
||||||
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
|
|
||||||
count++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return to, offset, cursor, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
|
|
||||||
if len(name) < bucketKeySize {
|
|
||||||
return nil, 0
|
|
||||||
}
|
|
||||||
|
|
||||||
rawID := name[1:bucketKeySize]
|
|
||||||
|
|
||||||
if err := containerID.Decode(rawID); err != nil {
|
|
||||||
return nil, 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return rawID, name[0]
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,3 +89,38 @@ func deleteByPrefix(ctx context.Context, b *pebble.Batch, prefix []byte) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func selectByPrefixAndSeek(ctx context.Context, r pebble.Reader, prefix, lastSeen []byte, batchSize int) ([]keyValue, error) {
|
||||||
|
it, err := r.NewIter(&pebble.IterOptions{
|
||||||
|
LowerBound: prefix,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []keyValue
|
||||||
|
var v bool
|
||||||
|
if len(lastSeen) == 0 {
|
||||||
|
v = it.First()
|
||||||
|
} else {
|
||||||
|
v = it.SeekGE(lastSeen)
|
||||||
|
}
|
||||||
|
for ; v && bytes.HasPrefix(it.Key(), prefix); v = it.Next() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, errors.Join(ctx.Err(), it.Close())
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
if bytes.Equal(lastSeen, it.Key()) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var current keyValue
|
||||||
|
current.Key = bytes.Clone(it.Key())
|
||||||
|
current.Value = bytes.Clone(it.Value())
|
||||||
|
result = append(result, current)
|
||||||
|
if len(result) == batchSize {
|
||||||
|
return result, it.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, it.Close()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue