metabase: Hide BucketName
form upper levels #1423
4 changed files with 143 additions and 62 deletions
|
@ -435,7 +435,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
|
||||||
) error {
|
) error {
|
||||||
sh := shardsToEvacuate[shardID]
|
sh := shardsToEvacuate[shardID]
|
||||||
var cntPrm shard.IterateOverContainersPrm
|
var cntPrm shard.IterateOverContainersPrm
|
||||||
cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error {
|
cntPrm.Handler = func(ctx context.Context, objType objectSDK.Type, cnt cid.ID) error {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return context.Cause(ctx)
|
return context.Cause(ctx)
|
||||||
|
@ -455,8 +455,11 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
|
||||||
skip = e.isNotRepOne(c)
|
skip = e.isNotRepOne(c)
|
||||||
}
|
}
|
||||||
if skip {
|
if skip {
|
||||||
countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name}
|
countPrm := shard.CountAliveObjectsInContainerPrm{
|
||||||
count, err := sh.CountAliveObjectsInBucket(ctx, countPrm)
|
ObjectType: objType,
|
||||||
|
ContainerID: cnt,
|
||||||
|
}
|
||||||
|
count, err := sh.CountAliveObjectsInContainer(ctx, countPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -464,7 +467,8 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var objPrm shard.IterateOverObjectsInContainerPrm
|
var objPrm shard.IterateOverObjectsInContainerPrm
|
||||||
objPrm.BucketName = name
|
objPrm.ObjectType = objType
|
||||||
|
objPrm.ContainerID = cnt
|
||||||
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
|
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -65,21 +65,25 @@ func (l ListRes) Cursor() *Cursor {
|
||||||
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||||
type IterateOverContainersPrm struct {
|
type IterateOverContainersPrm struct {
|
||||||
// Handler function executed upon containers in db.
|
// Handler function executed upon containers in db.
|
||||||
Handler func(context.Context, []byte, cid.ID) error
|
Handler func(context.Context, objectSDK.Type, cid.ID) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||||
type IterateOverObjectsInContainerPrm struct {
|
type IterateOverObjectsInContainerPrm struct {
|
||||||
// BucketName container's bucket name.
|
// ObjectType type of objects to iterate over.
|
||||||
BucketName []byte
|
ObjectType objectSDK.Type
|
||||||
|
// ContainerID container for objects to iterate over.
|
||||||
|
ContainerID cid.ID
|
||||||
// Handler function executed upon objects in db.
|
// Handler function executed upon objects in db.
|
||||||
Handler func(context.Context, *objectcore.Info) error
|
Handler func(context.Context, *objectcore.Info) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation.
|
// CountAliveObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||||
type CountAliveObjectsInBucketPrm struct {
|
type CountAliveObjectsInContainerPrm struct {
|
||||||
// BucketName container's bucket name.
|
// ObjectType type of objects to iterate over.
|
||||||
BucketName []byte
|
ObjectType objectSDK.Type
|
||||||
|
// ContainerID container for objects to iterate over.
|
||||||
|
ContainerID cid.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListWithCursor lists physical objects available in metabase starting from
|
// ListWithCursor lists physical objects available in metabase starting from
|
||||||
|
@ -319,12 +323,20 @@ func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm Itera
|
||||||
if cidRaw == nil {
|
if cidRaw == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
bktName := make([]byte, len(name))
|
|
||||||
copy(bktName, name)
|
|
||||||
var cnt cid.ID
|
var cnt cid.ID
|
||||||
copy(cnt[:], containerID[:])
|
copy(cnt[:], containerID[:])
|
||||||
err := prm.Handler(ctx, bktName, cnt)
|
var objType objectSDK.Type
|
||||||
|
switch prefix[0] {
|
||||||
|
case primaryPrefix:
|
||||||
|
objType = objectSDK.TypeRegular
|
||||||
|
case lockersPrefix:
|
||||||
|
objType = objectSDK.TypeLock
|
||||||
|
case tombstonePrefix:
|
||||||
|
objType = objectSDK.TypeTombstone
|
||||||
|
default:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err := prm.Handler(ctx, objType, cnt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -356,22 +368,29 @@ func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOver
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
var containerID cid.ID
|
|
||||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
|
|
||||||
if cidRaw == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
|
return db.iterateOverObjectsInContainer(ctx, tx, prm)
|
||||||
})
|
})
|
||||||
success = err == nil
|
success = err == nil
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
|
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error {
|
||||||
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
|
var prefix byte
|
||||||
) error {
|
switch prm.ObjectType {
|
||||||
bkt := tx.Bucket(prm.BucketName)
|
case objectSDK.TypeRegular:
|
||||||
|
prefix = primaryPrefix
|
||||||
|
case objectSDK.TypeLock:
|
||||||
|
prefix = lockersPrefix
|
||||||
|
case objectSDK.TypeTombstone:
|
||||||
|
prefix = tombstonePrefix
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
bucketName := []byte{prefix}
|
||||||
|
bucketName = append(bucketName, prm.ContainerID[:]...)
|
||||||
|
|
||||||
|
bkt := tx.Bucket(bucketName)
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -380,32 +399,19 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
|
||||||
c := bkt.Cursor()
|
c := bkt.Cursor()
|
||||||
k, v := c.First()
|
k, v := c.First()
|
||||||
|
|
||||||
var objType objectSDK.Type
|
|
||||||
|
|
||||||
switch prefix {
|
|
||||||
case primaryPrefix:
|
|
||||||
objType = objectSDK.TypeRegular
|
|
||||||
case lockersPrefix:
|
|
||||||
objType = objectSDK.TypeLock
|
|
||||||
case tombstonePrefix:
|
|
||||||
objType = objectSDK.TypeTombstone
|
|
||||||
default:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for ; k != nil; k, v = c.Next() {
|
for ; k != nil; k, v = c.Next() {
|
||||||
var obj oid.ID
|
var obj oid.ID
|
||||||
if err := obj.Decode(k); err != nil {
|
if err := obj.Decode(k); err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var isLinkingObj bool
|
var isLinkingObj bool
|
||||||
var ecInfo *objectcore.ECInfo
|
var ecInfo *objectcore.ECInfo
|
||||||
if objType == objectSDK.TypeRegular {
|
if prm.ObjectType == objectSDK.TypeRegular {
|
||||||
var o objectSDK.Object
|
var o objectSDK.Object
|
||||||
if err := o.Unmarshal(v); err != nil {
|
if err := o.Unmarshal(v); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -422,9 +428,9 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
|
||||||
}
|
}
|
||||||
|
|
||||||
var a oid.Address
|
var a oid.Address
|
||||||
a.SetContainer(containerID)
|
a.SetContainer(prm.ContainerID)
|
||||||
a.SetObject(obj)
|
a.SetObject(obj)
|
||||||
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
|
objInfo := objectcore.Info{Address: a, Type: prm.ObjectType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
|
||||||
err := prm.Handler(ctx, &objInfo)
|
err := prm.Handler(ctx, &objInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -433,8 +439,8 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
|
// CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage.
|
||||||
func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
|
func (db *DB) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
|
||||||
var (
|
var (
|
||||||
startedAt = time.Now()
|
startedAt = time.Now()
|
||||||
success = false
|
success = false
|
||||||
|
@ -452,14 +458,22 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
|
||||||
return 0, ErrDegradedMode
|
return 0, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(prm.BucketName) != bucketKeySize {
|
var prefix byte
|
||||||
|
switch prm.ObjectType {
|
||||||
|
case objectSDK.TypeRegular:
|
||||||
|
prefix = primaryPrefix
|
||||||
|
case objectSDK.TypeLock:
|
||||||
|
prefix = lockersPrefix
|
||||||
|
case objectSDK.TypeTombstone:
|
||||||
|
prefix = tombstonePrefix
|
||||||
|
default:
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
bucketName := []byte{prefix}
|
||||||
cidRaw := prm.BucketName[1:bucketKeySize]
|
bucketName = append(bucketName, prm.ContainerID[:]...)
|
||||||
var count uint64
|
var count uint64
|
||||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
bkt := tx.Bucket(prm.BucketName)
|
bkt := tx.Bucket(bucketName)
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -468,7 +482,7 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
|
||||||
c := bkt.Cursor()
|
c := bkt.Cursor()
|
||||||
k, _ := c.First()
|
k, _ := c.First()
|
||||||
for ; k != nil; k, _ = c.Next() {
|
for ; k != nil; k, _ = c.Next() {
|
||||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
count++
|
count++
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
@ -219,3 +220,59 @@ func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]objec
|
||||||
r, err := db.ListWithCursor(context.Background(), listPrm)
|
r, err := db.ListWithCursor(context.Background(), listPrm)
|
||||||
return r.AddressList(), r.Cursor(), err
|
return r.AddressList(), r.Cursor(), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIterateOver(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
db := newDB(t)
|
||||||
|
defer func() { require.NoError(t, db.Close()) }()
|
||||||
|
|
||||||
|
const total uint64 = 5
|
||||||
|
for _, typ := range []objectSDK.Type{objectSDK.TypeRegular, objectSDK.TypeTombstone, objectSDK.TypeLock} {
|
||||||
|
var expected []*objectSDK.Object
|
||||||
|
// fill metabase with objects
|
||||||
|
cid := cidtest.ID()
|
||||||
|
for range total {
|
||||||
|
obj := testutil.GenerateObjectWithCID(cid)
|
||||||
|
obj.SetType(typ)
|
||||||
|
err := metaPut(db, obj, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
expected = append(expected, obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
var metaIter meta.IterateOverObjectsInContainerPrm
|
||||||
|
var count uint64
|
||||||
|
metaIter.Handler = func(context.Context, *object.Info) error {
|
||||||
|
count++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
metaIter.ContainerID = cid
|
||||||
|
metaIter.ObjectType = typ
|
||||||
|
err := db.IterateOverObjectsInContainer(context.Background(), metaIter)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, total, count)
|
||||||
|
|
||||||
|
var metaCount meta.CountAliveObjectsInContainerPrm
|
||||||
|
metaCount.ContainerID = cid
|
||||||
|
metaCount.ObjectType = typ
|
||||||
|
res, err := db.CountAliveObjectsInContainer(context.Background(), metaCount)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, res, total)
|
||||||
|
|
||||||
|
err = metaDelete(db, object.AddressOf(expected[0]), object.AddressOf(expected[1]))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
res, err = db.CountAliveObjectsInContainer(context.Background(), metaCount)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, uint64(3), res)
|
||||||
|
}
|
||||||
|
var count int
|
||||||
|
var metaPrm meta.IterateOverContainersPrm
|
||||||
|
metaPrm.Handler = func(context.Context, objectSDK.Type, cidSDK.ID) error {
|
||||||
|
count++
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := db.IterateOverContainers(context.Background(), metaPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 3, count)
|
||||||
|
}
|
||||||
|
|
|
@ -37,21 +37,25 @@ func (r ListContainersRes) Containers() []cid.ID {
|
||||||
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||||
type IterateOverContainersPrm struct {
|
type IterateOverContainersPrm struct {
|
||||||
// Handler function executed upon containers in db.
|
// Handler function executed upon containers in db.
|
||||||
Handler func(context.Context, []byte, cid.ID) error
|
Handler func(context.Context, objectSDK.Type, cid.ID) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||||
type IterateOverObjectsInContainerPrm struct {
|
type IterateOverObjectsInContainerPrm struct {
|
||||||
// BucketName container's bucket name.
|
// ObjectType type of objects to iterate over.
|
||||||
BucketName []byte
|
ObjectType objectSDK.Type
|
||||||
|
// ContainerID container for objects to iterate over.
|
||||||
|
ContainerID cid.ID
|
||||||
// Handler function executed upon objects in db.
|
// Handler function executed upon objects in db.
|
||||||
Handler func(context.Context, *objectcore.Info) error
|
Handler func(context.Context, *objectcore.Info) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation.
|
// CountAliveObjectsInContainerPrm contains parameters for CountAliveObjectsInContainer operation.
|
||||||
type CountAliveObjectsInBucketPrm struct {
|
type CountAliveObjectsInContainerPrm struct {
|
||||||
// BucketName container's bucket name.
|
// ObjectType type of objects to iterate over.
|
||||||
BucketName []byte
|
ObjectType objectSDK.Type
|
||||||
|
// ContainerID container for objects to iterate over.
|
||||||
|
ContainerID cid.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
||||||
|
@ -226,7 +230,8 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
|
||||||
}
|
}
|
||||||
|
|
||||||
var metaPrm meta.IterateOverObjectsInContainerPrm
|
var metaPrm meta.IterateOverObjectsInContainerPrm
|
||||||
metaPrm.BucketName = prm.BucketName
|
metaPrm.ContainerID = prm.ContainerID
|
||||||
|
metaPrm.ObjectType = prm.ObjectType
|
||||||
metaPrm.Handler = prm.Handler
|
metaPrm.Handler = prm.Handler
|
||||||
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -236,8 +241,8 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
|
// CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage.
|
||||||
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
|
func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
|
||||||
_, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
|
_, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
@ -248,9 +253,10 @@ func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObj
|
||||||
return 0, ErrDegradedMode
|
return 0, ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
var metaPrm meta.CountAliveObjectsInBucketPrm
|
var metaPrm meta.CountAliveObjectsInContainerPrm
|
||||||
metaPrm.BucketName = prm.BucketName
|
metaPrm.ObjectType = prm.ObjectType
|
||||||
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm)
|
metaPrm.ContainerID = prm.ContainerID
|
||||||
|
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
|
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue