node: Add ability to evacuate objects from REP 1 only #1350

Merged
acid-ant merged 1 commit from acid-ant/frostfs-node:feat/evac-skip-rep-one into master 2024-09-30 06:34:32 +00:00
9 changed files with 309 additions and 6 deletions

View file

@ -20,6 +20,7 @@ const (
awaitFlag = "await" awaitFlag = "await"
noProgressFlag = "no-progress" noProgressFlag = "no-progress"
scopeFlag = "scope" scopeFlag = "scope"
repOneOnlyFlag = "rep-one-only"
containerWorkerCountFlag = "container-worker-count" containerWorkerCountFlag = "container-worker-count"
objectWorkerCountFlag = "object-worker-count" objectWorkerCountFlag = "object-worker-count"
@ -69,6 +70,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag) containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag) objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag)
req := &control.StartShardEvacuationRequest{ req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{ Body: &control.StartShardEvacuationRequest_Body{
@ -77,6 +79,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
Scope: getEvacuationScope(cmd), Scope: getEvacuationScope(cmd),
ContainerWorkerCount: containerWorkerCount, ContainerWorkerCount: containerWorkerCount,
ObjectWorkerCount: objectWorkerCount, ObjectWorkerCount: objectWorkerCount,
RepOneOnly: repOneOnly,
}, },
} }
@ -380,6 +383,7 @@ func initControlStartEvacuationShardCmd() {
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers") flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers") flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy 'REP 1 ...'")
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
} }

View file

@ -20,7 +20,12 @@ Because it is necessary to prevent removing by policer objects with policy `REP
## Commands ## Commands
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`). `frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`.
To adjust resource consumption required for evacuation use options:
- `--container-worker-count` count of concurrent container evacuation workers
- `--object-worker-count` count of concurrent object evacuation workers
`frostfs-cli control shards evacuation stop` stops running evacuation process. `frostfs-cli control shards evacuation stop` stops running evacuation process.

View file

@ -9,6 +9,7 @@ import (
"sync/atomic" "sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -16,6 +17,7 @@ import (
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -88,6 +90,7 @@ type EvacuateShardPrm struct {
IgnoreErrors bool IgnoreErrors bool
Async bool Async bool
Scope EvacuateScope Scope EvacuateScope
RepOneOnly bool
ContainerWorkerCount uint32 ContainerWorkerCount uint32
ObjectWorkerCount uint32 ObjectWorkerCount uint32
@ -288,6 +291,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
attribute.Bool("async", prm.Async), attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope), attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly),
)) ))
defer func() { defer func() {
@ -430,13 +434,34 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
) error { ) error {
sh := shardsToEvacuate[shardID] sh := shardsToEvacuate[shardID]
var cntPrm shard.IterateOverContainersPrm var cntPrm shard.IterateOverContainersPrm
cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error { cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return context.Cause(ctx) return context.Cause(ctx)
default: default:
} }
egContainer.Go(func() error { egContainer.Go(func() error {
var skip bool
c, err := e.containerSource.Load().cs.Get(cnt)
if err != nil {
if client.IsErrContainerNotFound(err) {
skip = true
} else {
return err
}
}
if !skip && prm.RepOneOnly {
skip = e.isNotRepOne(c)
}
if skip {
countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name}
count, err := sh.CountAliveObjectsInBucket(ctx, countPrm)
if err != nil {
return err
}
res.objSkipped.Add(count)
return nil
}
var objPrm shard.IterateOverObjectsInContainerPrm var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.BucketName = name objPrm.BucketName = name
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
@ -454,7 +479,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
}) })
return nil return nil
} }
err := sh.IterateOverObjectsInContainer(ctx, objPrm) err = sh.IterateOverObjectsInContainer(ctx, objPrm)
if err != nil { if err != nil {
cancel(err) cancel(err)
} }
@ -781,6 +806,16 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI
return nil return nil
} }
aarifullin marked this conversation as resolved Outdated

You'd had already the discussion with @dstepanov-yadro.
I believe your PR solves the problem for evacuation of objects that have single instance across all nodes. But if the placement policy has a few vectors (REP 1 ... REP 1) it seems like it has replicated copy. Could you explain, then, why do you NOT check if p.NumberOfReplicas() == 1?

