WIP: node: Add ability to evacuate objects from REP 1 only #1350

Draft
acid-ant wants to merge 3 commits from acid-ant/frostfs-node:feat/evac-skip-rep-one into master
16 changed files with 715 additions and 121 deletions

View file

@ -20,6 +20,10 @@ const (
awaitFlag = "await"
noProgressFlag = "no-progress"
scopeFlag = "scope"
repOneOnlyFlag = "rep-one-only"
containerWorkerCountFlag = "container-worker-count"
objectWorkerCountFlag = "object-worker-count"
scopeAll = "all"
scopeObjects = "objects"
@ -64,12 +68,18 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
repOneOnly, _ := cmd.Flags().GetBool(repOneOnlyFlag)
req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd),
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd),
ContainerWorkerCount: containerWorkerCount,
ObjectWorkerCount: objectWorkerCount,
RepOneOnly: repOneOnly,
},
}
@ -371,6 +381,9 @@ func initControlStartEvacuationShardCmd() {
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
flags.Bool(repOneOnlyFlag, false, "Evacuate objects only from containers with policy 'REP 1 ...'")
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -110,6 +110,9 @@ type applicationConfiguration struct {
shardPoolSize uint32
shards []shardCfg
lowMem bool
evacuationObjectWorkerCount uint32
evacuationContainerWorkerCount uint32
}
// if need to run node in compatibility with other versions mode
@ -228,6 +231,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.evacuationObjectWorkerCount = engineconfig.EngineEvacuationObjectWorkerCount(c)
a.EngineCfg.evacuationContainerWorkerCount = engineconfig.EngineEvacuationContainerWorkerCount(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
@ -829,6 +834,8 @@ func (c *cfg) engineOpts() []engine.Option {
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithEvacuationObjectWorkerCount(c.EngineCfg.evacuationObjectWorkerCount),
engine.WithEvacuationContainerWorkerCount(c.EngineCfg.evacuationContainerWorkerCount),
)
if c.metricsCollector != nil {

View file

@ -15,6 +15,12 @@ const (
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
// process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20
// EvacuationContainerWorkerCountDefault is a default value of the count of
// concurrent container evacuation workers.
EvacuationContainerWorkerCountDefault = 10
// EvacuationObjectWorkerCountDefault is a default value of the count of
// concurrent object evacuation workers.
EvacuationObjectWorkerCountDefault = 10
)
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@ -88,3 +94,19 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem")
}
// EngineEvacuationObjectWorkerCount returns value of "evacuation_object_worker_count" config parameter from "storage" section.
func EngineEvacuationObjectWorkerCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_worker_count"); v > 0 {
return v
}
return EvacuationObjectWorkerCountDefault
}
// EngineEvacuationContainerWorkerCount returns value of "evacuation_container_worker_count" config parameter from "storage" section.
func EngineEvacuationContainerWorkerCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_container_worker_count"); v > 0 {
return v
}
return EvacuationContainerWorkerCountDefault
}

View file

@ -91,6 +91,8 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
# Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
EVACUATION_OBJECT_WORKER_COUNT=10
EVACUATION_CONTAINER_WORKER_COUNT=10
## 0 shard
### Flag to refill Metabase from BlobStor
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false

View file

