[#1350] node: Add ability to evacuate objects from REP 1 only

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-03 15:42:38 +03:00
parent 108e4e07be
commit 7ecfe72fae
8 changed files with 120 additions and 1 deletions

View file

@ -24,6 +24,7 @@ func evacuateShard(cmd *cobra.Command, _ []string) {
req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)} req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)}
req.Body.Shard_ID = getShardIDList(cmd) req.Body.Shard_ID = getShardIDList(cmd)
req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag) req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag)
req.Body.RepOneOnly, _ = cmd.Flags().GetBool(repOneOnlyFlag)
signRequest(cmd, pk, req) signRequest(cmd, pk, req)

View file

@ -20,6 +20,7 @@ const (
awaitFlag = "await" awaitFlag = "await"
noProgressFlag = "no-progress" noProgressFlag = "no-progress"
scopeFlag = "scope" scopeFlag = "scope"
repOneOnlyFlag = "rep-one-only"
scopeAll = "all" scopeAll = "all"
scopeObjects = "objects" scopeObjects = "objects"
@ -64,12 +65,14 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd) pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag)
req := &control.StartShardEvacuationRequest{ req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{ Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd), Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors, IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd), Scope: getEvacuationScope(cmd),
RepOneOnly: repOneOnly,
}, },
} }
@ -371,6 +374,7 @@ func initControlStartEvacuationShardCmd() {
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll)) flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy `REP 1 ...`")
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
} }

View file

@ -79,6 +79,7 @@ type EvacuateShardPrm struct {
IgnoreErrors bool IgnoreErrors bool
Async bool Async bool
Scope EvacuateScope Scope EvacuateScope
RepOneOnly bool
} }
// EvacuateShardRes represents result of the EvacuateShard operation. // EvacuateShardRes represents result of the EvacuateShard operation.
@ -270,6 +271,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
attribute.Bool("async", prm.Async), attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors), attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope), attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly),
)) ))
defer func() { defer func() {
@ -653,7 +655,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
default: default:
} }
addr := toEvacuate[i].Address addr := toEvacuate[i].Address
if prm.RepOneOnly {
repOne, err := e.isRepOne(addr.Container())
if err != nil {
return err
}
if !repOne {
res.objSkipped.Add(1)
continue
}
}
var getPrm shard.GetPrm var getPrm shard.GetPrm
getPrm.SetAddress(addr) getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true) getPrm.SkipEvacCheck(true)
@ -703,6 +714,20 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
return nil return nil
} }
func (e *StorageEngine) isRepOne(cid cid.ID) (bool, error) {
c, err := e.containerSource.Load().cs.Get(cid)
if err != nil {
return false, err
}
p := c.Value.PlacementPolicy()
for i := 0; i < p.NumberOfReplicas(); i++ {
if p.ReplicaDescriptor(i).NumberOfObjects() == 1 {
return true, nil
}
}
return false, nil
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
) (bool, error) { ) (bool, error) {

View file

@ -9,6 +9,7 @@ import (
"testing" "testing"
"time" "time"
coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@ -18,14 +19,31 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
type containerStorage struct {
cntmap map[cid.ID]*container.Container
}
func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
coreCnt := coreContainer.Container{
Value: *cs.cntmap[id],
}
return &coreCnt, nil
}
func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
return nil, nil
}
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
dir := t.TempDir() dir := t.TempDir()
@ -605,3 +623,67 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.Equal(t, expectedTreeOps, evacuatedTreeOps) require.Equal(t, expectedTreeOps, evacuatedTreeOps)
} }
func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 0)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
// Create container with policy REP 2
cnr1 := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(2)
p1.AddReplicas(x1)
cnr1.SetPlacementPolicy(p1)
var idCnr1 cid.ID
container.CalculateID(&idCnr1, cnr1)
cnrmap := make(map[cid.ID]*container.Container)
var cids []cid.ID
cnrmap[idCnr1] = &cnr1
cids = append(cids, idCnr1)
// Create container with policy REP 1
cnr2 := container.Container{}
p2 := netmap.PlacementPolicy{}
p2.SetContainerBackupFactor(1)
x2 := netmap.ReplicaDescriptor{}
x2.SetNumberOfObjects(1)
p2.AddReplicas(x2)
cnr2.SetPlacementPolicy(p2)
var idCnr2 cid.ID
container.CalculateID(&idCnr2, cnr2)
cnrmap[idCnr2] = &cnr2
cids = append(cids, idCnr2)
e.SetContainerSource(&containerStorage{cntmap: cnrmap})
for _, sh := range ids {
for i := 0; i < 4; i++ {
obj := testutil.GenerateObjectWithCID(cids[i%2])
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
}
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(2), res.ObjectsEvacuated())
require.Equal(t, uint64(2), res.ObjectsSkipped())
require.Equal(t, uint64(0), res.ObjectsFailed())
}

View file

@ -37,6 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
IgnoreErrors: req.GetBody().GetIgnoreErrors(), IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject, ObjectsHandler: s.replicateObject,
Scope: engine.EvacuateScopeObjects, Scope: engine.EvacuateScopeObjects,
RepOneOnly: req.GetBody().GetRepOneOnly(),
} }
res, err := s.s.Evacuate(ctx, prm) res, err := s.s.Evacuate(ctx, prm)

View file

@ -29,6 +29,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
TreeHandler: s.replicateTree, TreeHandler: s.replicateTree,
Async: true, Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()), Scope: engine.EvacuateScope(req.GetBody().GetScope()),
RepOneOnly: req.GetBody().GetRepOneOnly(),
} }
_, err = s.s.Evacuate(ctx, prm) _, err = s.s.Evacuate(ctx, prm)

View file

@ -313,6 +313,9 @@ message EvacuateShardRequest {
// Flag indicating whether object read errors should be ignored. // Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2; bool ignore_errors = 2;
// Choose for evacuation objects in `REP 1` containers only.
bool rep_one_only = 3;
} }
Body body = 1; Body body = 1;
@ -391,6 +394,8 @@ message StartShardEvacuationRequest {
bool ignore_errors = 2; bool ignore_errors = 2;
// Evacuation scope. // Evacuation scope.
uint32 scope = 3; uint32 scope = 3;
// Choose for evacuation objects in `REP 1` containers only.
bool rep_one_only = 4;
} }
Body body = 1; Body body = 1;

Binary file not shown.