You'd had already the [discussion](https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/1350#issuecomment-50772) with @dstepanov-yadro. I believe your PR solves the problem for evacuation of objects that have single instance across all nodes. But if the placement policy has a few vectors (`REP 1 ... REP 1`) it seems like it has replicated copy. Could you explain, then, why do you NOT check if `p.NumberOfReplicas() == 1`?

I have only one argument for this - network connectivity between nodes for two REP 1 may be bad. I see that it is not strong and because you don't see any problems I'll add this check.

I have only one argument for this - network connectivity between nodes for two `REP 1` may be bad. I see that it is not strong and because you don't see any problems I'll add this check.
func (e *StorageEngine) isNotRepOne(c *container.Container) bool {
p := c.Value.PlacementPolicy()
for i := range p.NumberOfReplicas() {
if p.ReplicaDescriptor(i).NumberOfObjects() > 1 {
return true
}
}
return false
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
) (bool, error) { ) (bool, error) {

View file

@ -11,6 +11,7 @@ import (
"testing" "testing"
"time" "time"
coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@ -20,14 +21,38 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
type containerStorage struct {
cntmap map[cid.ID]*container.Container
latency time.Duration
}
func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
time.Sleep(cs.latency)
v, ok := cs.cntmap[id]
if !ok {
return nil, new(apistatus.ContainerNotFound)
}
coreCnt := coreContainer.Container{
Value: *v,
}
return &coreCnt, nil
}
func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
return nil, nil
}
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
dir := t.TempDir() dir := t.TempDir()
@ -61,10 +86,15 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
{Key: pilorama.AttributeVersion, Value: []byte("XXX")}, {Key: pilorama.AttributeVersion, Value: []byte("XXX")},
{Key: pilorama.AttributeFilename, Value: []byte("file.txt")}, {Key: pilorama.AttributeFilename, Value: []byte("file.txt")},
} }
cnrMap := make(map[cid.ID]*container.Container)
for _, sh := range ids { for _, sh := range ids {
for range objPerShard { for i := range objPerShard {
// Create dummy container
cnr1 := container.Container{}
cnr1.SetAttribute("cnr", "cnr"+strconv.Itoa(i))
contID := cidtest.ID() contID := cidtest.ID()
cnrMap[contID] = &cnr1
obj := testutil.GenerateObjectWithCID(contID) obj := testutil.GenerateObjectWithCID(contID)
objects = append(objects, obj) objects = append(objects, obj)
@ -78,6 +108,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
require.NoError(t, err) require.NoError(t, err)
} }
} }
e.SetContainerSource(&containerStorage{cntmap: cnrMap})
return e, ids, objects return e, ids, objects
} }
@ -177,7 +208,10 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
var n atomic.Uint64 var n atomic.Uint64
var mtx sync.Mutex
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
mtx.Lock()
defer mtx.Unlock()
if n.Load() == max { if n.Load() == max {
return false, errReplication return false, errReplication
} }
@ -640,3 +674,146 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.Equal(t, expectedTreeOps, evacuatedTreeOps) require.Equal(t, expectedTreeOps, evacuatedTreeOps)
} }
func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 0)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
// Create container with policy REP 2
cnr1 := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(2)
p1.AddReplicas(x1)
x1 = netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(1)
p1.AddReplicas(x1)
cnr1.SetPlacementPolicy(p1)
cnr1.SetAttribute("cnr", "cnr1")
var idCnr1 cid.ID
container.CalculateID(&idCnr1, cnr1)
cnrmap := make(map[cid.ID]*container.Container)
var cids []cid.ID
cnrmap[idCnr1] = &cnr1
cids = append(cids, idCnr1)
// Create container with policy REP 1
cnr2 := container.Container{}
p2 := netmap.PlacementPolicy{}
p2.SetContainerBackupFactor(1)
x2 := netmap.ReplicaDescriptor{}
x2.SetNumberOfObjects(1)
p2.AddReplicas(x2)
x2 = netmap.ReplicaDescriptor{}
x2.SetNumberOfObjects(1)
p2.AddReplicas(x2)
cnr2.SetPlacementPolicy(p2)
cnr2.SetAttribute("cnr", "cnr2")
var idCnr2 cid.ID
container.CalculateID(&idCnr2, cnr2)
cnrmap[idCnr2] = &cnr2
cids = append(cids, idCnr2)
// Create container for simulate removing
cnr3 := container.Container{}
p3 := netmap.PlacementPolicy{}
p3.SetContainerBackupFactor(1)
x3 := netmap.ReplicaDescriptor{}
x3.SetNumberOfObjects(1)
p3.AddReplicas(x3)
cnr3.SetPlacementPolicy(p3)
cnr3.SetAttribute("cnr", "cnr3")
var idCnr3 cid.ID
container.CalculateID(&idCnr3, cnr3)
cids = append(cids, idCnr3)
e.SetContainerSource(&containerStorage{cntmap: cnrmap})
for _, sh := range ids {
for j := range 3 {
for range 4 {
obj := testutil.GenerateObjectWithCID(cids[j])
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
}
}
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(4), res.ObjectsEvacuated())
require.Equal(t, uint64(8), res.ObjectsSkipped())
require.Equal(t, uint64(0), res.ObjectsFailed())
}
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
t.Skip()
e, ids, _ := newEngineEvacuate(t, 2, 0)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
cnrmap := make(map[cid.ID]*container.Container)
var cids []cid.ID
// Create containers with policy REP 1
for i := range 10_000 {
cnr1 := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(2)
p1.AddReplicas(x1)
cnr1.SetPlacementPolicy(p1)
cnr1.SetAttribute("i", strconv.Itoa(i))
var idCnr1 cid.ID
container.CalculateID(&idCnr1, cnr1)
cnrmap[idCnr1] = &cnr1
cids = append(cids, idCnr1)
}
e.SetContainerSource(&containerStorage{
cntmap: cnrmap,
latency: time.Millisecond * 100,
})
for _, cnt := range cids {
for range 1 {
obj := testutil.GenerateObjectWithCID(cnt)
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[ids[0].String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
}
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
prm.ContainerWorkerCount = 10
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
start := time.Now()
_, err := e.Evacuate(context.Background(), prm)
t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err)
}

