[#1350] node: Add ability to evacuate objects from REP 1 only
Some checks failed
DCO action / DCO (pull_request) Successful in 1m1s
Tests and linters / Run gofumpt (pull_request) Successful in 1m11s
Vulncheck / Vulncheck (pull_request) Successful in 1m22s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m8s
Build / Build Components (pull_request) Successful in 2m17s
Tests and linters / gopls check (pull_request) Successful in 2m37s
Tests and linters / Staticcheck (pull_request) Successful in 2m45s
Tests and linters / Lint (pull_request) Successful in 3m26s
Tests and linters / Tests (pull_request) Successful in 4m11s
Tests and linters / Tests with -race (pull_request) Failing after 4m45s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
parent af6139b981
commit 763a7cef1c
11 changed files with 166 additions and 2 deletions
@@ -24,6 +24,7 @@ func evacuateShard(cmd *cobra.Command, _ []string) {
 	req := &control.EvacuateShardRequest{Body: new(control.EvacuateShardRequest_Body)}
 	req.Body.Shard_ID = getShardIDList(cmd)
 	req.Body.IgnoreErrors, _ = cmd.Flags().GetBool(ignoreErrorsFlag)
+	req.Body.RepOneOnly, _ = cmd.Flags().GetBool(repOneOnlyFlag)

 	signRequest(cmd, pk, req)

@@ -20,6 +20,7 @@ const (
 	awaitFlag      = "await"
 	noProgressFlag = "no-progress"
 	scopeFlag      = "scope"
+	repOneOnlyFlag = "rep-one-only"

 	scopeAll     = "all"
 	scopeObjects = "objects"
@@ -64,12 +65,14 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
 	pk := key.Get(cmd)

 	ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
+	repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag)

 	req := &control.StartShardEvacuationRequest{
 		Body: &control.StartShardEvacuationRequest_Body{
 			Shard_ID:     getShardIDList(cmd),
 			IgnoreErrors: ignoreErrors,
 			Scope:        getEvacuationScope(cmd),
+			RepOneOnly:   repOneOnly,
 		},
 	}

@@ -371,6 +374,7 @@ func initControlStartEvacuationShardCmd() {
 	flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
 	flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
 	flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
+	flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy 'REP 1 ...'")

 	startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
 }
@@ -20,7 +20,9 @@ Because it is necessary to prevent removing by policer objects with policy `REP

 ## Commands

-`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
+`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
+By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
+To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`.

 `frostfs-cli control shards evacuation stop` stops running evacuation process.

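For orientation, a typical invocation combining the new flag with the existing ones might look like the line below; the endpoint and wallet values are illustrative placeholders, not taken from the commit:

    frostfs-cli control shards evacuation start --endpoint localhost:8090 --wallet ./wallet.json --all --scope objects --rep-one-only --await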
@@ -79,6 +79,7 @@ type EvacuateShardPrm struct {
 	IgnoreErrors bool
 	Async        bool
 	Scope        EvacuateScope
+	RepOneOnly   bool
 }

 // EvacuateShardRes represents result of the EvacuateShard operation.
@@ -268,6 +269,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
 		attribute.Bool("async", prm.Async),
 		attribute.Bool("ignoreErrors", prm.IgnoreErrors),
 		attribute.Stringer("scope", prm.Scope),
+		attribute.Bool("repOneOnly", prm.RepOneOnly),
 	))

 	defer func() {
@@ -380,6 +382,15 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
 	listPrm.ObjectWorkerCount = e.cfg.evacuationObjectWorkerCount
 	listPrm.ContainerWorkerCount = e.cfg.evacuationContainerWorkerCount

+	if prm.RepOneOnly {
+		listPrm.ExcludeContainer = func(cid cid.ID) (bool, error) {
+			return e.isNotRepOne(cid)
+		}
+		listPrm.CalcExcluded = func(count uint64) {
+			res.objSkipped.Add(count)
+		}
+	}
+
 	err := sh.ListConcurrently(ctx, listPrm)
 	if err != nil {
 		e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
@@ -648,7 +659,6 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 		default:
 		}
 		addr := toEvacuate[i].Address

 		var getPrm shard.GetPrm
 		getPrm.SetAddress(addr)
 		getPrm.SkipEvacCheck(true)
@@ -698,6 +708,20 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 	return nil
 }

+func (e *StorageEngine) isNotRepOne(cid cid.ID) (bool, error) {
+	c, err := e.containerSource.Load().cs.Get(cid)
+	if err != nil {
+		return false, err
+	}
+	p := c.Value.PlacementPolicy()
+	for i := range p.NumberOfReplicas() {
+		if p.ReplicaDescriptor(i).NumberOfObjects() == 1 {
+			return false, nil
+		}
+	}
+	return true, nil
+}
+
 func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
 	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
 ) (bool, error) {
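One subtlety worth noting: a placement policy may carry several replica descriptors, and isNotRepOne returns false as soon as any descriptor keeps a single copy. A mixed policy such as `REP 2 REP 1` is therefore treated as a REP 1 container and its objects are evacuated. The sketch below illustrates this using the same SDK calls as the test later in this diff; it assumes AddReplicas appends descriptors rather than replacing them.

    package main

    import (
    	"fmt"

    	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
    )

    // hasRepOne mirrors the isNotRepOne loop above, inverted: it reports
    // whether any descriptor of the policy stores a single object copy.
    func hasRepOne(p netmap.PlacementPolicy) bool {
    	for i := range p.NumberOfReplicas() {
    		if p.ReplicaDescriptor(i).NumberOfObjects() == 1 {
    			return true
    		}
    	}
    	return false
    }

    func main() {
    	// Mixed policy: one `REP 2` descriptor and one `REP 1` descriptor.
    	var p netmap.PlacementPolicy
    	var r2, r1 netmap.ReplicaDescriptor
    	r2.SetNumberOfObjects(2)
    	r1.SetNumberOfObjects(1)
    	p.AddReplicas(r2)
    	p.AddReplicas(r1)

    	fmt.Println(hasRepOne(p)) // true: qualifies for --rep-one-only evacuation
    }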
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"

+	coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@@ -19,14 +20,31 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 )

+type containerStorage struct {
+	cntmap map[cid.ID]*container.Container
+}
+
+func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
+	coreCnt := coreContainer.Container{
+		Value: *cs.cntmap[id],
+	}
+	return &coreCnt, nil
+}
+
+func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
+	return nil, nil
+}
+
 func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
 	dir := t.TempDir()
@@ -608,3 +626,68 @@ func TestEvacuateTreesRemote(t *testing.T) {

 	require.Equal(t, expectedTreeOps, evacuatedTreeOps)
 }
+
+func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
+	e, ids, _ := newEngineEvacuate(t, 2, 0)
+	defer func() {
+		require.NoError(t, e.Close(context.Background()))
+	}()
+
+	// Create container with policy REP 2
+	cnr1 := container.Container{}
+	p1 := netmap.PlacementPolicy{}
+	p1.SetContainerBackupFactor(1)
+	x1 := netmap.ReplicaDescriptor{}
+	x1.SetNumberOfObjects(2)
+	p1.AddReplicas(x1)
+	cnr1.SetPlacementPolicy(p1)
+
+	var idCnr1 cid.ID
+	container.CalculateID(&idCnr1, cnr1)
+
+	cnrmap := make(map[cid.ID]*container.Container)
+	var cids []cid.ID
+	cnrmap[idCnr1] = &cnr1
+	cids = append(cids, idCnr1)
+
+	// Create container with policy REP 1
+	cnr2 := container.Container{}
+	p2 := netmap.PlacementPolicy{}
+	p2.SetContainerBackupFactor(1)
+	x2 := netmap.ReplicaDescriptor{}
+	x2.SetNumberOfObjects(1)
+	p2.AddReplicas(x2)
+	cnr2.SetPlacementPolicy(p2)
+
+	var idCnr2 cid.ID
+	container.CalculateID(&idCnr2, cnr2)
+	cnrmap[idCnr2] = &cnr2
+	cids = append(cids, idCnr2)
+
+	e.SetContainerSource(&containerStorage{cntmap: cnrmap})
+
+	for _, sh := range ids {
+		for j := range 2 {
+			for range 4 {
+				obj := testutil.GenerateObjectWithCID(cids[j])
+				var putPrm shard.PutPrm
+				putPrm.SetObject(obj)
+				_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
+				require.NoError(t, err)
+			}
+		}
+	}
+
+	var prm EvacuateShardPrm
+	prm.ShardID = ids[0:1]
+	prm.Scope = EvacuateScopeObjects
+	prm.RepOneOnly = true
+
+	require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
+
+	res, err := e.Evacuate(context.Background(), prm)
+	require.NoError(t, err)
+	require.Equal(t, uint64(4), res.ObjectsEvacuated())
+	require.Equal(t, uint64(4), res.ObjectsSkipped())
+	require.Equal(t, uint64(0), res.ObjectsFailed())
+}
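The arithmetic behind the assertions: the test creates two shards and stores four objects per container on each of them, so every shard holds eight objects, four in the `REP 2` container and four in the `REP 1` container. Evacuating shard 0 with RepOneOnly set therefore moves the four `REP 1` objects, reports the four `REP 2` objects as skipped, and fails none.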
@@ -72,6 +72,10 @@ type ListConcurrentlyPrm struct {
 	ContainerWorkerCount uint32
 	// ObjectWorkerCount amount of workers runs Handler concurrently for each container.
 	ObjectWorkerCount uint32
+	// ExcludeContainer, when set, reports whether the container should be excluded from listing.
+	ExcludeContainer func(cid.ID) (bool, error)
+	// CalcExcluded, when set, receives the number of alive objects in each excluded container.
+	CalcExcluded func(count uint64)
 }

 // ListWithCursor lists physical objects available in metabase starting from
@@ -259,6 +263,24 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 	return to, offset, cursor, nil
 }

+// countAliveObjectsInBucket counts objects in the bucket that are neither in the graveyard nor marked as garbage.
+func countAliveObjectsInBucket(bkt *bbolt.Bucket, // main bucket
+	graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
+	cidRaw []byte, // container ID prefix, optimization
+) uint64 {
+	c := bkt.Cursor()
+	k, _ := c.First()
+	var count uint64
+	for ; k != nil; k, _ = c.Next() {
+		if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
+			continue
+		}
+		count++
+	}
+
+	return count
+}
+
 func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
 	if len(name) < bucketKeySize {
 		return nil, 0
@@ -343,6 +365,21 @@ func (db *DB) listConcurrently(ctx context.Context, tx *bbolt.Tx, prm ListConcur
 			var cnt cid.ID
 			copy(cnt[:], containerID[:])
 			eg.Go(func() error {
+				if prm.ExcludeContainer != nil {
+					exclude, err := prm.ExcludeContainer(cnt)
+					if err != nil {
+						return err
+					}
+					if exclude {
+						if prm.CalcExcluded != nil {
+							buf := make([]byte, cidSize, addressKeySize)
+							copy(buf, rawAddr)
+							prm.CalcExcluded(
+								countAliveObjectsInBucket(bkt, graveyardBkt, garbageBkt, buf))
+						}
+						return nil
+					}
+				}
 				return selectConcurrentlyFromBucket(egCtx,
 					bkt, objType, graveyardBkt, garbageBkt, rawAddr, cnt, prm)
 			})
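The buf allocation above is a small but deliberate detail: it copies only the container ID prefix (cidSize bytes) while reserving capacity for a full address key (addressKeySize), so the append inside countAliveObjectsInBucket can build each object's key without reallocating. A standalone sketch of the trick; the 32/64 values follow the FrostFS address layout (container ID plus object ID) but are illustrative constants here, not the package's identifiers:

    package main

    import "fmt"

    func main() {
    	const cidSize, addressKeySize = 32, 64 // container ID; container ID + object ID

    	rawAddr := make([]byte, addressKeySize) // a full address key; its prefix is the container ID
    	objKey := make([]byte, 32)              // another object's ID within the same container

    	// Copy only the container ID prefix, but keep room for a whole key.
    	buf := make([]byte, cidSize, addressKeySize)
    	copy(buf, rawAddr)

    	full := append(buf, objKey...) // no reallocation: fits in buf's spare capacity
    	fmt.Println(len(full), cap(full) == addressKeySize)
    }

Inside the counting loop each successive append reuses that same spare capacity, overwriting the previous key; that is safe there because each key is only inspected immediately before the next iteration.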
@@ -44,6 +44,10 @@ type ListConcurrentlyPrm struct {
 	ContainerWorkerCount uint32
 	// ObjectWorkerCount amount of workers runs Handler concurrently
 	ObjectWorkerCount uint32
+	// ExcludeContainer, when set, reports whether the container should be excluded from listing.
+	ExcludeContainer func(cid.ID) (bool, error)
+	// CalcExcluded, when set, receives the number of alive objects in each excluded container.
+	CalcExcluded func(count uint64)
 }

 // ListWithCursorPrm contains parameters for ListWithCursor operation.
@@ -198,6 +202,8 @@ func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) e
 	metaPrm.Handler = prm.Handler
 	metaPrm.ContainerWorkerCount = prm.ContainerWorkerCount
 	metaPrm.ObjectWorkerCount = prm.ObjectWorkerCount
+	metaPrm.ExcludeContainer = prm.ExcludeContainer
+	metaPrm.CalcExcluded = prm.CalcExcluded
 	err := s.metaBase.ListConcurrently(ctx, metaPrm)
 	if err != nil {
 		return fmt.Errorf("could not list objects concurrently: %w", err)
@@ -37,6 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
 		IgnoreErrors:   req.GetBody().GetIgnoreErrors(),
 		ObjectsHandler: s.replicateObject,
 		Scope:          engine.EvacuateScopeObjects,
+		RepOneOnly:     req.GetBody().GetRepOneOnly(),
 	}

 	res, err := s.s.Evacuate(ctx, prm)
@@ -29,6 +29,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
 		TreeHandler: s.replicateTree,
 		Async:       true,
 		Scope:       engine.EvacuateScope(req.GetBody().GetScope()),
+		RepOneOnly:  req.GetBody().GetRepOneOnly(),
 	}

 	_, err = s.s.Evacuate(ctx, prm)
@@ -316,6 +316,9 @@ message EvacuateShardRequest {

     // Flag indicating whether object read errors should be ignored.
     bool ignore_errors = 2;
+
+    // Evacuate only objects stored in containers with `REP 1` policy.
+    bool rep_one_only = 3;
   }

   Body body = 1;
@@ -394,6 +397,8 @@ message StartShardEvacuationRequest {
     bool ignore_errors = 2;
     // Evacuation scope.
     uint32 scope = 3;
+    // Evacuate only objects stored in containers with `REP 1` policy.
+    bool rep_one_only = 4;
   }

   Body body = 1;
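Because `rep_one_only` occupies a previously unused field number in both request bodies, the change is wire-compatible under proto3 semantics: older clients simply never set the field, and servers read the absent value as false, preserving the pre-existing behavior.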
BIN pkg/services/control/service_frostfs.pb.go (generated; binary file not shown)