diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go index 04a67e5b5..fffc5e33e 100644 --- a/cmd/frostfs-cli/modules/control/evacuation.go +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -20,6 +20,7 @@ const ( awaitFlag = "await" noProgressFlag = "no-progress" scopeFlag = "scope" + repOneOnlyFlag = "rep-one-only" containerWorkerCountFlag = "container-worker-count" objectWorkerCountFlag = "object-worker-count" @@ -69,6 +70,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag) objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag) + repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag) req := &control.StartShardEvacuationRequest{ Body: &control.StartShardEvacuationRequest_Body{ @@ -77,6 +79,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { Scope: getEvacuationScope(cmd), ContainerWorkerCount: containerWorkerCount, ObjectWorkerCount: objectWorkerCount, + RepOneOnly: repOneOnly, }, } @@ -380,6 +383,7 @@ func initControlStartEvacuationShardCmd() { flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers") flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers") + flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy 'REP 1 ...'") startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/docs/evacuation.md b/docs/evacuation.md index 885ce169a..d47d56d15 100644 --- a/docs/evacuation.md +++ b/docs/evacuation.md @@ -20,7 +20,12 @@ Because it is necessary to prevent removing by policer objects with policy `REP ## Commands -`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`). +`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. +By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`). +To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`. +To adjust resource consumption required for evacuation use options: + - `--container-worker-count` count of concurrent container evacuation workers + - `--object-worker-count` count of concurrent object evacuation workers `frostfs-cli control shards evacuation stop` stops running evacuation process. diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 3db556a8f..a618ff274 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -9,6 +9,7 @@ import ( "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" @@ -16,6 +17,7 @@ import ( tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -88,6 +90,7 @@ type EvacuateShardPrm struct { IgnoreErrors bool Async bool Scope EvacuateScope + RepOneOnly bool ContainerWorkerCount uint32 ObjectWorkerCount uint32 @@ -288,6 +291,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), + attribute.Bool("repOneOnly", prm.RepOneOnly), )) defer func() { @@ -430,13 +434,34 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context ) error { sh := shardsToEvacuate[shardID] var cntPrm shard.IterateOverContainersPrm - cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error { + cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error { select { case <-ctx.Done(): return context.Cause(ctx) default: } egContainer.Go(func() error { + var skip bool + c, err := e.containerSource.Load().cs.Get(cnt) + if err != nil { + if client.IsErrContainerNotFound(err) { + skip = true + } else { + return err + } + } + if !skip && prm.RepOneOnly { + skip = e.isNotRepOne(c) + } + if skip { + countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name} + count, err := sh.CountAliveObjectsInBucket(ctx, countPrm) + if err != nil { + return err + } + res.objSkipped.Add(count) + return nil + } var objPrm shard.IterateOverObjectsInContainerPrm objPrm.BucketName = name objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { @@ -454,7 +479,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context }) return nil } - err := sh.IterateOverObjectsInContainer(ctx, objPrm) + err = sh.IterateOverObjectsInContainer(ctx, objPrm) if err != nil { cancel(err) } @@ -781,6 +806,16 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI return nil } +func (e *StorageEngine) isNotRepOne(c *container.Container) bool { + p := c.Value.PlacementPolicy() + for i := range p.NumberOfReplicas() { + if p.ReplicaDescriptor(i).NumberOfObjects() > 1 { + return true + } + } + return false +} + func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, ) (bool, error) { diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index f72333399..8498c9245 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" @@ -20,14 +21,38 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) +type containerStorage struct { + cntmap map[cid.ID]*container.Container + latency time.Duration +} + +func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) { + time.Sleep(cs.latency) + v, ok := cs.cntmap[id] + if !ok { + return nil, new(apistatus.ContainerNotFound) + } + coreCnt := coreContainer.Container{ + Value: *v, + } + return &coreCnt, nil +} + +func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) { + return nil, nil +} + func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir := t.TempDir() @@ -61,10 +86,15 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng {Key: pilorama.AttributeVersion, Value: []byte("XXX")}, {Key: pilorama.AttributeFilename, Value: []byte("file.txt")}, } - + cnrMap := make(map[cid.ID]*container.Container) for _, sh := range ids { - for range objPerShard { + for i := range objPerShard { + // Create dummy container + cnr1 := container.Container{} + cnr1.SetAttribute("cnr", "cnr"+strconv.Itoa(i)) contID := cidtest.ID() + cnrMap[contID] = &cnr1 + obj := testutil.GenerateObjectWithCID(contID) objects = append(objects, obj) @@ -78,6 +108,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng require.NoError(t, err) } } + e.SetContainerSource(&containerStorage{cntmap: cnrMap}) return e, ids, objects } @@ -177,7 +208,10 @@ func TestEvacuateObjectsNetwork(t *testing.T) { acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { var n atomic.Uint64 + var mtx sync.Mutex return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { + mtx.Lock() + defer mtx.Unlock() if n.Load() == max { return false, errReplication } @@ -640,3 +674,146 @@ func TestEvacuateTreesRemote(t *testing.T) { require.Equal(t, expectedTreeOps, evacuatedTreeOps) } + +func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 0) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + // Create container with policy REP 2 + cnr1 := container.Container{} + p1 := netmap.PlacementPolicy{} + p1.SetContainerBackupFactor(1) + x1 := netmap.ReplicaDescriptor{} + x1.SetNumberOfObjects(2) + p1.AddReplicas(x1) + x1 = netmap.ReplicaDescriptor{} + x1.SetNumberOfObjects(1) + p1.AddReplicas(x1) + cnr1.SetPlacementPolicy(p1) + cnr1.SetAttribute("cnr", "cnr1") + + var idCnr1 cid.ID + container.CalculateID(&idCnr1, cnr1) + + cnrmap := make(map[cid.ID]*container.Container) + var cids []cid.ID + cnrmap[idCnr1] = &cnr1 + cids = append(cids, idCnr1) + + // Create container with policy REP 1 + cnr2 := container.Container{} + p2 := netmap.PlacementPolicy{} + p2.SetContainerBackupFactor(1) + x2 := netmap.ReplicaDescriptor{} + x2.SetNumberOfObjects(1) + p2.AddReplicas(x2) + x2 = netmap.ReplicaDescriptor{} + x2.SetNumberOfObjects(1) + p2.AddReplicas(x2) + cnr2.SetPlacementPolicy(p2) + cnr2.SetAttribute("cnr", "cnr2") + + var idCnr2 cid.ID + container.CalculateID(&idCnr2, cnr2) + cnrmap[idCnr2] = &cnr2 + cids = append(cids, idCnr2) + + // Create container for simulate removing + cnr3 := container.Container{} + p3 := netmap.PlacementPolicy{} + p3.SetContainerBackupFactor(1) + x3 := netmap.ReplicaDescriptor{} + x3.SetNumberOfObjects(1) + p3.AddReplicas(x3) + cnr3.SetPlacementPolicy(p3) + cnr3.SetAttribute("cnr", "cnr3") + + var idCnr3 cid.ID + container.CalculateID(&idCnr3, cnr3) + cids = append(cids, idCnr3) + + e.SetContainerSource(&containerStorage{cntmap: cnrmap}) + + for _, sh := range ids { + for j := range 3 { + for range 4 { + obj := testutil.GenerateObjectWithCID(cids[j]) + var putPrm shard.PutPrm + putPrm.SetObject(obj) + _, err := e.shards[sh.String()].Put(context.Background(), putPrm) + require.NoError(t, err) + } + } + } + + var prm EvacuateShardPrm + prm.ShardID = ids[0:1] + prm.Scope = EvacuateScopeObjects + prm.RepOneOnly = true + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + + res, err := e.Evacuate(context.Background(), prm) + require.NoError(t, err) + require.Equal(t, uint64(4), res.ObjectsEvacuated()) + require.Equal(t, uint64(8), res.ObjectsSkipped()) + require.Equal(t, uint64(0), res.ObjectsFailed()) +} + +func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) { + t.Skip() + e, ids, _ := newEngineEvacuate(t, 2, 0) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + cnrmap := make(map[cid.ID]*container.Container) + var cids []cid.ID + // Create containers with policy REP 1 + for i := range 10_000 { + cnr1 := container.Container{} + p1 := netmap.PlacementPolicy{} + p1.SetContainerBackupFactor(1) + x1 := netmap.ReplicaDescriptor{} + x1.SetNumberOfObjects(2) + p1.AddReplicas(x1) + cnr1.SetPlacementPolicy(p1) + cnr1.SetAttribute("i", strconv.Itoa(i)) + + var idCnr1 cid.ID + container.CalculateID(&idCnr1, cnr1) + + cnrmap[idCnr1] = &cnr1 + cids = append(cids, idCnr1) + } + + e.SetContainerSource(&containerStorage{ + cntmap: cnrmap, + latency: time.Millisecond * 100, + }) + + for _, cnt := range cids { + for range 1 { + obj := testutil.GenerateObjectWithCID(cnt) + var putPrm shard.PutPrm + putPrm.SetObject(obj) + _, err := e.shards[ids[0].String()].Put(context.Background(), putPrm) + require.NoError(t, err) + } + } + + var prm EvacuateShardPrm + prm.ShardID = ids[0:1] + prm.Scope = EvacuateScopeObjects + prm.RepOneOnly = true + prm.ContainerWorkerCount = 10 + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + + start := time.Now() + _, err := e.Evacuate(context.Background(), prm) + t.Logf("evacuate took %v\n", time.Since(start)) + require.NoError(t, err) +} diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index 5943be7f4..44f25246e 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -76,6 +76,12 @@ type IterateOverObjectsInContainerPrm struct { Handler func(context.Context, *objectcore.Info) error } +// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation. +type CountAliveObjectsInBucketPrm struct { + // BucketName container's bucket name. + BucketName []byte +} + // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes objects of all types. Does not include inhumed objects. // Use cursor value from response for consecutive requests. @@ -426,3 +432,48 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c } return nil } + +// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage. +func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("CountAliveObjectsInBucket", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.CountAliveObjectsInBucket") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return 0, ErrDegradedMode + } + + cidRaw := prm.BucketName[1:bucketKeySize] + if cidRaw == nil { + return 0, nil + } + var count uint64 + err := db.boltDB.View(func(tx *bbolt.Tx) error { + bkt := tx.Bucket(prm.BucketName) + if bkt == nil { + return nil + } + graveyardBkt := tx.Bucket(graveyardBucketName) + garbageBkt := tx.Bucket(garbageBucketName) + c := bkt.Cursor() + k, _ := c.First() + for ; k != nil; k, _ = c.Next() { + if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { + continue + } + count++ + } + return nil + }) + success = err == nil + return count, metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 9f56ec750..f5d633b77 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -44,10 +44,16 @@ type IterateOverContainersPrm struct { type IterateOverObjectsInContainerPrm struct { // BucketName container's bucket name. BucketName []byte - // Handler function executed upon containers in db. + // Handler function executed upon objects in db. Handler func(context.Context, *objectcore.Info) error } +// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation. +type CountAliveObjectsInBucketPrm struct { + // BucketName container's bucket name. + BucketName []byte +} + // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 @@ -229,3 +235,25 @@ func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOv return nil } + +// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage. +func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) { + _, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket") + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.NoMetabase() { + return 0, ErrDegradedMode + } + + var metaPrm meta.CountAliveObjectsInBucketPrm + metaPrm.BucketName = prm.BucketName + count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm) + if err != nil { + return 0, fmt.Errorf("could not count alive objects in bucket: %w", err) + } + + return count, nil +} diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index bdc6f7c38..146ac7e16 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -31,6 +31,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha Scope: engine.EvacuateScope(req.GetBody().GetScope()), ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(), ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(), + RepOneOnly: req.GetBody().GetRepOneOnly(), } _, err = s.s.Evacuate(ctx, prm) diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 88a06de22..ae1939e13 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -398,6 +398,8 @@ message StartShardEvacuationRequest { uint32 container_worker_count = 4; // Count of concurrent object evacuation workers. uint32 object_worker_count = 5; + // Choose for evacuation objects in `REP 1` containers only. + bool rep_one_only = 6; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index e92a8acd1..e16f082b1 100644 Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