View file

@ -76,6 +76,12 @@ type IterateOverObjectsInContainerPrm struct {
Handler func(context.Context, *objectcore.Info) error Handler func(context.Context, *objectcore.Info) error
} }
// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name.
BucketName []byte
}
// ListWithCursor lists physical objects available in metabase starting from // ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects. // cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests. // Use cursor value from response for consecutive requests.
@ -426,3 +432,48 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
} }
return nil return nil
} }
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("CountAliveObjectsInBucket", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.CountAliveObjectsInBucket")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return 0, ErrDegradedMode
}
cidRaw := prm.BucketName[1:bucketKeySize]
if cidRaw == nil {
return 0, nil
}
var count uint64
err := db.boltDB.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(prm.BucketName)
if bkt == nil {
return nil
}
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, _ := c.First()
for ; k != nil; k, _ = c.Next() {
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
count++
}
return nil
})
success = err == nil
return count, metaerr.Wrap(err)
}

View file

@ -44,10 +44,16 @@ type IterateOverContainersPrm struct {
type IterateOverObjectsInContainerPrm struct { type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name. // BucketName container's bucket name.
BucketName []byte BucketName []byte
// Handler function executed upon containers in db. // Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error Handler func(context.Context, *objectcore.Info) error
} }
// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name.
BucketName []byte
}
// ListWithCursorPrm contains parameters for ListWithCursor operation. // ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct { type ListWithCursorPrm struct {
count uint32 count uint32
@ -229,3 +235,25 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv
return nil return nil
} }
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
_, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return 0, ErrDegradedMode
}
var metaPrm meta.CountAliveObjectsInBucketPrm
metaPrm.BucketName = prm.BucketName
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm)
if err != nil {
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
}
return count, nil
}

View file

@ -31,6 +31,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
Scope: engine.EvacuateScope(req.GetBody().GetScope()), Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
RepOneOnly: req.GetBody().GetRepOneOnly(),
} }
_, err = s.s.Evacuate(ctx, prm) _, err = s.s.Evacuate(ctx, prm)

View file

@ -398,6 +398,8 @@ message StartShardEvacuationRequest {
uint32 container_worker_count = 4; uint32 container_worker_count = 4;
// Count of concurrent object evacuation workers. // Count of concurrent object evacuation workers.
uint32 object_worker_count = 5; uint32 object_worker_count = 5;
// Choose for evacuation objects in `REP 1` containers only.
bool rep_one_only = 6;
} }
Body body = 1; Body body = 1;

Binary file not shown.