From 26a007a0174d77452f767e35e3da7885beacd5ef Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 3 Sep 2024 15:42:38 +0300 Subject: [PATCH] [#xx] node: Add ability to evacuate objects from `REP 1` only Signed-off-by: Anton Nikiforov --- .../modules/control/evacuate_shard.go | 1 + cmd/frostfs-cli/modules/control/evacuation.go | 4 + pkg/local_object_storage/engine/evacuate.go | 27 +++++- .../engine/evacuate_test.go | 82 ++++++++++++++++++ pkg/services/control/server/evacuate.go | 1 + pkg/services/control/server/evacuate_async.go | 1 + pkg/services/control/service.proto | 5 ++ pkg/services/control/service_frostfs.pb.go | Bin 381845 -> 383201 bytes 8 files changed, 120 insertions(+), 1 deletion(-) diff --git a/cmd/frostfs-cli/modules/control/evacuate_shard.go b/cmd/frostfs-cli/modules/control/evacuate_shard.go index 458e4cc0b..7f5f7e6b7 100644 --- a/cmd/frostfs-cli/modules/control/evacuate_shard.go +++ b/cmd/frostfs-cli/modules/control/evacuate_shard.go @@ -24,6 +24,7 @@ func evacuateShard(cmd *cobra.Command, _ []string) { req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)} req.Body.Shard_ID = getShardIDList(cmd) req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag) + req.Body.RepOneOnly, _ = cmd.Flags().GetBool(repOneOnlyFlag) signRequest(cmd, pk, req) diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go index 6fa5ed75c..4c7390508 100644 --- a/cmd/frostfs-cli/modules/control/evacuation.go +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -20,6 +20,7 @@ const ( awaitFlag = "await" noProgressFlag = "no-progress" scopeFlag = "scope" + repOneOnlyFlag = "rep-one-only" scopeAll = "all" scopeObjects = "objects" @@ -64,12 +65,14 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag) req := &control.StartShardEvacuationRequest{ Body: &control.StartShardEvacuationRequest_Body{ Shard_ID: getShardIDList(cmd), IgnoreErrors: ignoreErrors, Scope: getEvacuationScope(cmd), + RepOneOnly: repOneOnly, }, } @@ -371,6 +374,7 @@ func initControlStartEvacuationShardCmd() { flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll)) flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) + flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy `REP 1 ...`") startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 01ade875e..8b9d858ad 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -79,6 +79,7 @@ type EvacuateShardPrm struct { IgnoreErrors bool Async bool Scope EvacuateScope + RepOneOnly bool } // EvacuateShardRes represents result of the EvacuateShard operation. @@ -270,6 +271,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), + attribute.Bool("repOneOnly", prm.RepOneOnly), )) defer func() { @@ -653,7 +655,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to default: } addr := toEvacuate[i].Address - + if prm.RepOneOnly { + repOne, err := e.isRepOne(addr.Container()) + if err != nil { + return err + } + if !repOne { + res.objSkipped.Add(1) + continue + } + } var getPrm shard.GetPrm getPrm.SetAddress(addr) getPrm.SkipEvacCheck(true) @@ -703,6 +714,20 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to return nil } +func (e *StorageEngine) isRepOne(cid cid.ID) (bool, error) { + c, err := e.containerSource.Load().cs.Get(cid) + if err != nil { + return false, err + } + p := c.Value.PlacementPolicy() + for i := 0; i < p.NumberOfReplicas(); i++ { + if p.ReplicaDescriptor(i).NumberOfObjects() == 1 { + return true, nil + } + } + return false, nil +} + func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, ) (bool, error) { diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index f8d0ffd6b..c863670e3 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" @@ -18,14 +19,31 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) +type containerStorage struct { + cntmap map[cid.ID]*container.Container +} + +func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) { + coreCnt := coreContainer.Container{ + Value: *cs.cntmap[id], + } + return &coreCnt, nil +} + +func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) { + return nil, nil +} + func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir := t.TempDir() @@ -605,3 +623,67 @@ func TestEvacuateTreesRemote(t *testing.T) { require.Equal(t, expectedTreeOps, evacuatedTreeOps) } + +func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 0) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + // Create container with policy REP 2 + cnr1 := container.Container{} + p1 := netmap.PlacementPolicy{} + p1.SetContainerBackupFactor(1) + x1 := netmap.ReplicaDescriptor{} + x1.SetNumberOfObjects(2) + p1.AddReplicas(x1) + cnr1.SetPlacementPolicy(p1) + + var idCnr1 cid.ID + container.CalculateID(&idCnr1, cnr1) + + cnrmap := make(map[cid.ID]*container.Container) + var cids []cid.ID + cnrmap[idCnr1] = &cnr1 + cids = append(cids, idCnr1) + + // Create container with policy REP 1 + cnr2 := container.Container{} + p2 := netmap.PlacementPolicy{} + p2.SetContainerBackupFactor(1) + x2 := netmap.ReplicaDescriptor{} + x2.SetNumberOfObjects(1) + p2.AddReplicas(x2) + cnr2.SetPlacementPolicy(p2) + + var idCnr2 cid.ID + container.CalculateID(&idCnr2, cnr2) + cnrmap[idCnr2] = &cnr2 + cids = append(cids, idCnr2) + + e.SetContainerSource(&containerStorage{cntmap: cnrmap}) + + for _, sh := range ids { + for i := 0; i < 4; i++ { + obj := testutil.GenerateObjectWithCID(cids[i%2]) + + var putPrm shard.PutPrm + putPrm.SetObject(obj) + _, err := e.shards[sh.String()].Put(context.Background(), putPrm) + require.NoError(t, err) + } + } + + var prm EvacuateShardPrm + prm.ShardID = ids[0:1] + prm.Scope = EvacuateScopeObjects + prm.RepOneOnly = true + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + + res, err := e.Evacuate(context.Background(), prm) + require.NoError(t, err) + require.Equal(t, uint64(2), res.ObjectsEvacuated()) + require.Equal(t, uint64(2), res.ObjectsSkipped()) + require.Equal(t, uint64(0), res.ObjectsFailed()) +} diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 0ba8be765..b644b9544 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -37,6 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe IgnoreErrors: req.GetBody().GetIgnoreErrors(), ObjectsHandler: s.replicateObject, Scope: engine.EvacuateScopeObjects, + RepOneOnly: req.GetBody().GetRepOneOnly(), } res, err := s.s.Evacuate(ctx, prm) diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go index b829573ec..fe36128e4 100644 --- a/pkg/services/control/server/evacuate_async.go +++ b/pkg/services/control/server/evacuate_async.go @@ -29,6 +29,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha TreeHandler: s.replicateTree, Async: true, Scope: engine.EvacuateScope(req.GetBody().GetScope()), + RepOneOnly: req.GetBody().GetRepOneOnly(), } _, err = s.s.Evacuate(ctx, prm) diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index d6639cb48..390b068bc 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -313,6 +313,9 @@ message EvacuateShardRequest { // Flag indicating whether object read errors should be ignored. bool ignore_errors = 2; + + // Choose for evacuation objects in `REP 1` containers only. + bool rep_one_only = 3; } Body body = 1; @@ -391,6 +394,8 @@ message StartShardEvacuationRequest { bool ignore_errors = 2; // Evacuation scope. uint32 scope = 3; + // Choose for evacuation objects in `REP 1` containers only. + bool rep_one_only = 4; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index eb0d95c64eaadde08fddad2b0e904365f0522c35..626c412d0714cc0a73f0379ae25dd29e4b6e6c81 100644 GIT binary patch delta 1082 zcmb7?Ur1AN6vsKgd$03u>TWn)SZ=qap=71>P^i%iOKj;Rdx#*L#R5az{DXf`anoQ= zjr3OEhoD3sq7bwQt`*{xKWGL-s zS#ZB3*?P1@?LBO$Xr%h!eMdF~Uy}*31u|*zk!Og$qECw4a(&BXlvGe&`i23m%yntA zRDru(oP)BQRFd@A9JLUpm|!tHYj3k{P|irT&J7J$v00Yo!L>)KD^x9t2apvfE4Y6P z)EJ2f{-(y1-10pAL|2RoGHd)@Mnfp&DRWBfi&2Hkh>N-tV=Gqs&0Bt{FT`k?AzCLn zO;bAOiv^*nvwrTtVH}XodBm@2=@iaiCdF=`bt=^VqD}MFHf@EWT-4KN5m6h O{0RJrx+fx?J@*G1k9GS1 delta 459 zcmaF3QGDuh@rEso;yWg%aBFSP*}*8oG?_&qcDlh$Mv>_aI~duf|J%#7Yx;x>Q10Y) zEe6vMxUtThY^D^7r0T#iMy}}}<}mM_zWx$KDeLqc2Nvz^akWf#jFV?Ak(u6mm{EE< z4--=ugjL1FWC~$L`m)AASbmL629q^xoRAC#TF){4gA+^TcA);#jMJ~*V9bR$3}oeG z0l)m|`(SR}4iu_onr=IrF>rdp9Y+4?340k?w|_s$n7?Lv=u$@Q>F2wdq^3`(Vb-7i zZXT1!^n_Yw!Rh`MKnoPi8KtJz&u0>aiz!Y|Sji*?;?A2cunMU7!9-@B>Hp`mW=ua{ z!4x=|^|jFS19KUDru#QDX-t05ylMKsiA>Vd6DpY4C$DeO2Zho6>4nuydfTs6GX*nG z&w0Wmv;BTOQw-DelN%YGw*$4EWdivoa=KqLlhXA0hnOm+^KJoJ39@K=y(^15<93HI mR%^!TqFqd$)8l3^2Tu=xx|+3JzMBb%nSq#PyL>n6AzuK@Qm`Zd