node: Add ability to evacuate objects from REP 1 only #1350

9 changed files with 309 additions and 6 deletions
@@ -20,6 +20,7 @@ const (
 	awaitFlag      = "await"
 	noProgressFlag = "no-progress"
 	scopeFlag      = "scope"
+	repOneOnlyFlag = "rep-one-only"

 	containerWorkerCountFlag = "container-worker-count"
 	objectWorkerCountFlag    = "object-worker-count"

@@ -69,6 +70,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
 	ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
 	containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
 	objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
+	repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag)

 	req := &control.StartShardEvacuationRequest{
 		Body: &control.StartShardEvacuationRequest_Body{

@@ -77,6 +79,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
 			Scope:                getEvacuationScope(cmd),
 			ContainerWorkerCount: containerWorkerCount,
 			ObjectWorkerCount:    objectWorkerCount,
+			RepOneOnly:           repOneOnly,
 		},
 	}

@@ -380,6 +383,7 @@ func initControlStartEvacuationShardCmd() {
 	flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
 	flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
 	flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
+	flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy 'REP 1 ...'")

 	startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
 }
@@ -20,7 +20,12 @@ Because it is necessary to prevent removing by policer objects with policy `REP

 ## Commands

-`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
+`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
+By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
+To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`.
+To adjust resource consumption required for evacuation use options:
+- `--container-worker-count` count of concurrent container evacuation workers
+- `--object-worker-count` count of concurrent object evacuation workers

 `frostfs-cli control shards evacuation stop` stops running evacuation process.
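As a usage illustration (not part of the diff), the new flag composes with the existing ones roughly as shown below; control endpoint, wallet and shard selection options depend on the deployment and are omitted here:

```
frostfs-cli control shards evacuation start --all --scope objects --rep-one-only --await
```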
@@ -9,6 +9,7 @@ import (
 	"sync/atomic"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"

@@ -16,6 +17,7 @@ import (
 	tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"

@@ -88,6 +90,7 @@ type EvacuateShardPrm struct {
 	IgnoreErrors bool
 	Async        bool
 	Scope        EvacuateScope
+	RepOneOnly   bool

 	ContainerWorkerCount uint32
 	ObjectWorkerCount    uint32

@@ -288,6 +291,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
 		attribute.Bool("async", prm.Async),
 		attribute.Bool("ignoreErrors", prm.IgnoreErrors),
 		attribute.Stringer("scope", prm.Scope),
+		attribute.Bool("repOneOnly", prm.RepOneOnly),
 	))

 	defer func() {

@@ -430,13 +434,34 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
 ) error {
 	sh := shardsToEvacuate[shardID]
 	var cntPrm shard.IterateOverContainersPrm
-	cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error {
+	cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error {
 		select {
 		case <-ctx.Done():
 			return context.Cause(ctx)
 		default:
 		}
 		egContainer.Go(func() error {
+			var skip bool
+			c, err := e.containerSource.Load().cs.Get(cnt)
+			if err != nil {
+				if client.IsErrContainerNotFound(err) {
+					skip = true
+				} else {
+					return err
+				}
+			}
+			if !skip && prm.RepOneOnly {
+				skip = e.isNotRepOne(c)
+			}
+			if skip {
+				countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name}
+				count, err := sh.CountAliveObjectsInBucket(ctx, countPrm)
+				if err != nil {
+					return err
+				}
+				res.objSkipped.Add(count)
+				return nil
+			}
 			var objPrm shard.IterateOverObjectsInContainerPrm
 			objPrm.BucketName = name
 			objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {

@@ -454,7 +479,7 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context
 			})
 			return nil
 		}
-		err := sh.IterateOverObjectsInContainer(ctx, objPrm)
+		err = sh.IterateOverObjectsInContainer(ctx, objPrm)
 		if err != nil {
 			cancel(err)
 		}
@@ -781,6 +806,16 @@ func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objI
 	return nil
 }

+func (e *StorageEngine) isNotRepOne(c *container.Container) bool {
+	p := c.Value.PlacementPolicy()
+	for i := range p.NumberOfReplicas() {
+		if p.ReplicaDescriptor(i).NumberOfObjects() > 1 {
+			return true
+		}
+	}
+	return false
+}
+
 func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
 	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
 ) (bool, error) {
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"

+	coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@@ -20,14 +21,38 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 )

+type containerStorage struct {
+	cntmap  map[cid.ID]*container.Container
+	latency time.Duration
+}
+
+func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
+	time.Sleep(cs.latency)
+	v, ok := cs.cntmap[id]
+	if !ok {
+		return nil, new(apistatus.ContainerNotFound)
+	}
+	coreCnt := coreContainer.Container{
+		Value: *v,
+	}
+	return &coreCnt, nil
+}
+
+func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
+	return nil, nil
+}
+
 func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
 	dir := t.TempDir()
@@ -61,10 +86,15 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
 		{Key: pilorama.AttributeVersion, Value: []byte("XXX")},
 		{Key: pilorama.AttributeFilename, Value: []byte("file.txt")},
 	}
+	cnrMap := make(map[cid.ID]*container.Container)
 	for _, sh := range ids {
-		for range objPerShard {
+		for i := range objPerShard {
+			// Create dummy container
+			cnr1 := container.Container{}
+			cnr1.SetAttribute("cnr", "cnr"+strconv.Itoa(i))
 			contID := cidtest.ID()
+			cnrMap[contID] = &cnr1

 			obj := testutil.GenerateObjectWithCID(contID)
 			objects = append(objects, obj)
@@ -78,6 +108,7 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
 			require.NoError(t, err)
 		}
 	}
+	e.SetContainerSource(&containerStorage{cntmap: cnrMap})
 	return e, ids, objects
 }
|
||||||
|
|
||||||
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
|
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
|
||||||
var n atomic.Uint64
|
var n atomic.Uint64
|
||||||
|
var mtx sync.Mutex
|
||||||
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
||||||
|
mtx.Lock()
|
||||||
|
defer mtx.Unlock()
|
||||||
if n.Load() == max {
|
if n.Load() == max {
|
||||||
return false, errReplication
|
return false, errReplication
|
||||||
}
|
}
|
||||||
|
@ -640,3 +674,146 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
|
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
|
||||||
|
e, ids, _ := newEngineEvacuate(t, 2, 0)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, e.Close(context.Background()))
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Create container with policy REP 2
|
||||||
|
cnr1 := container.Container{}
|
||||||
|
p1 := netmap.PlacementPolicy{}
|
||||||
|
p1.SetContainerBackupFactor(1)
|
||||||
|
x1 := netmap.ReplicaDescriptor{}
|
||||||
|
x1.SetNumberOfObjects(2)
|
||||||
|
p1.AddReplicas(x1)
|
||||||
|
x1 = netmap.ReplicaDescriptor{}
|
||||||
|
x1.SetNumberOfObjects(1)
|
||||||
|
p1.AddReplicas(x1)
|
||||||
|
cnr1.SetPlacementPolicy(p1)
|
||||||
|
cnr1.SetAttribute("cnr", "cnr1")
|
||||||
|
|
||||||
|
var idCnr1 cid.ID
|
||||||
|
container.CalculateID(&idCnr1, cnr1)
|
||||||
|
|
||||||
|
cnrmap := make(map[cid.ID]*container.Container)
|
||||||
|
var cids []cid.ID
|
||||||
|
cnrmap[idCnr1] = &cnr1
|
||||||
|
cids = append(cids, idCnr1)
|
||||||
|
|
||||||
|
// Create container with policy REP 1
|
||||||
|
cnr2 := container.Container{}
|
||||||
|
p2 := netmap.PlacementPolicy{}
|
||||||
|
p2.SetContainerBackupFactor(1)
|
||||||
|
x2 := netmap.ReplicaDescriptor{}
|
||||||
|
x2.SetNumberOfObjects(1)
|
||||||
|
p2.AddReplicas(x2)
|
||||||
|
x2 = netmap.ReplicaDescriptor{}
|
||||||
|
x2.SetNumberOfObjects(1)
|
||||||
|
p2.AddReplicas(x2)
|
||||||
|
cnr2.SetPlacementPolicy(p2)
|
||||||
|
cnr2.SetAttribute("cnr", "cnr2")
|
||||||
|
|
||||||
|
var idCnr2 cid.ID
|
||||||
|
container.CalculateID(&idCnr2, cnr2)
|
||||||
|
cnrmap[idCnr2] = &cnr2
|
||||||
|
cids = append(cids, idCnr2)
|
||||||
|
|
||||||
|
// Create container for simulate removing
|
||||||
|
cnr3 := container.Container{}
|
||||||
|
p3 := netmap.PlacementPolicy{}
|
||||||
|
p3.SetContainerBackupFactor(1)
|
||||||
|
x3 := netmap.ReplicaDescriptor{}
|
||||||
|
x3.SetNumberOfObjects(1)
|
||||||
|
p3.AddReplicas(x3)
|
||||||
|
cnr3.SetPlacementPolicy(p3)
|
||||||
|
cnr3.SetAttribute("cnr", "cnr3")
|
||||||
|
|
||||||
|
var idCnr3 cid.ID
|
||||||
|
container.CalculateID(&idCnr3, cnr3)
|
||||||
|
cids = append(cids, idCnr3)
|
||||||
|
|
||||||
|
e.SetContainerSource(&containerStorage{cntmap: cnrmap})
|
||||||
|
|
||||||
|
for _, sh := range ids {
|
||||||
|
for j := range 3 {
|
||||||
|
for range 4 {
|
||||||
|
obj := testutil.GenerateObjectWithCID(cids[j])
|
||||||
|
var putPrm shard.PutPrm
|
||||||
|
putPrm.SetObject(obj)
|
||||||
|
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var prm EvacuateShardPrm
|
||||||
|
prm.ShardID = ids[0:1]
|
||||||
|
prm.Scope = EvacuateScopeObjects
|
||||||
|
prm.RepOneOnly = true
|
||||||
|
|
||||||
|
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||||
|
|
||||||
|
res, err := e.Evacuate(context.Background(), prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, uint64(4), res.ObjectsEvacuated())
|
||||||
|
require.Equal(t, uint64(8), res.ObjectsSkipped())
|
||||||
|
require.Equal(t, uint64(0), res.ObjectsFailed())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
e, ids, _ := newEngineEvacuate(t, 2, 0)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, e.Close(context.Background()))
|
||||||
|
}()
|
||||||
|
|
||||||
|
cnrmap := make(map[cid.ID]*container.Container)
|
||||||
|
var cids []cid.ID
|
||||||
|
// Create containers with policy REP 1
|
||||||
|
for i := range 10_000 {
|
||||||
|
cnr1 := container.Container{}
|
||||||
|
p1 := netmap.PlacementPolicy{}
|
||||||
|
p1.SetContainerBackupFactor(1)
|
||||||
|
x1 := netmap.ReplicaDescriptor{}
|
||||||
|
x1.SetNumberOfObjects(2)
|
||||||
|
p1.AddReplicas(x1)
|
||||||
|
cnr1.SetPlacementPolicy(p1)
|
||||||
|
cnr1.SetAttribute("i", strconv.Itoa(i))
|
||||||
|
|
||||||
|
var idCnr1 cid.ID
|
||||||
|
container.CalculateID(&idCnr1, cnr1)
|
||||||
|
|
||||||
|
cnrmap[idCnr1] = &cnr1
|
||||||
|
cids = append(cids, idCnr1)
|
||||||
|
}
|
||||||
|
|
||||||
|
e.SetContainerSource(&containerStorage{
|
||||||
|
cntmap: cnrmap,
|
||||||
|
latency: time.Millisecond * 100,
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, cnt := range cids {
|
||||||
|
for range 1 {
|
||||||
|
obj := testutil.GenerateObjectWithCID(cnt)
|
||||||
|
var putPrm shard.PutPrm
|
||||||
|
putPrm.SetObject(obj)
|
||||||
|
_, err := e.shards[ids[0].String()].Put(context.Background(), putPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var prm EvacuateShardPrm
|
||||||
|
prm.ShardID = ids[0:1]
|
||||||
|
prm.Scope = EvacuateScopeObjects
|
||||||
|
prm.RepOneOnly = true
|
||||||
|
prm.ContainerWorkerCount = 10
|
||||||
|
|
||||||
|
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
_, err := e.Evacuate(context.Background(), prm)
|
||||||
|
t.Logf("evacuate took %v\n", time.Since(start))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
|
@@ -76,6 +76,12 @@ type IterateOverObjectsInContainerPrm struct {
 	Handler func(context.Context, *objectcore.Info) error
 }

+// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation.
+type CountAliveObjectsInBucketPrm struct {
+	// BucketName container's bucket name.
+	BucketName []byte
+}
+
 // ListWithCursor lists physical objects available in metabase starting from
 // cursor. Includes objects of all types. Does not include inhumed objects.
 // Use cursor value from response for consecutive requests.
@@ -426,3 +432,48 @@ func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, c
 	}
 	return nil
 }
+
+// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
+func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
+	var (
+		startedAt = time.Now()
+		success   = false
+	)
+	defer func() {
+		db.metrics.AddMethodDuration("CountAliveObjectsInBucket", time.Since(startedAt), success)
+	}()
+	_, span := tracing.StartSpanFromContext(ctx, "metabase.CountAliveObjectsInBucket")
+	defer span.End()
+
+	db.modeMtx.RLock()
+	defer db.modeMtx.RUnlock()
+
+	if db.mode.NoMetabase() {
+		return 0, ErrDegradedMode
+	}
+
+	cidRaw := prm.BucketName[1:bucketKeySize]
+	if cidRaw == nil {
+		return 0, nil
+	}
+	var count uint64
+	err := db.boltDB.View(func(tx *bbolt.Tx) error {
+		bkt := tx.Bucket(prm.BucketName)
+		if bkt == nil {
+			return nil
+		}
+		graveyardBkt := tx.Bucket(graveyardBucketName)
+		garbageBkt := tx.Bucket(garbageBucketName)
+		c := bkt.Cursor()
+		k, _ := c.First()
+		for ; k != nil; k, _ = c.Next() {
+			if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
+				continue
+			}
+			count++
+		}
+		return nil
+	})
+	success = err == nil
+	return count, metaerr.Wrap(err)
+}
@@ -44,10 +44,16 @@ type IterateOverContainersPrm struct {
 type IterateOverObjectsInContainerPrm struct {
 	// BucketName container's bucket name.
 	BucketName []byte
-	// Handler function executed upon containers in db.
+	// Handler function executed upon objects in db.
 	Handler func(context.Context, *objectcore.Info) error
 }

+// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation.
+type CountAliveObjectsInBucketPrm struct {
+	// BucketName container's bucket name.
+	BucketName []byte
+}
+
 // ListWithCursorPrm contains parameters for ListWithCursor operation.
 type ListWithCursorPrm struct {
 	count uint32
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
|
||||||
|
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "shard.CountAliveObjectsInBucket")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return 0, ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
var metaPrm meta.CountAliveObjectsInBucketPrm
|
||||||
|
metaPrm.BucketName = prm.BucketName
|
||||||
|
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return count, nil
|
||||||
|
}
|
||||||
|
|
|
@@ -31,6 +31,7 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
 		Scope:                engine.EvacuateScope(req.GetBody().GetScope()),
 		ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
 		ObjectWorkerCount:    req.GetBody().GetObjectWorkerCount(),
+		RepOneOnly:           req.GetBody().GetRepOneOnly(),
 	}

 	_, err = s.s.Evacuate(ctx, prm)
@@ -398,6 +398,8 @@ message StartShardEvacuationRequest {
     uint32 container_worker_count = 4;
     // Count of concurrent object evacuation workers.
     uint32 object_worker_count = 5;
+    // Choose for evacuation objects in `REP 1` containers only.
+    bool rep_one_only = 6;
   }

   Body body = 1;
pkg/services/control/service_frostfs.pb.go (generated): binary file not shown.
You had already had this discussion with @dstepanov-yadro.

I believe your PR solves the problem of evacuating objects that have a single instance across all nodes. But if the placement policy has several vectors (`REP 1 ... REP 1`), the object does seem to have a replicated copy. Could you explain, then, why you do NOT check if `p.NumberOfReplicas() == 1`?

I have only one argument for this: network connectivity between the nodes of the two `REP 1` vectors may be bad. I see that this argument is not strong, and since you don't see any problems, I'll add this check.
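To make the check under discussion concrete, here is a minimal, hypothetical sketch (not part of this PR) of the stricter predicate, assuming only the netmap SDK calls already used in the diff (`NumberOfReplicas`, `ReplicaDescriptor`, `NumberOfObjects`, `AddReplicas`, `SetNumberOfObjects`). Unlike `isNotRepOne` above, it also rejects policies with several `REP 1` vectors, since those still keep a copy in another vector.

```go
package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

// isSingleRepOne treats a container as "REP 1 only" when the policy has exactly
// one replica vector and that vector stores a single copy of each object.
func isSingleRepOne(p netmap.PlacementPolicy) bool {
	return p.NumberOfReplicas() == 1 && p.ReplicaDescriptor(0).NumberOfObjects() == 1
}

func main() {
	var d netmap.ReplicaDescriptor
	d.SetNumberOfObjects(1)

	var repOne netmap.PlacementPolicy
	repOne.AddReplicas(d) // plain `REP 1`

	var multiVector netmap.PlacementPolicy
	multiVector.AddReplicas(d)
	multiVector.AddReplicas(d) // `REP 1 ... REP 1`: a copy exists in another vector

	fmt.Println(isSingleRepOne(repOne))      // true
	fmt.Println(isSingleRepOne(multiVector)) // false
}
```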