[#1350] node: Add ability to evacuate objects from REP 1 only

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-03 15:42:38 +03:00
parent 51e64c3101
commit bcc0fed99c
9 changed files with 197 additions and 4 deletions

View file

@ -20,6 +20,7 @@ const (
awaitFlag = "await"
noProgressFlag = "no-progress"
scopeFlag = "scope"
repOneOnlyFlag = "rep-one-only"
containerWorkerCountFlag = "container-worker-count"
objectWorkerCountFlag = "object-worker-count"
@ -69,6 +70,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag)
req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{
@ -77,6 +79,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
Scope: getEvacuationScope(cmd),
ContainerWorkerCount: containerWorkerCount,
ObjectWorkerCount: objectWorkerCount,
RepOneOnly: repOneOnly,
},
}
@ -380,6 +383,7 @@ func initControlStartEvacuationShardCmd() {
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy 'REP 1 ...'")
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -20,7 +20,9 @@ Because it is necessary to prevent removing by policer objects with policy `REP
## Commands
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`.
`frostfs-cli control shards evacuation stop` stops running evacuation process.

View file

@ -79,6 +79,7 @@ type EvacuateShardPrm struct {
IgnoreErrors bool
Async bool
Scope EvacuateScope
RepOneOnly bool
ContainerWorkerCount uint32
ObjectWorkerCount uint32
@ -279,6 +280,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly),
))
defer func() {
@ -297,14 +299,12 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
}
egShard, _ := errgroup.WithContext(ctx)
egContainer, _ := errgroup.WithContext(ctx)
containerWorkerCount := prm.ContainerWorkerCount
if containerWorkerCount == 0 {
containerWorkerCount = e.cfg.evacuationContainerWorkerCount
}
egContainer.SetLimit(int(containerWorkerCount))
egObject, _ := errgroup.WithContext(ctx)
objectWorkerCount := prm.ObjectWorkerCount
if objectWorkerCount == 0 {
@ -409,8 +409,23 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
) error {
sh := shardsToEvacuate[shardID]
var cntPrm shard.IterateOverContainersPrm
cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error {
cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error {
egContainer.Go(func() error {
if prm.RepOneOnly {
notRepOne, err := e.isNotRepOne(cnt)
if err != nil {
return err
}
if notRepOne {
countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name}
count, err := sh.CountAliveObjectsInBucket(ctx, countPrm)
if err != nil {
return err
}
res.objSkipped.Add(count)
return nil
}
}
var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.BucketName = name
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
@ -741,6 +756,20 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI
return nil
}
func (e *StorageEngine) isNotRepOne(cid cid.ID) (bool, error) {
c, err := e.containerSource.Load().cs.Get(cid)
if err != nil {
return false, err
}
p := c.Value.PlacementPolicy()
for i := range p.NumberOfReplicas() {
if p.ReplicaDescriptor(i).NumberOfObjects() == 1 {
return false, nil
}
}
return true, nil
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
) (bool, error) {

View file

@ -11,6 +11,7 @@ import (
"testing"
"time"
coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@ -20,14 +21,31 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
type containerStorage struct {
cntmap map[cid.ID]*container.Container
}
func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
coreCnt := coreContainer.Container{
Value: *cs.cntmap[id],
}
return &coreCnt, nil
}
func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
return nil, nil
}
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
dir := t.TempDir()
@ -611,3 +629,68 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
}
func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 0)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
// Create container with policy REP 2
cnr1 := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(2)
p1.AddReplicas(x1)
cnr1.SetPlacementPolicy(p1)
var idCnr1 cid.ID
container.CalculateID(&idCnr1, cnr1)
cnrmap := make(map[cid.ID]*container.Container)
var cids []cid.ID
cnrmap[idCnr1] = &cnr1
cids = append(cids, idCnr1)
// Create container with policy REP 1
cnr2 := container.Container{}
p2 := netmap.PlacementPolicy{}
p2.SetContainerBackupFactor(1)
x2 := netmap.ReplicaDescriptor{}
x2.SetNumberOfObjects(1)
p2.AddReplicas(x2)
cnr2.SetPlacementPolicy(p2)
var idCnr2 cid.ID
container.CalculateID(&idCnr2, cnr2)
cnrmap[idCnr2] = &cnr2
cids = append(cids, idCnr2)
e.SetContainerSource(&containerStorage{cntmap: cnrmap})
for _, sh := range ids {
for j := range 2 {
for range 4 {
obj := testutil.GenerateObjectWithCID(cids[j])
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
}
}
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(4), res.ObjectsEvacuated())
require.Equal(t, uint64(4), res.ObjectsSkipped())
require.Equal(t, uint64(0), res.ObjectsFailed())
}

View file

@ -75,6 +75,12 @@ type IterateOverObjectsInContainerPrm struct {
Handler func(context.Context, *objectcore.Info) error
}
// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name
BucketName []byte
}
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests.
@ -427,3 +433,44 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, p
}
return nil
}
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("CountAliveObjectsInBucket", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.CountAliveObjectsInBucket")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return 0, ErrDegradedMode
}
var count uint64
err := db.boltDB.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(prm.BucketName)
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, _ := c.First()
cidRaw := prm.BucketName[1:bucketKeySize]
if cidRaw == nil {
return nil
}
for ; k != nil; k, _ = c.Next() {
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
count++
}
return nil
})
success = err == nil
return count, metaerr.Wrap(err)
}

View file

@ -48,6 +48,12 @@ type IterateOverObjectsInContainerPrm struct {
Handler func(context.Context, *objectcore.Info) error
}
// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name
BucketName []byte
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct {
count uint32
@ -223,3 +229,22 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return nil
}
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer")
defer span.End()
if s.GetMode().NoMetabase() {
return 0, ErrDegradedMode
}
var metaPrm meta.CountAliveObjectsInBucketPrm
metaPrm.BucketName = prm.BucketName
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm)
if err != nil {
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
}
return count, nil
}

View file

@ -31,6 +31,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
RepOneOnly: req.GetBody().GetRepOneOnly(),
}
_, err = s.s.Evacuate(ctx, prm)

View file

@ -398,6 +398,8 @@ message StartShardEvacuationRequest {
uint32 container_worker_count = 4;
// Count of concurrent object evacuation workers.
uint32 object_worker_count = 5;
// Choose for evacuation objects in `REP 1` containers only.
bool rep_one_only = 6;
}
Body body = 1;

Binary file not shown.