forked from TrueCloudLab/frostfs-node
239 lines
6.4 KiB
Go
239 lines
6.4 KiB
Go
package meta
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
// ErrEndOfListing is returned from object listing with cursor
|
|
// when storage can't return any more objects after provided
|
|
// cursor. Use nil cursor object to start listing again.
|
|
var ErrEndOfListing = logicerr.New("end of object listing")
|
|
|
|
// Cursor marks the position at which a continuous object listing stopped,
// so that a following request can pick up from there.
type Cursor struct {
	// bucketName is the name of the bucket the listing stopped in.
	bucketName []byte
	// inBucketOffset is the key of the last object visited in that bucket.
	inBucketOffset []byte
}
|
|
|
|
// ListPrm contains parameters for ListWithCursor operation.
|
|
type ListPrm struct {
|
|
count int
|
|
cursor *Cursor
|
|
}
|
|
|
|
// SetCount sets maximum amount of addresses that ListWithCursor should return.
|
|
func (l *ListPrm) SetCount(count uint32) {
|
|
l.count = int(count)
|
|
}
|
|
|
|
// SetCursor sets cursor for ListWithCursor operation. For initial request
|
|
// ignore this param or use nil value. For consecutive requests, use value
|
|
// from ListRes.
|
|
func (l *ListPrm) SetCursor(cursor *Cursor) {
|
|
l.cursor = cursor
|
|
}
|
|
|
|
// ListRes contains values returned from ListWithCursor operation.
|
|
type ListRes struct {
|
|
addrList []objectcore.AddressWithType
|
|
cursor *Cursor
|
|
}
|
|
|
|
// AddressList returns addresses selected by ListWithCursor operation.
|
|
func (l ListRes) AddressList() []objectcore.AddressWithType {
|
|
return l.addrList
|
|
}
|
|
|
|
// Cursor returns cursor for consecutive listing requests.
|
|
func (l ListRes) Cursor() *Cursor {
|
|
return l.cursor
|
|
}
|
|
|
|
// ListWithCursor lists physical objects available in metabase starting from
|
|
// cursor. Includes objects of all types. Does not include inhumed objects.
|
|
// Use cursor value from response for consecutive requests.
|
|
//
|
|
// Returns ErrEndOfListing if there are no more objects to return or count
|
|
// parameter set to zero.
|
|
func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err error) {
|
|
var (
|
|
startedAt = time.Now()
|
|
success = false
|
|
)
|
|
defer func() {
|
|
db.metrics.AddMethodDuration("ListWithCursor", time.Since(startedAt), success)
|
|
}()
|
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.ListWithCursor",
|
|
trace.WithAttributes(
|
|
attribute.Int("count", prm.count),
|
|
attribute.Bool("has_cursor", prm.cursor != nil),
|
|
))
|
|
defer span.End()
|
|
|
|
db.modeMtx.RLock()
|
|
defer db.modeMtx.RUnlock()
|
|
|
|
if db.mode.NoMetabase() {
|
|
return res, ErrDegradedMode
|
|
}
|
|
|
|
result := make([]objectcore.AddressWithType, 0, prm.count)
|
|
|
|
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
|
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
|
|
return err
|
|
})
|
|
success = err == nil
|
|
return res, metaerr.Wrap(err)
|
|
}
|
|
|
|
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.AddressWithType, count int, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) {
|
|
threshold := cursor == nil // threshold is a flag to ignore cursor
|
|
var bucketName []byte
|
|
|
|
c := tx.Cursor()
|
|
name, _ := c.First()
|
|
|
|
if !threshold {
|
|
name, _ = c.Seek(cursor.bucketName)
|
|
}
|
|
|
|
var containerID cid.ID
|
|
var offset []byte
|
|
graveyardBkt := tx.Bucket(graveyardBucketName)
|
|
garbageBkt := tx.Bucket(garbageBucketName)
|
|
|
|
var rawAddr = make([]byte, cidSize, addressKeySize)
|
|
|
|
loop:
|
|
for ; name != nil; name, _ = c.Next() {
|
|
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
|
|
if cidRaw == nil {
|
|
continue
|
|
}
|
|
|
|
var objType object.Type
|
|
|
|
switch prefix {
|
|
case primaryPrefix:
|
|
objType = object.TypeRegular
|
|
case lockersPrefix:
|
|
objType = object.TypeLock
|
|
case tombstonePrefix:
|
|
objType = object.TypeTombstone
|
|
default:
|
|
continue
|
|
}
|
|
|
|
bkt := tx.Bucket(name)
|
|
if bkt != nil {
|
|
copy(rawAddr, cidRaw)
|
|
result, offset, cursor = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
|
|
result, count, cursor, threshold)
|
|
}
|
|
bucketName = name
|
|
if len(result) >= count {
|
|
break loop
|
|
}
|
|
|
|
// set threshold flag after first `selectNFromBucket` invocation
|
|
// first invocation must look for cursor object
|
|
threshold = true
|
|
}
|
|
|
|
if offset != nil {
|
|
// new slice is much faster but less memory efficient
|
|
// we need to copy, because offset exists during bbolt tx
|
|
cursor.inBucketOffset = make([]byte, len(offset))
|
|
copy(cursor.inBucketOffset, offset)
|
|
}
|
|
|
|
if len(result) == 0 {
|
|
return nil, nil, ErrEndOfListing
|
|
}
|
|
|
|
// new slice is much faster but less memory efficient
|
|
// we need to copy, because bucketName exists during bbolt tx
|
|
cursor.bucketName = make([]byte, len(bucketName))
|
|
copy(cursor.bucketName, bucketName)
|
|
|
|
return result, cursor, nil
|
|
}
|
|
|
|
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
|
|
// object to start selecting from. Ignores inhumed objects.
|
|
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
|
|
objType object.Type, // type of the objects stored in the main bucket
|
|
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
|
|
cidRaw []byte, // container ID prefix, optimization
|
|
cnt cid.ID, // container ID
|
|
to []objectcore.AddressWithType, // listing result
|
|
limit int, // stop listing at `limit` items in result
|
|
cursor *Cursor, // start from cursor object
|
|
threshold bool, // ignore cursor and start immediately
|
|
) ([]objectcore.AddressWithType, []byte, *Cursor) {
|
|
if cursor == nil {
|
|
cursor = new(Cursor)
|
|
}
|
|
|
|
count := len(to)
|
|
c := bkt.Cursor()
|
|
k, _ := c.First()
|
|
|
|
offset := cursor.inBucketOffset
|
|
|
|
if !threshold {
|
|
c.Seek(offset)
|
|
k, _ = c.Next() // we are looking for objects _after_ the cursor
|
|
}
|
|
|
|
for ; k != nil; k, _ = c.Next() {
|
|
if count >= limit {
|
|
break
|
|
}
|
|
|
|
var obj oid.ID
|
|
if err := obj.Decode(k); err != nil {
|
|
break
|
|
}
|
|
|
|
offset = k
|
|
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
|
continue
|
|
}
|
|
|
|
var a oid.Address
|
|
a.SetContainer(cnt)
|
|
a.SetObject(obj)
|
|
to = append(to, objectcore.AddressWithType{Address: a, Type: objType})
|
|
count++
|
|
}
|
|
|
|
return to, offset, cursor
|
|
}
|
|
|
|
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
|
|
if len(name) < bucketKeySize {
|
|
return nil, 0
|
|
}
|
|
|
|
rawID := name[1:bucketKeySize]
|
|
|
|
if err := containerID.Decode(rawID); err != nil {
|
|
return nil, 0
|
|
}
|
|
|
|
return rawID, name[0]
|
|
}
|