@ -136,6 +136,8 @@
"storage": {
"shard_pool_size": 15,
"shard_ro_error_threshold": 100,
"evacuation_container_worker_count": 10,
"evacuation_object_worker_count": 10,
"shard": {
"0": {
"mode": "read-only",

View file

@ -118,6 +118,8 @@ storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
evacuation_container_worker_count: 10 # the count of the concurrent container evacuation workers
evacuation_object_worker_count: 10 # the count of the concurrent object evacuation workers
shard:
default: # section with the default shard parameters

View file

@ -20,7 +20,9 @@ Because it is necessary to prevent removing by policer objects with policy `REP
## Commands
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
By default, objects and trees are evacuated. To limit the evacuation scope, use `--scope` flag (possible values are `all`, `trees`, `objects`).
To evacuate objects only from containers with policy `REP 1` use option `--rep-one-only`.
`frostfs-cli control shards evacuation stop` stops running evacuation process.

View file

@ -167,12 +167,14 @@ morph:
Local storage engine configuration.
| Parameter | Type | Default value | Description |
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
| Parameter | Type | Default value | Description |
|-------------------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `evacuation_container_worker_count` | `int` | `10` | The count of the concurrent container evacuation workers. |
| `evacuation_object_worker_count` | `int` | `10` | The count of the concurrent object evacuation workers. |
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
## `shard` subsection

View file

@ -213,12 +213,18 @@ type cfg struct {
lowMem bool
containerSource atomic.Pointer[containerSource]
evacuationObjectWorkerCount uint32
evacuationContainerWorkerCount uint32
}
func defaultCfg() *cfg {
res := &cfg{
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
evacuationObjectWorkerCount: 10,
evacuationContainerWorkerCount: 10,
}
res.containerSource.Store(&containerSource{})
return res
@ -277,6 +283,20 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
}
}
// WithEvacuationObjectWorkerCount returns an option to set the count of the concurrent object evacuation workers per container.
func WithEvacuationObjectWorkerCount(count uint32) Option {
return func(c *cfg) {
c.evacuationObjectWorkerCount = count
}
}
// WithEvacuationContainerWorkerCount returns an option to set the count of the concurrent container evacuation workers for each shard.
func WithEvacuationContainerWorkerCount(count uint32) Option {
return func(c *cfg) {
c.evacuationContainerWorkerCount = count
}
}
// SetContainerSource sets container source.
func (e *StorageEngine) SetContainerSource(cs container.Source) {
e.containerSource.Store(&containerSource{cs: cs})

View file

@ -10,7 +10,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -24,6 +23,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
@ -79,6 +79,10 @@ type EvacuateShardPrm struct {
IgnoreErrors bool
Async bool
Scope EvacuateScope
RepOneOnly bool
ContainerWorkerCount uint32
ObjectWorkerCount uint32
}
// EvacuateShardRes represents result of the EvacuateShard operation.
@ -189,8 +193,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
return res
}
const defaultEvacuateBatchSize = 100
type pooledShard struct {
hashedShard
pool util.WorkerPool
@ -242,8 +244,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return nil, err
}
var mtx sync.RWMutex
copyShards := func() []pooledShard {
mtx.RLock()
defer mtx.RUnlock()
t := make([]pooledShard, len(shards))
copy(t, shards)
return t
}
eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate)
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
})
if prm.Async {
@ -261,7 +271,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
var err error
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
@ -270,6 +280,7 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
attribute.Bool("repOneOnly", prm.RepOneOnly),
))
defer func() {
@ -287,12 +298,44 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return err
}
egShard, _ := errgroup.WithContext(ctx)
egContainer, _ := errgroup.WithContext(ctx)
containerWorkerCount := prm.ContainerWorkerCount
if containerWorkerCount == 0 {
containerWorkerCount = e.cfg.evacuationContainerWorkerCount
}
egContainer.SetLimit(int(containerWorkerCount))
egObject, _ := errgroup.WithContext(ctx)
objectWorkerCount := prm.ObjectWorkerCount
if objectWorkerCount == 0 {
objectWorkerCount = e.cfg.evacuationObjectWorkerCount
}
egObject.SetLimit(int(objectWorkerCount))
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
egShard.Go(func() error {
if err := e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
return nil
})
}
errShard := egShard.Wait()
errContainer := egContainer.Wait()
errObject := egObject.Wait()
if errShard != nil {
err = errShard
return errShard
}
if errContainer != nil {
err = errContainer
return errContainer
}
if errObject != nil {
err = errObject
return errObject
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
@ -336,7 +379,8 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
}
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
trace.WithAttributes(
@ -344,59 +388,71 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
))
defer span.End()
eg, egCtx := errgroup.WithContext(ctx)
if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
}
eg.Go(func() error {
return e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject)
})
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
}
eg.Go(func() error {
return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate)
})
}
return nil
return eg.Wait()
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
egContainer *errgroup.Group, egObject *errgroup.Group,
) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true)
var c *meta.Cursor
for {
listPrm.WithCursor(c)
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(ctx, listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
var cntPrm shard.IterateOverContainersPrm
cntPrm.Handler = func(ctx context.Context, name []byte, cnt cid.ID) error {
egContainer.Go(func() error {
if prm.RepOneOnly {
notRepOne, err := e.isNotRepOne(cnt)
if err != nil {
return err
}
if notRepOne {
countPrm := shard.CountAliveObjectsInBucketPrm{BucketName: name}
count, err := sh.CountAliveObjectsInBucket(ctx, countPrm)
if err != nil {
return err
}
res.objSkipped.Add(count)
return nil
}
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
return err
}
c = listRes.Cursor()
var objPrm shard.IterateOverObjectsInContainerPrm
objPrm.BucketName = name
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
egObject.Go(func() error {
return e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
})
return nil
}
return sh.IterateOverObjectsInContainer(ctx, objPrm)
})
return nil
}
return nil
sh.SetEvacuationInProgress(true)
err := sh.IterateOverContainers(ctx, cntPrm)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err
}
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
sh := shardsToEvacuate[shardID]
shards := getShards()
var listPrm pilorama.TreeListTreesPrm
first := true
@ -637,72 +693,83 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
return shards, nil
}
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
trace.WithAttributes(
attribute.Int("objects_count", len(toEvacuate)),
))
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
defer span.End()
for i := range toEvacuate {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
addr := toEvacuate[i].Address
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
shards := getShards()
addr := objInfo.Address
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.IgnoreErrors {
res.objFailed.Add(1)
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
var getPrm shard.GetPrm
getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true)
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res)
if err != nil {
return err
}
if evacuatedLocal {
continue
}
if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
}
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if moved {
res.objEvacuated.Add(1)
} else if prm.IgnoreErrors {
getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
if err != nil {
if prm.IgnoreErrors {
res.objFailed.Add(1)
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
return fmt.Errorf("object %s was not replicated", addr)
return nil
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
if err != nil {
return err
}
if evacuatedLocal {
return nil
}
if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, objInfo)
}
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if moved {
res.objEvacuated.Add(1)
} else if prm.IgnoreErrors {
res.objFailed.Add(1)
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
return fmt.Errorf("object %s was not replicated", addr)
}
return nil
}
func (e *StorageEngine) isNotRepOne(cid cid.ID) (bool, error) {
c, err := e.containerSource.Load().cs.Get(cid)
if err != nil {
return false, err
}
p := c.Value.PlacementPolicy()
for i := range p.NumberOfReplicas() {
if p.ReplicaDescriptor(i).NumberOfObjects() == 1 {
return false, nil
}
}
return true, nil
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,
) (bool, error) {

View file

@ -6,9 +6,12 @@ import (
"fmt"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
coreContainer "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
@ -18,18 +21,38 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
type containerStorage struct {
cntmap map[cid.ID]*container.Container
latency time.Duration
}
func (cs *containerStorage) Get(id cid.ID) (*coreContainer.Container, error) {
time.Sleep(cs.latency)
coreCnt := coreContainer.Container{
Value: *cs.cntmap[id],
}
return &coreCnt, nil
}
func (cs *containerStorage) DeletionInfo(cid.ID) (*coreContainer.DelInfo, error) {
return nil, nil
}
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
dir := t.TempDir()
te := testNewEngine(t).
te := testNewEngine(t,
WithEvacuationContainerWorkerCount(2), WithEvacuationObjectWorkerCount(2)).
setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
return []shard.Option{
shard.WithLogger(test.NewLogger(t)),
@ -174,13 +197,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
errReplication := errors.New("handler error")
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
var n uint64
var n atomic.Uint64
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
if n == max {
if n.Load() == max {
return false, errReplication
}
n++
n.Add(1)
for i := range objects {
if addr == objectCore.AddressOf(objects[i]) {
require.Equal(t, objects[i], obj)
@ -531,6 +554,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
mutex := sync.Mutex{}
evacuatedTreeOps := make(map[string][]*pilorama.Move)
var prm EvacuateShardPrm
prm.ShardID = ids
@ -545,7 +569,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
if op.Time == 0 {
return true, "", nil
}
mutex.Lock()
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
mutex.Unlock()
height = op.Time + 1
}
}
@ -605,3 +631,124 @@ func TestEvacuateTreesRemote(t *testing.T) {
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
}
func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 0)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
// Create container with policy REP 2
cnr1 := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(2)
p1.AddReplicas(x1)
cnr1.SetPlacementPolicy(p1)
var idCnr1 cid.ID
container.CalculateID(&idCnr1, cnr1)
cnrmap := make(map[cid.ID]*container.Container)
var cids []cid.ID
cnrmap[idCnr1] = &cnr1
cids = append(cids, idCnr1)
// Create container with policy REP 1
cnr2 := container.Container{}
p2 := netmap.PlacementPolicy{}
p2.SetContainerBackupFactor(1)
x2 := netmap.ReplicaDescriptor{}
x2.SetNumberOfObjects(1)
p2.AddReplicas(x2)
cnr2.SetPlacementPolicy(p2)
var idCnr2 cid.ID
container.CalculateID(&idCnr2, cnr2)
cnrmap[idCnr2] = &cnr2
cids = append(cids, idCnr2)
e.SetContainerSource(&containerStorage{cntmap: cnrmap})
for _, sh := range ids {
for j := range 2 {
for range 4 {
obj := testutil.GenerateObjectWithCID(cids[j])
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
}
}
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, uint64(4), res.ObjectsEvacuated())
require.Equal(t, uint64(4), res.ObjectsSkipped())
require.Equal(t, uint64(0), res.ObjectsFailed())
}
func TestEvacuateShardObjectsRepOneOnlyBench(t *testing.T) {
t.Skip()
e, ids, _ := newEngineEvacuate(t, 2, 0)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
cnrmap := make(map[cid.ID]*container.Container)
var cids []cid.ID
// Create containers with policy REP 1
for i := range 10_000 {
cnr1 := container.Container{}
p1 := netmap.PlacementPolicy{}
p1.SetContainerBackupFactor(1)
x1 := netmap.ReplicaDescriptor{}
x1.SetNumberOfObjects(2)
p1.AddReplicas(x1)
cnr1.SetPlacementPolicy(p1)
cnr1.SetAttribute("i", strconv.Itoa(i))
var idCnr1 cid.ID
container.CalculateID(&idCnr1, cnr1)
cnrmap[idCnr1] = &cnr1
cids = append(cids, idCnr1)
}
e.SetContainerSource(&containerStorage{
cntmap: cnrmap,
latency: time.Millisecond * 100,
})
for _, cnt := range cids {
for range 1 {
obj := testutil.GenerateObjectWithCID(cnt)
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[ids[0].String()].Put(context.Background(), putPrm)
require.NoError(t, err)
}
}
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
prm.RepOneOnly = true
prm.ContainerWorkerCount = 10
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
start := time.Now()
_, err := e.Evacuate(context.Background(), prm)
t.Logf("evacuate took %v\n", time.Since(start))
require.NoError(t, err)
}

View file

@ -61,6 +61,26 @@ func (l ListRes) Cursor() *Cursor {
return l.cursor
}
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name
BucketName []byte
// Handler function executed upon objects in db.
Handler func(context.Context, *objectcore.Info) error
}
// CountAliveObjectsInBucketPrm contains parameters for IterateOverObjectsInContainer operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name
BucketName []byte
}
// ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests.
@ -259,3 +279,198 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
return rawID, name[0]
}
// IterateOverContainers lists physical containers available in metabase starting from first.
func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverContainers(ctx, tx, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error {
c := tx.Cursor()
name, _ := c.First()
var containerID cid.ID
for ; name != nil; name, _ = c.Next() {
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
if cidRaw == nil {
continue
}
if prefix != primaryPrefix && prefix != lockersPrefix && prefix != tombstonePrefix {
continue
}
bkt := tx.Bucket(name)
if bkt != nil {
bktName := make([]byte, len(name))
copy(bktName, name)
var cnt cid.ID
copy(cnt[:], containerID[:])
err := prm.Handler(ctx, bktName, cnt)
if err != nil {
return err
}
}
}
return nil
}
// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first.
func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateOverObjectsInContainer(ctx, tx, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error {
bkt := tx.Bucket(prm.BucketName)
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, v := c.First()
var containerID cid.ID
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
if cidRaw == nil {
return nil
}
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
return nil
}
for ; k != nil; k, v = c.Next() {
var obj oid.ID
if err := obj.Decode(k); err != nil {
break
}
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {
ecInfo = &objectcore.ECInfo{
ParentID: ecHeader.Parent(),
Index: ecHeader.Index(),
Total: ecHeader.Total(),
}
}
}
var a oid.Address
a.SetContainer(containerID)
a.SetObject(obj)
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
err := prm.Handler(ctx, &objInfo)
if err != nil {
return err
}
}
return nil
}
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (db *DB) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("CountAliveObjectsInBucket", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.CountAliveObjectsInBucket")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return 0, ErrDegradedMode
}
var count uint64
err := db.boltDB.View(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(prm.BucketName)
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
c := bkt.Cursor()
k, _ := c.First()
cidRaw := prm.BucketName[1:bucketKeySize]
if cidRaw == nil {
return nil
}
for ; k != nil; k, _ = c.Next() {
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
count++
}
return nil
})
success = err == nil
return count, metaerr.Wrap(err)
}

View file

@ -34,6 +34,26 @@ func (r ListContainersRes) Containers() []cid.ID {
return r.containers
}
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
type IterateOverContainersPrm struct {
// Handler function executed upon containers in db.
Handler func(context.Context, []byte, cid.ID) error
}
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
type IterateOverObjectsInContainerPrm struct {
// BucketName container's bucket name
BucketName []byte
// Handler function executed upon containers in db.
Handler func(context.Context, *objectcore.Info) error
}
// CountAliveObjectsInBucketPrm contains parameters for CountAliveObjectsInBucket operation.
type CountAliveObjectsInBucketPrm struct {
// BucketName container's bucket name
BucketName []byte
}
// ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct {
count uint32
@ -164,3 +184,67 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
cursor: res.Cursor(),
}, nil
}
// IterateOverContainers lists physical containers presented in shard.
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
if s.GetMode().NoMetabase() {
return ErrDegradedMode
}
var metaPrm meta.IterateOverContainersPrm
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
if err != nil {
return fmt.Errorf("could not iterate over containers: %w", err)
}
return nil
}
// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name.
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
trace.WithAttributes(
attribute.Bool("has_handler", prm.Handler != nil),
))
defer span.End()
if s.GetMode().NoMetabase() {
return ErrDegradedMode
}
var metaPrm meta.IterateOverObjectsInContainerPrm
metaPrm.BucketName = prm.BucketName
metaPrm.Handler = prm.Handler
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
if err != nil {
return fmt.Errorf("could not iterate over objects: %w", err)
}
return nil
}
// CountAliveObjectsInBucket count objects in bucket which aren't in graveyard or garbage.
func (s *Shard) CountAliveObjectsInBucket(ctx context.Context, prm CountAliveObjectsInBucketPrm) (uint64, error) {
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer")
defer span.End()
if s.GetMode().NoMetabase() {
return 0, ErrDegradedMode
}
var metaPrm meta.CountAliveObjectsInBucketPrm
metaPrm.BucketName = prm.BucketName
count, err := s.metaBase.CountAliveObjectsInBucket(ctx, metaPrm)
if err != nil {
return 0, fmt.Errorf("could not count alive objects in bucket: %w", err)
}
return count, nil
}

View file

@ -23,12 +23,15 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
}
prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
RepOneOnly: req.GetBody().GetRepOneOnly(),
}
_, err = s.s.Evacuate(ctx, prm)

View file

@ -394,6 +394,12 @@ message StartShardEvacuationRequest {
bool ignore_errors = 2;
// Evacuation scope.
uint32 scope = 3;
// Count of concurrent container evacuation workers.
uint32 container_worker_count = 4;
// Count of concurrent object evacuation workers.
uint32 object_worker_count = 5;
// Choose for evacuation objects in `REP 1` containers only.
bool rep_one_only = 6;
}
Body body = 1;

Binary file not shown.