metabase: Hide BucketName form upper levels #1423

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/count-in-bucket into master 2024-10-09 06:49:16 +00:00
4 changed files with 143 additions and 62 deletions

View file

@ -435,7 +435,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
) error { ) error {
sh := shardsToEvacuate[shardID] sh := shardsToEvacuate[shardID]
var cntPrm shard.IterateOverContainersPrm var cntPrm shard.IterateOverContainersPrm
cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error { cntPrm.Handler = func(ctx context.Context, objType objectSDK.Type, cnt cid.ID) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return context.Cause(ctx) return context.Cause(ctx)
@ -455,8 +455,11 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
skip = e.isNotRepOne(c) skip = e.isNotRepOne(c)
} }
if skip { if skip {
countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name} countPrm := shard.CountAliveObjectsInContainerPrm{
count, err := sh.CountAliveObjectsInBucket(ctx, countPrm) ObjectType: objType,
ContainerID: cnt,
}
count, err := sh.CountAliveObjectsInContainer(ctx, countPrm)
if err != nil { if err != nil {
return err return err
} }
@ -464,7 +467,8 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
return nil return nil
} }
var objPrm shard.IterateOverObjectsInContainerPrm var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.BucketName = name objPrm.ObjectType = objType
objPrm.ContainerID = cnt
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View file

