From 94e862988851232a140761fe209c8f12d170efbb Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Tue, 3 Sep 2024 15:42:38 +0300 Subject: [PATCH] [#1350] node: Add ability to evacuate objects from `REP 1` only Signed-off-by: Anton Nikiforov --- .../modules/control/evacuate_shard.go | 1 + cmd/frostfs-cli/modules/control/evacuation.go | 4 + docs/evacuation.md | 4 +- pkg/local_object_storage/engine/evacuate.go | 30 ++++++- .../engine/evacuate_test.go | 83 ++++++++++++++++++ pkg/local_object_storage/metabase/list.go | 41 ++++++++- pkg/local_object_storage/shard/list.go | 18 ++-- pkg/services/control/server/evacuate.go | 1 + pkg/services/control/server/evacuate_async.go | 1 + pkg/services/control/service.proto | 5 ++ pkg/services/control/service_frostfs.pb.go | Bin 404856 -> 406212 bytes 11 files changed, 176 insertions(+), 12 deletions(-) diff --git a/cmd/frostfs-cli/modules/control/evacuate_shard.go b/cmd/frostfs-cli/modules/control/evacuate_shard.go index 458e4cc0b..7f5f7e6b7 100644 --- a/cmd/frostfs-cli/modules/control/evacuate_shard.go +++ b/cmd/frostfs-cli/modules/control/evacuate_shard.go @@ -24,6 +24,7 @@ func evacuateShard(cmd *cobra.Command, _ []string) { req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)} req.Body.Shard_ID = getShardIDList(cmd) req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag) + req.Body.RepOneOnly, _ = cmd.Flags().GetBool(repOneOnlyFlag) signRequest(cmd, pk, req) diff --git a/cmd/frostfs-cli/modules/control/evacuation.go b/cmd/frostfs-cli/modules/control/evacuation.go index 6fa5ed75c..eb8878e8c 100644 --- a/cmd/frostfs-cli/modules/control/evacuation.go +++ b/cmd/frostfs-cli/modules/control/evacuation.go @@ -20,6 +20,7 @@ const ( awaitFlag = "await" noProgressFlag = "no-progress" scopeFlag = "scope" + repOneOnlyFlag = "rep-one-only" scopeAll = "all" scopeObjects = "objects" @@ -64,12 +65,14 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag) req := &control.StartShardEvacuationRequest{ Body: &control.StartShardEvacuationRequest_Body{ Shard_ID: getShardIDList(cmd), IgnoreErrors: ignoreErrors, Scope: getEvacuationScope(cmd), + RepOneOnly: repOneOnly, }, } @@ -371,6 +374,7 @@ func initControlStartEvacuationShardCmd() { flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll)) flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) + flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy 'REP 1 ...'") startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) } diff --git a/docs/evacuation.md b/docs/evacuation.md index 885ce169a..322689446 100644 --- a/docs/evacuation.md +++ b/docs/evacuation.md @@ -20,7 +20,9 @@ Because it is necessary to prevent removing by policer objects with policy `REP ## Commands -`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`). +`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. 
+By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`). +To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`. `frostfs-cli control shards evacuation stop` stops running evacuation process. diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 8ca2f851c..73c8b13a7 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -79,6 +79,7 @@ type EvacuateShardPrm struct { IgnoreErrors bool Async bool Scope EvacuateScope + RepOneOnly bool } // EvacuateShardRes represents result of the EvacuateShard operation. @@ -268,6 +269,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p attribute.Bool("async", prm.Async), attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Stringer("scope", prm.Scope), + attribute.Bool("repOneOnly", prm.RepOneOnly), )) defer func() { @@ -377,8 +379,17 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string listPrm.Handler = func(ctx context.Context, addrList []object.Info) error { return e.evacuateObjects(ctx, sh, addrList, prm, res, shards, shardsToEvacuate) } - listPrm.ObjectsWorkers = e.cfg.evacuationObjectWorkerCount - listPrm.ContainersWorkers = e.cfg.evacuationContainerWorkerCount + listPrm.ObjectsWorker = e.cfg.evacuationObjectWorkerCount + listPrm.ContainersWorker = e.cfg.evacuationContainerWorkerCount + + if prm.RepOneOnly { + listPrm.ExcludeContainer = func(cid cid.ID) (bool, error) { + return e.isNotRepOne(cid) + } + listPrm.CalcExcluded = func(count uint64) { + res.objSkipped.Add(count) + } + } err := sh.ListConcurrently(ctx, listPrm) if err != nil { @@ -648,7 +659,6 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to default: } addr := toEvacuate[i].Address - var getPrm shard.GetPrm getPrm.SetAddress(addr) getPrm.SkipEvacCheck(true) @@ -698,6 +708,20 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to return nil } +func (e *StorageEngine) isNotRepOne(cid cid.ID) (bool, error) { + c, err := e.containerSource.Load().cs.Get(cid) + if err != nil { + return false, err + } + p := c.Value.PlacementPolicy() + for i := range p.NumberOfReplicas() { + if p.ReplicaDescriptor(i).NumberOfObjects() == 1 { + return false, nil + } + } + return true, nil +} + func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, ) (bool, error) { diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 4509e932d..d86a39532 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" @@ -19,14 +20,31 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" 
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) +type containerStorage struct { + cntmap map[cid.ID]*container.Container +} + +func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) { + coreCnt := coreContainer.Container{ + Value: *cs.cntmap[id], + } + return &coreCnt, nil +} + +func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) { + return nil, nil +} + func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir := t.TempDir() @@ -608,3 +626,68 @@ func TestEvacuateTreesRemote(t *testing.T) { require.Equal(t, expectedTreeOps, evacuatedTreeOps) } + +func TestEvacuateShardObjectsRepOneOnly(t *testing.T) { + e, ids, _ := newEngineEvacuate(t, 2, 0) + defer func() { + require.NoError(t, e.Close(context.Background())) + }() + + // Create container with policy REP 2 + cnr1 := container.Container{} + p1 := netmap.PlacementPolicy{} + p1.SetContainerBackupFactor(1) + x1 := netmap.ReplicaDescriptor{} + x1.SetNumberOfObjects(2) + p1.AddReplicas(x1) + cnr1.SetPlacementPolicy(p1) + + var idCnr1 cid.ID + container.CalculateID(&idCnr1, cnr1) + + cnrmap := make(map[cid.ID]*container.Container) + var cids []cid.ID + cnrmap[idCnr1] = &cnr1 + cids = append(cids, idCnr1) + + // Create container with policy REP 1 + cnr2 := container.Container{} + p2 := netmap.PlacementPolicy{} + p2.SetContainerBackupFactor(1) + x2 := netmap.ReplicaDescriptor{} + x2.SetNumberOfObjects(1) + p2.AddReplicas(x2) + cnr2.SetPlacementPolicy(p2) + + var idCnr2 cid.ID + container.CalculateID(&idCnr2, cnr2) + cnrmap[idCnr2] = &cnr2 + cids = append(cids, idCnr2) + + e.SetContainerSource(&containerStorage{cntmap: cnrmap}) + + for _, sh := range ids { + for j := range 2 { + for range 4 { + obj := testutil.GenerateObjectWithCID(cids[j]) + var putPrm shard.PutPrm + putPrm.SetObject(obj) + _, err := e.shards[sh.String()].Put(context.Background(), putPrm) + require.NoError(t, err) + } + } + } + + var prm EvacuateShardPrm + prm.ShardID = ids[0:1] + prm.Scope = EvacuateScopeObjects + prm.RepOneOnly = true + + require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) + + res, err := e.Evacuate(context.Background(), prm) + require.NoError(t, err) + require.Equal(t, uint64(4), res.ObjectsEvacuated()) + require.Equal(t, uint64(4), res.ObjectsSkipped()) + require.Equal(t, uint64(0), res.ObjectsFailed()) +} diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index 9b63f8faa..4626d7b14 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -68,10 +68,14 @@ type ListConcurrentlyPrm struct { Handler func(context.Context, []objectcore.Info) error // BatchSize maximum amount of addresses that will be passed to Handler. BatchSize uint32 - // ContainersWorkers amount of containers computed concurrently. + // ContainersWorker amount of containers computed concurrently. ContainersWorker uint32 - // ObjectsWorkers amount of workers runs Handler concurrently for each container. 
+	// ObjectsWorker amount of workers that run Handler concurrently for each container.
 	ObjectsWorker uint32
+	// ExcludeContainer function to check whether a container should be excluded from listing.
+	ExcludeContainer func(cid.ID) (bool, error)
+	// CalcExcluded is called with the number of objects excluded from listing.
+	CalcExcluded func(count uint64)
 }
 
 // ListWithCursor lists physical objects available in metabase starting from
@@ -259,6 +263,24 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 	return to, offset, cursor, nil
 }
 
+// countAliveObjectsInBucket counts objects in the bucket that are neither in the graveyard nor in the garbage.
+func countAliveObjectsInBucket(bkt *bbolt.Bucket, // main bucket
+	graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
+	cidRaw []byte, // container ID prefix, optimization
+) uint64 {
+	c := bkt.Cursor()
+	k, _ := c.First()
+	var count uint64
+	for ; k != nil; k, _ = c.Next() {
+		if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
+			continue
+		}
+		count++
+	}
+
+	return count
+}
+
 func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
 	if len(name) < bucketKeySize {
 		return nil, 0
@@ -343,6 +365,21 @@ func (db *DB) listConcurrently(ctx context.Context, tx *bbolt.Tx, prm ListConcur
 		var cnt cid.ID
 		copy(cnt[:], containerID[:])
 		eg.Go(func() error {
+			if prm.ExcludeContainer != nil {
+				exclude, err := prm.ExcludeContainer(cnt)
+				if err != nil {
+					return err
+				}
+				if exclude {
+					if prm.CalcExcluded != nil {
+						buf := make([]byte, cidSize, addressKeySize)
+						copy(buf, rawAddr)
+						prm.CalcExcluded(
+							countAliveObjectsInBucket(bkt, graveyardBkt, garbageBkt, buf))
+					}
+					return nil
+				}
+			}
 			return selectConcurrentlyFromBucket(egCtx, bkt, objType, graveyardBkt, garbageBkt, rawAddr, cnt, prm)
 		})
 
diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go
index 4f6076d6e..be468699d 100644
--- a/pkg/local_object_storage/shard/list.go
+++ b/pkg/local_object_storage/shard/list.go
@@ -41,9 +41,13 @@ type ListConcurrentlyPrm struct {
 	// BatchSize maximum amount of addresses that will be passed to Handler.
 	BatchSize uint32
-	// ContainersWorkers amount of containers computed concurrently.
-	ContainersWorkers uint32
-	// ObjectsWorkers amount of workers runs Handler concurrently
-	ObjectsWorkers uint32
+	// ContainersWorker amount of containers computed concurrently.
+	ContainersWorker uint32
+	// ObjectsWorker amount of workers that run Handler concurrently
+	ObjectsWorker uint32
+	// ExcludeContainer function to check whether a container should be excluded from listing.
+	ExcludeContainer func(cid.ID) (bool, error)
+	// CalcExcluded is called with the number of objects excluded from listing.
+	CalcExcluded func(count uint64)
 }
 
 // ListWithCursorPrm contains parameters for ListWithCursor operation.
@@ -184,8 +188,8 @@ func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) e
 		trace.WithAttributes(
 			attribute.Int64("batch_size", int64(prm.BatchSize)),
 			attribute.Bool("has_handler", prm.Handler != nil),
-			attribute.Int("objects_workers", int(prm.ObjectsWorkers)),
-			attribute.Int("containers_workers", int(prm.ContainersWorkers)),
+			attribute.Int("objects_worker", int(prm.ObjectsWorker)),
+			attribute.Int("containers_worker", int(prm.ContainersWorker)),
 		))
 	defer span.End()
 
@@ -196,8 +200,10 @@ func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) e
 	var metaPrm meta.ListConcurrentlyPrm
 	metaPrm.BatchSize = prm.BatchSize
 	metaPrm.Handler = prm.Handler
-	metaPrm.ContainersWorker = prm.ContainersWorkers
-	metaPrm.ObjectsWorker = prm.ObjectsWorkers
+	metaPrm.ContainersWorker = prm.ContainersWorker
+	metaPrm.ObjectsWorker = prm.ObjectsWorker
+	metaPrm.ExcludeContainer = prm.ExcludeContainer
+	metaPrm.CalcExcluded = prm.CalcExcluded
 	err := s.metaBase.ListConcurrently(ctx, metaPrm)
 	if err != nil {
 		return fmt.Errorf("could not list objects concurrently: %w", err)
diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go
index ae3413373..1c18cd886 100644
--- a/pkg/services/control/server/evacuate.go
+++ b/pkg/services/control/server/evacuate.go
@@ -37,6 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
 		IgnoreErrors:   req.GetBody().GetIgnoreErrors(),
 		ObjectsHandler: s.replicateObject,
 		Scope:          engine.EvacuateScopeObjects,
+		RepOneOnly:     req.GetBody().GetRepOneOnly(),
 	}
 
 	res, err := s.s.Evacuate(ctx, prm)
diff --git a/pkg/services/control/server/evacuate_async.go b/pkg/services/control/server/evacuate_async.go
index aacebe9e3..63ddce2f2 100644
--- a/pkg/services/control/server/evacuate_async.go
+++ b/pkg/services/control/server/evacuate_async.go
@@ -29,6 +29,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
 		TreeHandler:    s.replicateTree,
 		Async:          true,
 		Scope:          engine.EvacuateScope(req.GetBody().GetScope()),
+		RepOneOnly:     req.GetBody().GetRepOneOnly(),
 	}
 
 	_, err = s.s.Evacuate(ctx, prm)
diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto
index 04994328a..454c4e9ae 100644
--- a/pkg/services/control/service.proto
+++ b/pkg/services/control/service.proto
@@ -316,6 +316,9 @@ message EvacuateShardRequest {
 
     // Flag indicating whether object read errors should be ignored.
     bool ignore_errors = 2;
+
+    // Evacuate objects only from containers with policy `REP 1`.
+    bool rep_one_only = 3;
   }
 
   Body body = 1;
@@ -394,6 +397,8 @@ message StartShardEvacuationRequest {
     bool ignore_errors = 2;
     // Evacuation scope.
     uint32 scope = 3;
+    // Evacuate objects only from containers with policy `REP 1`.
+ bool rep_one_only = 4; } Body body = 1; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 019cac290b7733eb3572ae9d4d24009eab31115b..8fafc62d7e498d1c479e20865637e4cff2314982 100644 GIT binary patch delta 1105 zcmb7CT}V@580P)XzOA#RTewnOy3?{yFjKlH)M!R!B00$}BC=+&!Vouq@TU^D41+F4 zdBW$S3;Th{UZFi9pv#>phddR`#>hCbQE`b%qmMt%upZb}M}i(F?` zTWxR`8%AN9S!Uu&WR&U&Lrkz}n&tOtcG!;b3TKkTm1h=3u`B-2t3ug?VK-7^WQD5V z0x3cU1b$PEt~N79i)f8dPD;72RT~I}YRRm_-UyYrG@RR(SX-WS&AjGHWjsQ|4B-^NRZkcSvGdy8vaQMw=LfPY>H>S1^cW$t)%RYY$Ge=wl$J;qR zt3Y_W!Kzk8(4-^733JIHTOFgU<$dJ|a<`rmLkK=e78Hmu1FPm~gv` zeL;C2%Spg>67j%I*;?+LpePbLjHibE_XEbtz+e1k{intMh-Lc z4pkr;HUo%QRuVru#AA8mjvrT^Q$0M){Z3b060J@*&?$~MUDrhp$>!bd>y~Q?E-bg3 zai#yy5#E2GQ;xXOSy|Z?!_!s0N{>budv4NU41K1}h~!A%#1B!6;5P@lv1rhCw52Hw z?bwUR9BsgHH*dxLRj7WwrnC3Zm!<&TrnnB{7mOfoS6ME!+3Z1N#Hd1ILAcPklj<;P zbIsZ;utu3gB>XFt@08!`y2i`i1=%@>veW3G+WZ&)ka4N_l}QB={uZ)dSkA^!hhk zif#az-{p7M&-ffFX1VsCMm4(MJV+|sFLn`XE^`NtT{C*oGpRjj!{Smyigg*fOx2JQ G>8ZaYuiG;K
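Usage note (reviewer addition, not part of the patch): a minimal sketch of the new flag on the CLI, assuming the usual `--endpoint` and `--wallet` connection options of `frostfs-cli control` commands; the endpoint and wallet path below are placeholders.

```bash
# Evacuate only objects that belong to `REP 1 ...` containers from all shards
# of the node, waiting for the evacuation to complete.
frostfs-cli control shards evacuation start \
    --endpoint 127.0.0.1:8091 \
    --wallet ./wallet.json \
    --all \
    --scope objects \
    --rep-one-only \
    --await
```

Objects from containers whose policy keeps more than one replica are reported as skipped rather than evacuated, which is what `TestEvacuateShardObjectsRepOneOnly` above asserts via `ObjectsSkipped()`.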
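For reviewers, a self-contained sketch of the policy check that drives the `RepOneOnly` filtering, built from the same `netmap` SDK calls used by `isNotRepOne` and the test above. The helper name `hasRepOne` is illustrative and not part of the patch.

```go
package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

// hasRepOne reports whether any replica descriptor of the placement policy
// keeps exactly one object copy, i.e. the container qualifies for
// `--rep-one-only` evacuation (the engine's isNotRepOne is the negation).
func hasRepOne(p netmap.PlacementPolicy) bool {
	for i := range p.NumberOfReplicas() {
		if p.ReplicaDescriptor(i).NumberOfObjects() == 1 {
			return true
		}
	}
	return false
}

func main() {
	// Policy `REP 1`: a single replica descriptor storing one object copy.
	var repOne netmap.PlacementPolicy
	var d1 netmap.ReplicaDescriptor
	d1.SetNumberOfObjects(1)
	repOne.AddReplicas(d1)

	// Policy `REP 2`: the descriptor stores two object copies.
	var repTwo netmap.PlacementPolicy
	var d2 netmap.ReplicaDescriptor
	d2.SetNumberOfObjects(2)
	repTwo.AddReplicas(d2)

	fmt.Println(hasRepOne(repOne)) // true  -> objects are evacuated
	fmt.Println(hasRepOne(repTwo)) // false -> objects are skipped
}
```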