[#1423] metabase: Hide BucketName form upper levels
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 2m11s
DCO action / DCO (pull_request) Successful in 2m28s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m17s
Vulncheck / Vulncheck (pull_request) Successful in 4m25s
Tests and linters / Lint (pull_request) Successful in 4m37s
Tests and linters / gopls check (pull_request) Successful in 4m34s
Tests and linters / Tests with -race (pull_request) Successful in 4m37s
Build / Build Components (pull_request) Successful in 4m49s
Tests and linters / Staticcheck (pull_request) Successful in 6m1s
Tests and linters / Tests (pull_request) Successful in 7m12s

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-10-08 18:39:52 +03:00
parent c065d55ca3
commit 936ebbb8e5
4 changed files with 143 additions and 62 deletions

View file

@ -435,7 +435,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
) error {
sh := shardsToEvacuate[shardID]
var cntPrm shard.IterateOverContainersPrm
cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error {
cntPrm.Handler = func(ctx context.Context, objType objectSDK.Type, cnt cid.ID) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
@ -455,8 +455,11 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
skip = e.isNotRepOne(c)
}
if skip {
countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name}
count, err := sh.CountAliveObjectsInBucket(ctx, countPrm)
countPrm := shard.CountAliveObjectsInContainerPrm{
ObjectType: objType,
ContainerID: cnt,
}
count, err := sh.CountAliveObjectsInContainer(ctx, countPrm)
if err != nil {
return err
}
@ -464,7 +467,8 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
return nil
}
var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.BucketName = name
objPrm.ObjectType = objType
objPrm.ContainerID = cnt
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
select {
case <-ctx.Done():

View file

@ -65,21 +65,25 @@ func (l ListRes) Cursor() *Cursor {
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
Handler func(context.Context, objectSDK.Type, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name.
BucketName []byte
// ObjectType type of objects to iterate over.
ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name.
BucketName []byte
// CountAliveObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type CountAliveObjectsInContainerPrm struct {
// ObjectType type of objects to iterate over.
ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
}
// ListWithCursor lists physical objects available in metabase starting from
@ -319,12 +323,20 @@ func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm Itera
if cidRaw == nil {
continue
}
bktName := make([]byte, len(name))
copy(bktName, name)
var cnt cid.ID
copy(cnt[:], containerID[:])
err := prm.Handler(ctx, bktName, cnt)
var objType objectSDK.Type
switch prefix[0] {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
continue
}
err := prm.Handler(ctx, objType, cnt)
if err != nil {
return err
}
@ -356,22 +368,29 @@ func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOver
return ErrDegradedMode
}
var containerID cid.ID
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
if cidRaw == nil {
return nil
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
return db.iterateOverObjectsInContainer(ctx, tx, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
) error {
bkt := tx.Bucket(prm.BucketName)
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error {
var prefix byte
switch prm.ObjectType {
case objectSDK.TypeRegular:
prefix = primaryPrefix
case objectSDK.TypeLock:
prefix = lockersPrefix
case objectSDK.TypeTombstone:
prefix = tombstonePrefix
default:
return nil
}
bucketName := []byte{prefix}
bucketName = append(bucketName, prm.ContainerID[:]...)
bkt := tx.Bucket(bucketName)
if bkt == nil {
return nil
}
@ -380,32 +399,19 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
c := bkt.Cursor()
k, v := c.First()
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
return nil
}
for ; k != nil; k, v = c.Next() {
var obj oid.ID
if err := obj.Decode(k); err != nil {
break
}
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
if prm.ObjectType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return err
@ -422,9 +428,9 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
}
var a oid.Address
a.SetContainer(containerID)
a.SetContainer(prm.ContainerID)
a.SetObject(obj)
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
objInfo := objectcore.Info{Address: a, Type: prm.ObjectType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
err := prm.Handler(ctx, &objInfo)
if err != nil {
return err
@ -433,8 +439,8 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
return nil
}
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
// CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage.
func (db *DB) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
var (
startedAt = time.Now()
success = false
@ -452,14 +458,22 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
return 0, ErrDegradedMode
}
if len(prm.BucketName) != bucketKeySize {
var prefix byte
switch prm.ObjectType {
case objectSDK.TypeRegular:
prefix = primaryPrefix
case objectSDK.TypeLock:
prefix = lockersPrefix
case objectSDK.TypeTombstone:
prefix = tombstonePrefix
default:
return 0, nil
}
cidRaw := prm.BucketName[1:bucketKeySize]
bucketName := []byte{prefix}
bucketName = append(bucketName, prm.ContainerID[:]...)
var count uint64
err := db.boltDB.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(prm.BucketName)
bkt := tx.Bucket(bucketName)
if bkt == nil {
return nil
}
@ -468,7 +482,7 @@ func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjec
c := bkt.Cursor()
k, _ := c.First()
for ; k != nil; k, _ = c.Next() {
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
if inGraveyardWithKey(append(prm.ContainerID[:], k...), graveyardBkt, garbageBkt) > 0 {
continue
}
count++

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
@ -219,3 +220,59 @@ func metaListWithCursor(db *meta.DB, count uint32, cursor *meta.Cursor) ([]objec
r, err := db.ListWithCursor(context.Background(), listPrm)
return r.AddressList(), r.Cursor(), err
}
func TestIterateOver(t *testing.T) {
t.Parallel()
db := newDB(t)
defer func() { require.NoError(t, db.Close()) }()
const total uint64 = 5
for _, typ := range []objectSDK.Type{objectSDK.TypeRegular, objectSDK.TypeTombstone, objectSDK.TypeLock} {
var expected []*objectSDK.Object
// fill metabase with objects
cid := cidtest.ID()
for range total {
obj := testutil.GenerateObjectWithCID(cid)
obj.SetType(typ)
err := metaPut(db, obj, nil)
require.NoError(t, err)
expected = append(expected, obj)
}
var metaIter meta.IterateOverObjectsInContainerPrm
var count uint64
metaIter.Handler = func(context.Context, *object.Info) error {
count++
return nil
}
metaIter.ContainerID = cid
metaIter.ObjectType = typ
err := db.IterateOverObjectsInContainer(context.Background(), metaIter)
require.NoError(t, err)
require.Equal(t, total, count)
var metaCount meta.CountAliveObjectsInContainerPrm
metaCount.ContainerID = cid
metaCount.ObjectType = typ
res, err := db.CountAliveObjectsInContainer(context.Background(), metaCount)
require.NoError(t, err)
require.Equal(t, res, total)
err = metaDelete(db, object.AddressOf(expected[0]), object.AddressOf(expected[1]))
require.NoError(t, err)
res, err = db.CountAliveObjectsInContainer(context.Background(), metaCount)
require.NoError(t, err)
require.Equal(t, uint64(3), res)
}
var count int
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = func(context.Context, objectSDK.Type, cidSDK.ID) error {
count++
return nil
}
err := db.IterateOverContainers(context.Background(), metaPrm)
require.NoError(t, err)
require.Equal(t, 3, count)
}

View file

@ -37,21 +37,25 @@ func (r ListContainersRes) Containers() []cid.ID {
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
Handler func(context.Context, objectSDK.Type, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name.
BucketName []byte
// ObjectType type of objects to iterate over.
ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name.
BucketName []byte
// CountAliveObjectsInContainerPrm contains parameters for CountAliveObjectsInContainer operation.
type CountAliveObjectsInContainerPrm struct {
// ObjectType type of objects to iterate over.
ObjectType objectSDK.Type
// ContainerID container for objects to iterate over.
ContainerID cid.ID
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
@ -226,7 +230,8 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
}
var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.BucketName = prm.BucketName
metaPrm.ContainerID = prm.ContainerID
metaPrm.ObjectType = prm.ObjectType
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
@ -236,8 +241,8 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return nil
}
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
// CountAliveObjectsInContainer count objects in bucket which aren't in graveyard or garbage.
func (s *Shard) CountAliveObjectsInContainer(ctx context.Context, prm CountAliveObjectsInContainerPrm) (uint64, error) {
_, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
defer span.End()
@ -248,9 +253,10 @@ func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObj
return 0, ErrDegradedMode
}
var metaPrm meta.CountAliveObjectsInBucketPrm
metaPrm.BucketName = prm.BucketName
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm)
var metaPrm meta.CountAliveObjectsInContainerPrm
metaPrm.ObjectType = prm.ObjectType
metaPrm.ContainerID = prm.ContainerID
count, err := s.metaBase.CountAliveObjectsInContainer(ctx, metaPrm)
if err != nil {
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
}