@ -65,21 +65,25 @@ func (l ListRes) Cursor() *Cursor {
// IterateOverContainersPrm contains parameters for IterateOverContainers operation. // IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct { type IterateOverContainersPrm struct {
// Handler function executed upon containers in db. // Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error Handler func(context.Context, objectSDK.Type, cid.ID) error
} }
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. // IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct { type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name. // ObjectType type of objects to iterate over.
BucketName []byte ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
// Handler function executed upon objects in db. // Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error Handler func(context.Context, *objectcore.Info) error
} }
// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation. // CountAliveObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type CountAliveObjectsInBucketPrm struct { type CountAliveObjectsInContainerPrm struct {
// BucketName container's bucket name. // ObjectType type of objects to iterate over.
BucketName []byte ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
} }
// ListWithCursor lists physical objects available in metabase starting from // ListWithCursor lists physical objects available in metabase starting from
@ -319,12 +323,20 @@ func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm Itera
if cidRaw == nil { if cidRaw == nil {
continue continue
} }
bktName := make([]byte, len(name))
copy(bktName, name)
var cnt cid.ID var cnt cid.ID
copy(cnt[:], containerID[:]) copy(cnt[:], containerID[:])
err := prm.Handler(ctx, bktName, cnt) var objType objectSDK.Type
switch prefix[0] {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
continue
}
err := prm.Handler(ctx, objType, cnt)
if err != nil { if err != nil {
return err return err
} }
@ -356,22 +368,29 @@ func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOver
return ErrDegradedMode return ErrDegradedMode
} }
var containerID cid.ID
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
if cidRaw == nil {
return nil
}
err := db.boltDB.View(func(tx *bbolt.Tx) error { err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm) return db.iterateOverObjectsInContainer(ctx, tx, prm)
}) })
success = err == nil success = err == nil
return metaerr.Wrap(err) return metaerr.Wrap(err)
} }
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte, func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error {
containerID cid.ID, prm IterateOverObjectsInContainerPrm, var prefix byte
) error { switch prm.ObjectType {
bkt := tx.Bucket(prm.BucketName) case objectSDK.TypeRegular:
prefix = primaryPrefix
case objectSDK.TypeLock:
prefix = lockersPrefix
case objectSDK.TypeTombstone:
prefix = tombstonePrefix
default:
return nil
}
bucketName := []byte{prefix}
bucketName = append(bucketName, prm.ContainerID[:]...)
bkt := tx.Bucket(bucketName)
if bkt == nil { if bkt == nil {
return nil return nil
} }
@ -380,32 +399,19 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
c := bkt.Cursor() c := bkt.Cursor()
k, v := c.First() k, v := c.First()
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
return nil
}
for ; k != nil; k, v = c.Next() { for ; k != nil; k, v = c.Next() {
var obj oid.ID var obj oid.ID
if err := obj.Decode(k); err != nil { if err := obj.Decode(k); err != nil {
break break
} }
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 {
continue continue
} }
var isLinkingObj bool var isLinkingObj bool
var ecInfo *objectcore.ECInfo var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular { if prm.ObjectType == objectSDK.TypeRegular {
var o objectSDK.Object var o objectSDK.Object
if err := o.Unmarshal(v); err != nil { if err := o.Unmarshal(v); err != nil {
return err return err
@ -422,9 +428,9 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
} }
var a oid.Address var a oid.Address
a.SetContainer(containerID) a.SetContainer(prm.ContainerID)
a.SetObject(obj) a.SetObject(obj)
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo} objInfo := objectcore.Info{Address: a, Type: prm.ObjectType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
err := prm.Handler(ctx, &objInfo) err := prm.Handler(ctx, &objInfo)
if err != nil { if err != nil {
return err return err
@ -433,8 +439,8 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
return nil return nil
} }
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage. // CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage.
func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) { func (db *DB) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
var ( var (
startedAt = time.Now() startedAt = time.Now()
success = false success = false
@ -452,14 +458,22 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
return 0, ErrDegradedMode return 0, ErrDegradedMode
} }
if len(prm.BucketName) != bucketKeySize { var prefix byte
switch prm.ObjectType {
case objectSDK.TypeRegular:
prefix = primaryPrefix
case objectSDK.TypeLock:
prefix = lockersPrefix
case objectSDK.TypeTombstone:
prefix = tombstonePrefix
default:
return 0, nil return 0, nil
} }
bucketName := []byte{prefix}
cidRaw := prm.BucketName[1:bucketKeySize] bucketName = append(bucketName, prm.ContainerID[:]...)
var count uint64 var count uint64
err := db.boltDB.View(func(tx *bbolt.Tx) error { err := db.boltDB.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(prm.BucketName) bkt := tx.Bucket(bucketName)
if bkt == nil { if bkt == nil {
return nil return nil
} }
@ -468,7 +482,7 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
c := bkt.Cursor() c := bkt.Cursor()
k, _ := c.First() k, _ := c.First()
for ; k != nil; k, _ = c.Next() { for ; k != nil; k, _ = c.Next() {
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 {
continue continue
} }
count++ count++

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -219,3 +220,59 @@ func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]objec
r, err := db.ListWithCursor(context.Background(), listPrm) r, err := db.ListWithCursor(context.Background(), listPrm)
return r.AddressList(), r.Cursor(), err return r.AddressList(), r.Cursor(), err
} }
func TestIterateOver(t *testing.T) {
t.Parallel()
db := newDB(t)
defer func() { require.NoError(t, db.Close()) }()
const total uint64 = 5
for _, typ := range []objectSDK.Type{objectSDK.TypeRegular, objectSDK.TypeTombstone, objectSDK.TypeLock} {
var expected []*objectSDK.Object
// fill metabase with objects
cid := cidtest.ID()
for range total {
obj := testutil.GenerateObjectWithCID(cid)
obj.SetType(typ)
err := metaPut(db, obj, nil)
require.NoError(t, err)
expected = append(expected, obj)
}
var metaIter meta.IterateOverObjectsInContainerPrm
var count uint64
metaIter.Handler = func(context.Context, *object.Info) error {
count++
return nil
}
metaIter.ContainerID = cid
metaIter.ObjectType = typ
err := db.IterateOverObjectsInContainer(context.Background(), metaIter)
require.NoError(t, err)
require.Equal(t, total, count)
var metaCount meta.CountAliveObjectsInContainerPrm
metaCount.ContainerID = cid
metaCount.ObjectType = typ
res, err := db.CountAliveObjectsInContainer(context.Background(), metaCount)
require.NoError(t, err)
require.Equal(t, res, total)
err = metaDelete(db, object.AddressOf(expected[0]), object.AddressOf(expected[1]))
require.NoError(t, err)
res, err = db.CountAliveObjectsInContainer(context.Background(), metaCount)
require.NoError(t, err)
require.Equal(t, uint64(3), res)
}
var count int
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = func(context.Context, objectSDK.Type, cidSDK.ID) error {
count++
return nil
}
err := db.IterateOverContainers(context.Background(), metaPrm)
require.NoError(t, err)
require.Equal(t, 3, count)
}

View file

@ -37,21 +37,25 @@ func (r ListContainersRes) Containers() []cid.ID {
// IterateOverContainersPrm contains parameters for IterateOverContainers operation. // IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct { type IterateOverContainersPrm struct {
// Handler function executed upon containers in db. // Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error Handler func(context.Context, objectSDK.Type, cid.ID) error
} }
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. // IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct { type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name. // ObjectType type of objects to iterate over.
BucketName []byte ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
// Handler function executed upon objects in db. // Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error Handler func(context.Context, *objectcore.Info) error
} }
// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation. // CountAliveObjectsInContainerPrm contains parameters for CountAliveObjectsInContainer operation.
type CountAliveObjectsInBucketPrm struct { type CountAliveObjectsInContainerPrm struct {
// BucketName container's bucket name. // ObjectType type of objects to iterate over.
BucketName []byte ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
} }
// ListWithCursorPrm contains parameters for ListWithCursor operation. // ListWithCursorPrm contains parameters for ListWithCursor operation.
@ -226,7 +230,8 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
} }
var metaPrm meta.IterateOverObjectsInContainerPrm var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.BucketName = prm.BucketName metaPrm.ContainerID = prm.ContainerID
metaPrm.ObjectType = prm.ObjectType
metaPrm.Handler = prm.Handler metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm) err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil { if err != nil {
@ -236,8 +241,8 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return nil return nil
} }
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage. // CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage.
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) { func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
_, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket") _, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
defer span.End() defer span.End()
@ -248,9 +253,10 @@ func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObj
return 0, ErrDegradedMode return 0, ErrDegradedMode
} }
var metaPrm meta.CountAliveObjectsInBucketPrm var metaPrm meta.CountAliveObjectsInContainerPrm
metaPrm.BucketName = prm.BucketName metaPrm.ObjectType = prm.ObjectType
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm) metaPrm.ContainerID = prm.ContainerID
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
if err != nil { if err != nil {
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err) return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
} }