frostfs-node/pkg/local_object_storage/metabase/list.go
Anton Nikiforov d0ed29b3c7
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 50s
DCO action / DCO (pull_request) Successful in 1m41s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m53s
Vulncheck / Vulncheck (pull_request) Successful in 2m10s
Build / Build Components (pull_request) Successful in 2m43s
Tests and linters / gopls check (pull_request) Successful in 3m0s
Tests and linters / Staticcheck (pull_request) Successful in 3m5s
Tests and linters / Lint (pull_request) Successful in 3m52s
Tests and linters / Tests (pull_request) Successful in 4m40s
Tests and linters / Tests with -race (pull_request) Successful in 6m17s
[#1350] node: Add ability to evacuate objects from REP 1 only
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-09-27 15:41:17 +03:00

479 lines
13 KiB
Go

package meta
import (
"bytes"
"context"
"time"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// ErrEndOfListing is returned from object listing with cursor
// when storage can't return any more objects after provided
// cursor. Use nil cursor object to start listing again.
var ErrEndOfListing = logicerr.New("end of object listing")
// Cursor is a type for continuous object listing.
type Cursor struct {
bucketName []byte
inBucketOffset []byte
}
// ListPrm contains parameters for ListWithCursor operation.
type ListPrm struct {
count int
cursor *Cursor
}
// SetCount sets maximum amount of addresses that ListWithCursor should return.
func (l *ListPrm) SetCount(count uint32) {
l.count = int(count)
}
// SetCursor sets cursor for ListWithCursor operation. For initial request
// ignore this param or use nil value. For consecutive requests, use value
// from ListRes.
func (l *ListPrm) SetCursor(cursor *Cursor) {
l.cursor = cursor
}
// ListRes contains values returned from ListWithCursor operation.
type ListRes struct {
addrList []objectcore.Info
cursor *Cursor
}
// AddressList returns addresses selected by ListWithCursor operation.
func (l ListRes) AddressList() []objectcore.Info {
return l.addrList
}
// Cursor returns cursor for consecutive listing requests.
func (l ListRes) Cursor() *Cursor {
return l.cursor
}
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name.
BucketName []byte
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name.
BucketName []byte
}
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests.
//
// Returns ErrEndOfListing if there are no more objects to return or count
// parameter is set to zero.
func (db *DB) ListWithCursor(ctx context.Context, prm ListPrm) (res ListRes, err error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ListWithCursor", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.ListWithCursor",
trace.WithAttributes(
attribute.Int("count", prm.count),
attribute.Bool("has_cursor", prm.cursor != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return res, ErrDegradedMode
}
result := make([]objectcore.Info, 0, prm.count)
err = db.boltDB.View(func(tx *bbolt.Tx) error {
res.addrList, res.cursor, err = db.listWithCursor(tx, result, prm.count, prm.cursor)
return err
})
success = err == nil
return res, metaerr.Wrap(err)
}
func (db *DB) listWithCursor(tx *bbolt.Tx, result []objectcore.Info, count int, cursor *Cursor) ([]objectcore.Info, *Cursor, error) {
threshold := cursor == nil // threshold is a flag to ignore cursor
var bucketName []byte
var err error
c := tx.Cursor()
name, _ := c.First()
if !threshold {
name, _ = c.Seek(cursor.bucketName)
}
var containerID cid.ID
var offset []byte
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
rawAddr := make([]byte, cidSize, addressKeySize)
loop:
for ; name != nil; name, _ = c.Next() {
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
if cidRaw == nil {
continue
}
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
continue
}
bkt := tx.Bucket(name)
if bkt != nil {
copy(rawAddr, cidRaw)
result, offset, cursor, err = selectNFromBucket(bkt, objType, graveyardBkt, garbageBkt, rawAddr, containerID,
result, count, cursor, threshold)
if err != nil {
return nil, nil, err
}
}
bucketName = name
if len(result) >= count {
break loop
}
// set threshold flag after first `selectNFromBucket` invocation
// first invocation must look for cursor object
threshold = true
}
if offset != nil {
// new slice is much faster but less memory efficient
// we need to copy, because offset exists during bbolt tx
cursor.inBucketOffset = make([]byte, len(offset))
copy(cursor.inBucketOffset, offset)
}
if len(result) == 0 {
return nil, nil, ErrEndOfListing
}
// new slice is much faster but less memory efficient
// we need to copy, because bucketName exists during bbolt tx
cursor.bucketName = make([]byte, len(bucketName))
copy(cursor.bucketName, bucketName)
return result, cursor, nil
}
// selectNFromBucket similar to selectAllFromBucket but uses cursor to find
// object to start selecting from. Ignores inhumed objects.
func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
objType objectSDK.Type, // type of the objects stored in the main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization
cnt cid.ID, // container ID
to []objectcore.Info, // listing result
limit int, // stop listing at `limit` items in result
cursor *Cursor, // start from cursor object
threshold bool, // ignore cursor and start immediately
) ([]objectcore.Info, []byte, *Cursor, error) {
if cursor == nil {
cursor = new(Cursor)
}
count := len(to)
c := bkt.Cursor()
k, v := c.First()
offset := cursor.inBucketOffset
if !threshold {
c.Seek(offset)
k, v = c.Next() // we are looking for objects _after_ the cursor
}
for ; k != nil; k, v = c.Next() {
if count >= limit {
break
}
var obj oid.ID
if err := obj.Decode(k); err != nil {
break
}
offset = k
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return nil, nil, nil, err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {
ecInfo = &objectcore.ECInfo{
ParentID: ecHeader.Parent(),
Index: ecHeader.Index(),
Total: ecHeader.Total(),
}
}
}
var a oid.Address
a.SetContainer(cnt)
a.SetObject(obj)
to = append(to, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
count++
}
return to, offset, cursor, nil
}
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
if len(name) < bucketKeySize {
return nil, 0
}
rawID := name[1:bucketKeySize]
if err := containerID.Decode(rawID); err != nil {
return nil, 0
}
return rawID, name[0]
}
// IterateOverContainers lists physical containers available in metabase starting from first.
func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverContainers(ctx, tx, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error {
var containerID cid.ID
for _, prefix := range [][]byte{{byte(primaryPrefix)}, {byte(lockersPrefix)}, {byte(tombstonePrefix)}} {
c := tx.Cursor()
for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
cidRaw, _ := parseContainerIDWithPrefix(&containerID, name)
if cidRaw == nil {
continue
}
bktName := make([]byte, len(name))
copy(bktName, name)
var cnt cid.ID
copy(cnt[:], containerID[:])
err := prm.Handler(ctx, bktName, cnt)
if err != nil {
return err
}
}
}
return nil
}
// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first.
func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
var containerID cid.ID
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
if cidRaw == nil {
return nil
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
) error {
bkt := tx.Bucket(prm.BucketName)
if bkt == nil {
return nil
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, v := c.First()
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
return nil
}
for ; k != nil; k, v = c.Next() {
var obj oid.ID
if err := obj.Decode(k); err != nil {
break
}
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {
ecInfo = &objectcore.ECInfo{
ParentID: ecHeader.Parent(),
Index: ecHeader.Index(),
Total: ecHeader.Total(),
}
}
}
var a oid.Address
a.SetContainer(containerID)
a.SetObject(obj)
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
err := prm.Handler(ctx, &objInfo)
if err != nil {
return err
}
}
return nil
}
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("CountAliveObjectsInBucket", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.CountAliveObjectsInBucket")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return 0, ErrDegradedMode
}
cidRaw := prm.BucketName[1:bucketKeySize]
if cidRaw == nil {
return 0, nil
}
var count uint64
err := db.boltDB.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(prm.BucketName)
if bkt == nil {
return nil
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, _ := c.First()
for ; k != nil; k, _ = c.Next() {
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
count++
}
return nil
})
success = err == nil
return count, metaerr.Wrap(err)
}