metabase: Hide BucketName
form upper levels #1423
4 changed files with 143 additions and 62 deletions
|
@ -435,7 +435,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
|
|||
) error {
|
||||
sh := shardsToEvacuate[shardID]
|
||||
var cntPrm shard.IterateOverContainersPrm
|
||||
cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error {
|
||||
cntPrm.Handler = func(ctx context.Context, objType objectSDK.Type, cnt cid.ID) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
|
@ -455,8 +455,11 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
|
|||
skip = e.isNotRepOne(c)
|
||||
}
|
||||
if skip {
|
||||
countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name}
|
||||
count, err := sh.CountAliveObjectsInBucket(ctx, countPrm)
|
||||
countPrm := shard.CountAliveObjectsInContainerPrm{
|
||||
ObjectType: objType,
|
||||
ContainerID: cnt,
|
||||
}
|
||||
count, err := sh.CountAliveObjectsInContainer(ctx, countPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -464,7 +467,8 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
|
|||
return nil
|
||||
}
|
||||
var objPrm shard.IterateOverObjectsInContainerPrm
|
||||
objPrm.BucketName = name
|
||||
objPrm.ObjectType = objType
|
||||
objPrm.ContainerID = cnt
|
||||
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -65,21 +65,25 @@ func (l ListRes) Cursor() *Cursor {
|
|||
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||
type IterateOverContainersPrm struct {
|
||||
// Handler function executed upon containers in db.
|
||||
Handler func(context.Context, []byte, cid.ID) error
|
||||
Handler func(context.Context, objectSDK.Type, cid.ID) error
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||
type IterateOverObjectsInContainerPrm struct {
|
||||
// BucketName container's bucket name.
|
||||
BucketName []byte
|
||||
// ObjectType type of objects to iterate over.
|
||||
ObjectType objectSDK.Type
|
||||
// ContainerID container for objects to iterate over.
|
||||
ContainerID cid.ID
|
||||
// Handler function executed upon objects in db.
|
||||
Handler func(context.Context, *objectcore.Info) error
|
||||
}
|
||||
|
||||
// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||
type CountAliveObjectsInBucketPrm struct {
|
||||
// BucketName container's bucket name.
|
||||
BucketName []byte
|
||||
// CountAliveObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||
type CountAliveObjectsInContainerPrm struct {
|
||||
// ObjectType type of objects to iterate over.
|
||||
ObjectType objectSDK.Type
|
||||
// ContainerID container for objects to iterate over.
|
||||
ContainerID cid.ID
|
||||
}
|
||||
|
||||
// ListWithCursor lists physical objects available in metabase starting from
|
||||
|
@ -319,12 +323,20 @@ func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm Itera
|
|||
if cidRaw == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
bktName := make([]byte, len(name))
|
||||
copy(bktName, name)
|
||||
var cnt cid.ID
|
||||
copy(cnt[:], containerID[:])
|
||||
err := prm.Handler(ctx, bktName, cnt)
|
||||
var objType objectSDK.Type
|
||||
switch prefix[0] {
|
||||
case primaryPrefix:
|
||||
objType = objectSDK.TypeRegular
|
||||
case lockersPrefix:
|
||||
objType = objectSDK.TypeLock
|
||||
case tombstonePrefix:
|
||||
objType = objectSDK.TypeTombstone
|
||||
default:
|
||||
continue
|
||||
}
|
||||
err := prm.Handler(ctx, objType, cnt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -356,22 +368,29 @@ func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOver
|
|||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
var containerID cid.ID
|
||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
|
||||
if cidRaw == nil {
|
||||
return nil
|
||||
}
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
|
||||
return db.iterateOverObjectsInContainer(ctx, tx, prm)
|
||||
})
|
||||
success = err == nil
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
|
||||
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
|
||||
) error {
|
||||
bkt := tx.Bucket(prm.BucketName)
|
||||
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error {
|
||||
var prefix byte
|
||||
switch prm.ObjectType {
|
||||
case objectSDK.TypeRegular:
|
||||
prefix = primaryPrefix
|
||||
case objectSDK.TypeLock:
|
||||
prefix = lockersPrefix
|
||||
case objectSDK.TypeTombstone:
|
||||
prefix = tombstonePrefix
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
bucketName := []byte{prefix}
|
||||
bucketName = append(bucketName, prm.ContainerID[:]...)
|
||||
|
||||
bkt := tx.Bucket(bucketName)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -380,32 +399,19 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
|
|||
c := bkt.Cursor()
|
||||
k, v := c.First()
|
||||
|
||||
var objType objectSDK.Type
|
||||
|
||||
switch prefix {
|
||||
case primaryPrefix:
|
||||
objType = objectSDK.TypeRegular
|
||||
case lockersPrefix:
|
||||
objType = objectSDK.TypeLock
|
||||
case tombstonePrefix:
|
||||
objType = objectSDK.TypeTombstone
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(k); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||
if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var isLinkingObj bool
|
||||
var ecInfo *objectcore.ECInfo
|
||||
if objType == objectSDK.TypeRegular {
|
||||
if prm.ObjectType == objectSDK.TypeRegular {
|
||||
var o objectSDK.Object
|
||||
if err := o.Unmarshal(v); err != nil {
|
||||
return err
|
||||
|
@ -422,9 +428,9 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
|
|||
}
|
||||
|
||||
var a oid.Address
|
||||
a.SetContainer(containerID)
|
||||
a.SetContainer(prm.ContainerID)
|
||||
a.SetObject(obj)
|
||||
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
|
||||
objInfo := objectcore.Info{Address: a, Type: prm.ObjectType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
|
||||
err := prm.Handler(ctx, &objInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -433,8 +439,8 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
|
|||
return nil
|
||||
}
|
||||
|
||||
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
|
||||
func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
|
||||
// CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage.
|
||||
func (db *DB) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
|
@ -452,14 +458,22 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
|
|||
return 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
if len(prm.BucketName) != bucketKeySize {
|
||||
var prefix byte
|
||||
switch prm.ObjectType {
|
||||
case objectSDK.TypeRegular:
|
||||
prefix = primaryPrefix
|
||||
case objectSDK.TypeLock:
|
||||
prefix = lockersPrefix
|
||||
case objectSDK.TypeTombstone:
|
||||
prefix = tombstonePrefix
|
||||
default:
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
cidRaw := prm.BucketName[1:bucketKeySize]
|
||||
bucketName := []byte{prefix}
|
||||
bucketName = append(bucketName, prm.ContainerID[:]...)
|
||||
var count uint64
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(prm.BucketName)
|
||||
bkt := tx.Bucket(bucketName)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -468,7 +482,7 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
|
|||
c := bkt.Cursor()
|
||||
k, _ := c.First()
|
||||
for ; k != nil; k, _ = c.Next() {
|
||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||
if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 {
|
||||
continue
|
||||
}
|
||||
count++
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
|
@ -219,3 +220,59 @@ func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]objec
|
|||
r, err := db.ListWithCursor(context.Background(), listPrm)
|
||||
return r.AddressList(), r.Cursor(), err
|
||||
}
|
||||
|
||||
func TestIterateOver(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
const total uint64 = 5
|
||||
for _, typ := range []objectSDK.Type{objectSDK.TypeRegular, objectSDK.TypeTombstone, objectSDK.TypeLock} {
|
||||
var expected []*objectSDK.Object
|
||||
// fill metabase with objects
|
||||
cid := cidtest.ID()
|
||||
for range total {
|
||||
obj := testutil.GenerateObjectWithCID(cid)
|
||||
obj.SetType(typ)
|
||||
err := metaPut(db, obj, nil)
|
||||
require.NoError(t, err)
|
||||
expected = append(expected, obj)
|
||||
}
|
||||
|
||||
var metaIter meta.IterateOverObjectsInContainerPrm
|
||||
var count uint64
|
||||
metaIter.Handler = func(context.Context, *object.Info) error {
|
||||
count++
|
||||
return nil
|
||||
}
|
||||
metaIter.ContainerID = cid
|
||||
metaIter.ObjectType = typ
|
||||
err := db.IterateOverObjectsInContainer(context.Background(), metaIter)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, total, count)
|
||||
|
||||
var metaCount meta.CountAliveObjectsInContainerPrm
|
||||
metaCount.ContainerID = cid
|
||||
metaCount.ObjectType = typ
|
||||
res, err := db.CountAliveObjectsInContainer(context.Background(), metaCount)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, res, total)
|
||||
|
||||
err = metaDelete(db, object.AddressOf(expected[0]), object.AddressOf(expected[1]))
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err = db.CountAliveObjectsInContainer(context.Background(), metaCount)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(3), res)
|
||||
}
|
||||
var count int
|
||||
var metaPrm meta.IterateOverContainersPrm
|
||||
metaPrm.Handler = func(context.Context, objectSDK.Type, cidSDK.ID) error {
|
||||
count++
|
||||
return nil
|
||||
}
|
||||
err := db.IterateOverContainers(context.Background(), metaPrm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, count)
|
||||
}
|
||||
|
|
|
@ -37,21 +37,25 @@ func (r ListContainersRes) Containers() []cid.ID {
|
|||
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||
type IterateOverContainersPrm struct {
|
||||
// Handler function executed upon containers in db.
|
||||
Handler func(context.Context, []byte, cid.ID) error
|
||||
Handler func(context.Context, objectSDK.Type, cid.ID) error
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||
type IterateOverObjectsInContainerPrm struct {
|
||||
// BucketName container's bucket name.
|
||||
BucketName []byte
|
||||
// ObjectType type of objects to iterate over.
|
||||
ObjectType objectSDK.Type
|
||||
// ContainerID container for objects to iterate over.
|
||||
ContainerID cid.ID
|
||||
// Handler function executed upon objects in db.
|
||||
Handler func(context.Context, *objectcore.Info) error
|
||||
}
|
||||
|
||||
// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation.
|
||||
type CountAliveObjectsInBucketPrm struct {
|
||||
// BucketName container's bucket name.
|
||||
BucketName []byte
|
||||
// CountAliveObjectsInContainerPrm contains parameters for CountAliveObjectsInContainer operation.
|
||||
type CountAliveObjectsInContainerPrm struct {
|
||||
// ObjectType type of objects to iterate over.
|
||||
ObjectType objectSDK.Type
|
||||
// ContainerID container for objects to iterate over.
|
||||
ContainerID cid.ID
|
||||
}
|
||||
|
||||
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
||||
|
@ -226,7 +230,8 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
|
|||
}
|
||||
|
||||
var metaPrm meta.IterateOverObjectsInContainerPrm
|
||||
metaPrm.BucketName = prm.BucketName
|
||||
metaPrm.ContainerID = prm.ContainerID
|
||||
metaPrm.ObjectType = prm.ObjectType
|
||||
metaPrm.Handler = prm.Handler
|
||||
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
||||
if err != nil {
|
||||
|
@ -236,8 +241,8 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
|
|||
return nil
|
||||
}
|
||||
|
||||
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
|
||||
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
|
||||
// CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage.
|
||||
func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
|
||||
defer span.End()
|
||||
|
||||
|
@ -248,9 +253,10 @@ func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObj
|
|||
return 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
var metaPrm meta.CountAliveObjectsInBucketPrm
|
||||
metaPrm.BucketName = prm.BucketName
|
||||
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm)
|
||||
var metaPrm meta.CountAliveObjectsInContainerPrm
|
||||
metaPrm.ObjectType = prm.ObjectType
|
||||
metaPrm.ContainerID = prm.ContainerID
|
||||
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue